Add some basic support for http proxy settings to MultiLangDaemon
This commit is contained in:
parent
850db1a3da
commit
5d19ac4c63
2 changed files with 41 additions and 0 deletions
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import java.net.ProxySelector;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import com.amazonaws.ClientConfiguration;
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,12 @@ package com.amazonaws.services.kinesis.multilang;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Proxy;
|
||||||
|
import java.net.ProxySelector;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
@ -23,6 +29,11 @@ import java.util.concurrent.Executors;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.Protocol;
|
||||||
|
import com.amazonaws.regions.Region;
|
||||||
|
import com.amazonaws.regions.RegionUtils;
|
||||||
|
import com.amazonaws.regions.ServiceAbbreviations;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
|
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
|
|
||||||
|
|
@ -92,6 +103,12 @@ public class MultiLangDaemonConfig {
|
||||||
String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
|
String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
|
||||||
|
|
||||||
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
||||||
|
|
||||||
|
kinesisClientLibConfig
|
||||||
|
.withKinesisClientConfig(buildClientConfig(ServiceAbbreviations.Kinesis, kinesisClientLibConfig))
|
||||||
|
.withCloudWatchClientConfig(buildClientConfig(ServiceAbbreviations.CloudWatch, kinesisClientLibConfig))
|
||||||
|
.withDynamoDBClientConfig(buildClientConfig(ServiceAbbreviations.Dynamodb, kinesisClientLibConfig));
|
||||||
|
|
||||||
executorService = buildExecutorService(properties);
|
executorService = buildExecutorService(properties);
|
||||||
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
|
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
|
||||||
|
|
||||||
|
|
@ -100,6 +117,29 @@ public class MultiLangDaemonConfig {
|
||||||
prepare(processingLanguage);
|
prepare(processingLanguage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ClientConfiguration buildClientConfig(final String serviceName, KinesisClientLibConfiguration config)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||||
|
String serviceEndpoint = serviceName + "." + region.getName() + "." + region.getDomain();
|
||||||
|
ClientConfiguration clientConfig = new ClientConfiguration();
|
||||||
|
List<Proxy> proxies = ProxySelector.getDefault().select(new URI(Protocol.HTTPS.toString(),
|
||||||
|
serviceEndpoint, null, null));
|
||||||
|
|
||||||
|
Proxy proxy = proxies.isEmpty() ? Proxy.NO_PROXY : proxies.get(0);
|
||||||
|
if (proxy.type() == Proxy.Type.HTTP) {
|
||||||
|
InetSocketAddress addr = (InetSocketAddress)proxy.address();
|
||||||
|
clientConfig = clientConfig.withProxyHost(addr.getHostString()).withProxyPort(addr.getPort());
|
||||||
|
}
|
||||||
|
return clientConfig;
|
||||||
|
}
|
||||||
|
catch(URISyntaxException e)
|
||||||
|
{
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void prepare(String processingLanguage) {
|
private void prepare(String processingLanguage) {
|
||||||
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
|
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
|
||||||
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
|
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue