Adding Proxy support to multilang for version 1.x (#376)

* Add proxy support

Read proxy info from application.properties file first,
then java system settings, and finally from ENV vars.

* Formatted code according to AWS scheme.

Import specific classes, not *.

* Add proxy config unit tests

* Changed per @sahilpalvia comments

* Fix failing test

* Changed per @sahilpalvia comments

* Fixing missed http_proxy string

* Changed per @sahilpalvia comments
This commit is contained in:
Michael Scharp 2018-08-23 14:35:03 -06:00 committed by Sahil Palvia
parent f1d60ec1a6
commit cba8bf6bfa
2 changed files with 184 additions and 43 deletions

View file

@ -14,11 +14,21 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.FileInputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -26,13 +36,6 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* This class captures the configuration needed to run the MultiLangDaemon. * This class captures the configuration needed to run the MultiLangDaemon.
*/ */
@ -47,6 +50,10 @@ public class MultiLangDaemonConfig {
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage"; private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads"; private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
public static final String PROXY_HOST_PROP = "http.proxyHost";
public static final String PROXY_PORT_PROP = "http.proxyPort";
public static final String HTTP_PROXY_ENV_VAR = "HTTP_PROXY";
private KinesisClientLibConfiguration kinesisClientLibConfig; private KinesisClientLibConfiguration kinesisClientLibConfig;
private ExecutorService executorService; private ExecutorService executorService;
@ -65,20 +72,18 @@ public class MultiLangDaemonConfig {
} }
/** /**
*
* @param propertiesFile The location of the properties file. * @param propertiesFile The location of the properties file.
* @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a
* unit test. * unit test.
* @throws IOException Thrown when the properties file can't be accessed. * @throws IOException Thrown when the properties file can't be accessed.
* @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected. * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
*/ */
public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader) throws IOException, public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader)
IllegalArgumentException { throws IOException, IllegalArgumentException {
this(propertiesFile, classLoader, new KinesisClientLibConfigurator()); this(propertiesFile, classLoader, new KinesisClientLibConfigurator());
} }
/** /**
*
* @param propertiesFile The location of the properties file. * @param propertiesFile The location of the properties file.
* @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a
* unit test. * unit test.
@ -86,34 +91,74 @@ public class MultiLangDaemonConfig {
* @throws IOException Thrown when the properties file can't be accessed. * @throws IOException Thrown when the properties file can't be accessed.
* @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected. * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
*/ */
public MultiLangDaemonConfig(String propertiesFile, public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader,
ClassLoader classLoader,
KinesisClientLibConfigurator configurator) throws IOException, IllegalArgumentException { KinesisClientLibConfigurator configurator) throws IOException, IllegalArgumentException {
Properties properties = loadProperties(classLoader, propertiesFile); Properties properties = loadProperties(classLoader, propertiesFile);
if (!validateProperties(properties)) { if (!validateProperties(properties)) {
throw new IllegalArgumentException("Must provide an executable name in the properties file, " throw new IllegalArgumentException(
+ "e.g. executableName = sampleapp.py"); "Must provide an executable name in the properties file, " + "e.g. executableName = sampleapp.py");
} }
String executableName = properties.getProperty(PROP_EXECUTABLE_NAME); String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE); String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
ClientConfiguration clientConfig = buildClientConfig(properties);
kinesisClientLibConfig = configurator.getConfiguration(properties).withKinesisClientConfig(clientConfig)
.withCloudWatchClientConfig(clientConfig).withDynamoDBClientConfig(clientConfig);
kinesisClientLibConfig = configurator.getConfiguration(properties);
executorService = buildExecutorService(properties); executorService = buildExecutorService(properties);
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig); recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService,
kinesisClientLibConfig);
LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream " LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName); + kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
prepare(processingLanguage); prepare(processingLanguage);
} }
private ClientConfiguration buildClientConfig(Properties properties) {
ClientConfiguration clientConfig = new ClientConfiguration();
String proxyHost = null;
int proxyPort = 0;
if (properties.getProperty(PROXY_HOST_PROP) != null) {
LOG.debug("Getting proxy info from properties file.");
proxyHost = properties.getProperty(PROXY_HOST_PROP);
proxyPort = Integer.parseInt(properties.getProperty(PROXY_PORT_PROP));
} else if (System.getProperty(PROXY_HOST_PROP) != null) {
LOG.debug("Getting proxy info from java system properties");
proxyHost = System.getProperty(PROXY_HOST_PROP);
proxyPort = Integer.parseInt(System.getProperty(PROXY_PORT_PROP));
} else if (System.getenv(HTTP_PROXY_ENV_VAR) != null) {
LOG.debug("Getting proxy info environment settings");
try {
URI proxyAddr = new URI(System.getenv(HTTP_PROXY_ENV_VAR));
proxyHost = proxyAddr.getHost();
proxyPort = proxyAddr.getPort();
} catch (URISyntaxException e) {
LOG.error("System proxy not set correctly", e);
}
}
if (StringUtils.isNotEmpty(proxyHost) && proxyPort > 0) {
clientConfig = clientConfig.withProxyHost(proxyHost).withProxyPort(proxyPort);
} else {
LOG.debug("Not configuring proxy as none specified");
}
return clientConfig;
}
private void prepare(String processingLanguage) { private void prepare(String processingLanguage) {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints). // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60"); java.security.Security.setProperty("networkaddress.cache.ttl", "60");
LOG.info("Using workerId: " + kinesisClientLibConfig.getWorkerIdentifier()); LOG.info("Using workerId: " + kinesisClientLibConfig.getWorkerIdentifier());
LOG.info("Using credentials with access key id: " LOG.info("Using credentials with access key id: " + kinesisClientLibConfig.getKinesisCredentialsProvider()
+ kinesisClientLibConfig.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId()); .getCredentials().getAWSAccessKeyId());
StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT); StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT);
userAgent.append(" "); userAgent.append(" ");
@ -187,7 +232,6 @@ public class MultiLangDaemonConfig {
} }
/** /**
*
* @return A KinesisClientLibConfiguration object based on the properties file provided. * @return A KinesisClientLibConfiguration object based on the properties file provided.
*/ */
public KinesisClientLibConfiguration getKinesisClientLibConfiguration() { public KinesisClientLibConfiguration getKinesisClientLibConfiguration() {
@ -195,7 +239,6 @@ public class MultiLangDaemonConfig {
} }
/** /**
*
* @return An executor service based on the properties file provided. * @return An executor service based on the properties file provided.
*/ */
public ExecutorService getExecutorService() { public ExecutorService getExecutorService() {
@ -203,7 +246,6 @@ public class MultiLangDaemonConfig {
} }
/** /**
*
* @return A MultiLangRecordProcessorFactory based on the properties file provided. * @return A MultiLangRecordProcessorFactory based on the properties file provided.
*/ */
public MultiLangRecordProcessorFactory getRecordProcessorFactory() { public MultiLangRecordProcessorFactory getRecordProcessorFactory() {

View file

@ -18,6 +18,10 @@ import static org.junit.Assert.assertNotNull;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import junit.framework.Assert; import junit.framework.Assert;
@ -39,23 +43,38 @@ public class MultiLangDaemonConfigTest {
AWSCredentials creds = Mockito.mock(AWSCredentials.class); AWSCredentials creds = Mockito.mock(AWSCredentials.class);
Mockito.doReturn(creds).when(credentialsProvider).getCredentials(); Mockito.doReturn(creds).when(credentialsProvider).getCredentials();
Mockito.doReturn("cool-user").when(creds).getAWSAccessKeyId(); Mockito.doReturn("cool-user").when(creds).getAWSAccessKeyId();
KinesisClientLibConfiguration kclConfig = KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration("cool-app", "cool-stream",
new KinesisClientLibConfiguration("cool-app", "cool-stream", credentialsProvider, "cool-worker"); credentialsProvider, "cool-worker");
KinesisClientLibConfigurator configurator = Mockito.mock(KinesisClientLibConfigurator.class); KinesisClientLibConfigurator configurator = Mockito.mock(KinesisClientLibConfigurator.class);
Mockito.doReturn(kclConfig).when(configurator).getConfiguration(Mockito.any(Properties.class)); Mockito.doReturn(kclConfig).when(configurator).getConfiguration(Mockito.any(Properties.class));
return configurator; return configurator;
} }
// Function to mock ENV variables
private void setEnv(Map<String, String> newenv) throws Exception {
Class[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv();
for (Class cl : classes) {
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
Field field = cl.getDeclaredField("m");
field.setAccessible(true);
Object obj = field.get(env);
Map<String, String> map = (Map<String, String>) obj;
map.clear();
map.putAll(newenv);
}
}
}
@Test @Test
public void constructorTest() throws IOException { public void constructorTest() throws IOException {
String PROPERTIES = String properties =
"executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n" "executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge"; + "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class); ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())) Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.when(classLoader)
.getResourceAsStream(FILENAME); .getResourceAsStream(FILENAME);
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator()); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator());
@ -67,14 +86,11 @@ public class MultiLangDaemonConfigTest {
@Test @Test
public void propertyValidation() { public void propertyValidation() {
String PROPERTIES_NO_EXECUTABLE_NAME = String propertiesNoExecutableName = "applicationName = testApp \n" + "streamName = fakeStream \n"
"applicationName = testApp \n" + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class); ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())) Mockito.doReturn(new ByteArrayInputStream(propertiesNoExecutableName.getBytes())).when(classLoader)
.when(classLoader)
.getResourceAsStream(FILENAME); .getResourceAsStream(FILENAME);
MultiLangDaemonConfig config; MultiLangDaemonConfig config;
@ -88,4 +104,87 @@ public class MultiLangDaemonConfigTest {
} }
} }
@Test
public void testKinesisClientLibConfigurationShouldGetProxyInfoFromPropertiesFile() throws Exception {
String properties =
"executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "http.proxyHost = http://proxy.com\n" + "http.proxyPort = 1234\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator());
assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), "http://proxy.com", 1234);
}
@Test
public void testKinesisClientLibConfigurationShouldGetProxyInfoFromSystemProperties() throws Exception {
String properties =
"executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
System.setProperty(MultiLangDaemonConfig.PROXY_HOST_PROP, "http://proxy.com");
System.setProperty(MultiLangDaemonConfig.PROXY_PORT_PROP, "1234");
MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator());
assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), "http://proxy.com", 1234);
}
@Test
public void testKinesisClientLibConfigurationShouldGetProxyInfoFromEnvVars() throws Exception {
String properties =
"executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Map<String, String> env = new HashMap<>();
env.put(MultiLangDaemonConfig.HTTP_PROXY_ENV_VAR, "http://proxy.com:1234");
setEnv(env);
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator());
assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), "http://proxy.com", 1234);
}
@Test
public void testKinesisClientLibConfigurationShouldNotGetProxyInfo() throws Exception {
String properties =
"executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Map<String, String> env = new HashMap<>();
//clear out any env vars loaded from system
setEnv(env);
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator());
assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), null, -1);
}
private void assertAgainstKclConfig(KinesisClientLibConfiguration kclConfig, String host, int port) {
Assert.assertEquals(host, kclConfig.getKinesisClientConfiguration().getProxyHost());
Assert.assertEquals(host, kclConfig.getDynamoDBClientConfiguration().getProxyHost());
Assert.assertEquals(host, kclConfig.getCloudWatchClientConfiguration().getProxyHost());
Assert.assertEquals(port, kclConfig.getKinesisClientConfiguration().getProxyPort());
Assert.assertEquals(port, kclConfig.getDynamoDBClientConfiguration().getProxyPort());
Assert.assertEquals(port, kclConfig.getCloudWatchClientConfiguration().getProxyPort());
}
} }