diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 8ec40e48..8f0e33fd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.net.ProxySelector; import java.util.Set; import com.amazonaws.ClientConfiguration; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index 7793f12b..97ebfd95 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -16,6 +16,12 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -23,6 +29,11 @@ import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.regions.ServiceAbbreviations; import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; @@ -92,6 +103,12 @@ public class MultiLangDaemonConfig { String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE); kinesisClientLibConfig = configurator.getConfiguration(properties); + + kinesisClientLibConfig + .withKinesisClientConfig(buildClientConfig(ServiceAbbreviations.Kinesis, kinesisClientLibConfig)) + .withCloudWatchClientConfig(buildClientConfig(ServiceAbbreviations.CloudWatch, kinesisClientLibConfig)) + .withDynamoDBClientConfig(buildClientConfig(ServiceAbbreviations.Dynamodb, kinesisClientLibConfig)); + executorService = buildExecutorService(properties); recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService); @@ -100,6 +117,29 @@ public class MultiLangDaemonConfig { prepare(processingLanguage); } + private ClientConfiguration buildClientConfig(final String serviceName, KinesisClientLibConfiguration config) + { + try + { + Region region = RegionUtils.getRegion(config.getRegionName()); + String serviceEndpoint = serviceName + "." + region.getName() + "." + region.getDomain(); + ClientConfiguration clientConfig = new ClientConfiguration(); + List proxies = ProxySelector.getDefault().select(new URI(Protocol.HTTPS.toString(), + serviceEndpoint, null, null)); + + Proxy proxy = proxies.isEmpty() ? Proxy.NO_PROXY : proxies.get(0); + if (proxy.type() == Proxy.Type.HTTP) { + InetSocketAddress addr = (InetSocketAddress)proxy.address(); + clientConfig = clientConfig.withProxyHost(addr.getHostString()).withProxyPort(addr.getPort()); + } + return clientConfig; + } + catch(URISyntaxException e) + { + throw new RuntimeException(e); + } + } + private void prepare(String processingLanguage) { // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints). java.security.Security.setProperty("networkaddress.cache.ttl", "60");