Merge 5d19ac4c63 into bdab058394
This commit is contained in:
commit
eb3828a066
2 changed files with 41 additions and 0 deletions
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.net.ProxySelector;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
|
|
|
|||
|
|
@ -16,6 +16,12 @@ package com.amazonaws.services.kinesis.multilang;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Proxy;
|
||||
import java.net.ProxySelector;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
@ -23,6 +29,11 @@ import java.util.concurrent.Executors;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.regions.Region;
|
||||
import com.amazonaws.regions.RegionUtils;
|
||||
import com.amazonaws.regions.ServiceAbbreviations;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
|
||||
|
|
@ -92,6 +103,12 @@ public class MultiLangDaemonConfig {
|
|||
String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
|
||||
|
||||
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
||||
|
||||
kinesisClientLibConfig
|
||||
.withKinesisClientConfig(buildClientConfig(ServiceAbbreviations.Kinesis, kinesisClientLibConfig))
|
||||
.withCloudWatchClientConfig(buildClientConfig(ServiceAbbreviations.CloudWatch, kinesisClientLibConfig))
|
||||
.withDynamoDBClientConfig(buildClientConfig(ServiceAbbreviations.Dynamodb, kinesisClientLibConfig));
|
||||
|
||||
executorService = buildExecutorService(properties);
|
||||
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
|
||||
|
||||
|
|
@ -100,6 +117,29 @@ public class MultiLangDaemonConfig {
|
|||
prepare(processingLanguage);
|
||||
}
|
||||
|
||||
private ClientConfiguration buildClientConfig(final String serviceName, KinesisClientLibConfiguration config)
|
||||
{
|
||||
try
|
||||
{
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
String serviceEndpoint = serviceName + "." + region.getName() + "." + region.getDomain();
|
||||
ClientConfiguration clientConfig = new ClientConfiguration();
|
||||
List<Proxy> proxies = ProxySelector.getDefault().select(new URI(Protocol.HTTPS.toString(),
|
||||
serviceEndpoint, null, null));
|
||||
|
||||
Proxy proxy = proxies.isEmpty() ? Proxy.NO_PROXY : proxies.get(0);
|
||||
if (proxy.type() == Proxy.Type.HTTP) {
|
||||
InetSocketAddress addr = (InetSocketAddress)proxy.address();
|
||||
clientConfig = clientConfig.withProxyHost(addr.getHostString()).withProxyPort(addr.getPort());
|
||||
}
|
||||
return clientConfig;
|
||||
}
|
||||
catch(URISyntaxException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void prepare(String processingLanguage) {
|
||||
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
|
||||
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
|
||||
|
|
|
|||
Loading…
Reference in a new issue