From cba8bf6bfac824270d81e4cfb9470f4478ce580f Mon Sep 17 00:00:00 2001 From: Michael Scharp Date: Thu, 23 Aug 2018 14:35:03 -0600 Subject: [PATCH] Adding Proxy support to multilang for version 1.x (#376) * Add proxy support Read proxy info from application.properties file first, then java system settings, and finally from ENV vars. * Formatted code according to AWS scheme. Import specific classes, not *. * Add proxy config unit tests * Changed per @sahilpalvia comments * Fix failing test * Changed per @sahilpalvia comments * Fixing missed http_proxy string * Changed per @sahilpalvia comments --- .../multilang/MultiLangDaemonConfig.java | 106 ++++++++++----- .../multilang/MultiLangDaemonConfigTest.java | 121 ++++++++++++++++-- 2 files changed, 184 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index fc143083..2b7e4520 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -14,11 +14,21 @@ */ package com.amazonaws.services.kinesis.multilang; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.FileInputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -26,13 +36,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * This class captures the configuration needed to run the MultiLangDaemon. */ @@ -47,6 +50,10 @@ public class MultiLangDaemonConfig { private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage"; private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads"; + public static final String PROXY_HOST_PROP = "http.proxyHost"; + public static final String PROXY_PORT_PROP = "http.proxyPort"; + public static final String HTTP_PROXY_ENV_VAR = "HTTP_PROXY"; + private KinesisClientLibConfiguration kinesisClientLibConfig; private ExecutorService executorService; @@ -55,9 +62,9 @@ public class MultiLangDaemonConfig { /** * Constructor. - * + * * @param propertiesFile The location of the properties file. - * @throws IOException Thrown when the properties file can't be accessed. + * @throws IOException Thrown when the properties file can't be accessed. * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected. */ public MultiLangDaemonConfig(String propertiesFile) throws IOException, IllegalArgumentException { @@ -65,55 +72,93 @@ public class MultiLangDaemonConfig { } /** - * * @param propertiesFile The location of the properties file. - * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a - * unit test. - * @throws IOException Thrown when the properties file can't be accessed. + * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a + * unit test. + * @throws IOException Thrown when the properties file can't be accessed. * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected. */ - public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader) throws IOException, - IllegalArgumentException { + public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader) + throws IOException, IllegalArgumentException { this(propertiesFile, classLoader, new KinesisClientLibConfigurator()); } /** - * * @param propertiesFile The location of the properties file. - * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a - * unit test. - * @param configurator A configurator to use. - * @throws IOException Thrown when the properties file can't be accessed. + * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a + * unit test. + * @param configurator A configurator to use. + * @throws IOException Thrown when the properties file can't be accessed. * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected. */ - public MultiLangDaemonConfig(String propertiesFile, - ClassLoader classLoader, + public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader, KinesisClientLibConfigurator configurator) throws IOException, IllegalArgumentException { Properties properties = loadProperties(classLoader, propertiesFile); if (!validateProperties(properties)) { - throw new IllegalArgumentException("Must provide an executable name in the properties file, " - + "e.g. executableName = sampleapp.py"); + throw new IllegalArgumentException( + "Must provide an executable name in the properties file, " + "e.g. executableName = sampleapp.py"); } String executableName = properties.getProperty(PROP_EXECUTABLE_NAME); String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE); + ClientConfiguration clientConfig = buildClientConfig(properties); + + kinesisClientLibConfig = configurator.getConfiguration(properties).withKinesisClientConfig(clientConfig) + .withCloudWatchClientConfig(clientConfig).withDynamoDBClientConfig(clientConfig); - kinesisClientLibConfig = configurator.getConfiguration(properties); executorService = buildExecutorService(properties); - recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig); + recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, + kinesisClientLibConfig); LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream " + kinesisClientLibConfig.getStreamName() + " with executable " + executableName); prepare(processingLanguage); } + private ClientConfiguration buildClientConfig(Properties properties) { + ClientConfiguration clientConfig = new ClientConfiguration(); + String proxyHost = null; + int proxyPort = 0; + + if (properties.getProperty(PROXY_HOST_PROP) != null) { + LOG.debug("Getting proxy info from properties file."); + + proxyHost = properties.getProperty(PROXY_HOST_PROP); + proxyPort = Integer.parseInt(properties.getProperty(PROXY_PORT_PROP)); + } else if (System.getProperty(PROXY_HOST_PROP) != null) { + LOG.debug("Getting proxy info from java system properties"); + + proxyHost = System.getProperty(PROXY_HOST_PROP); + proxyPort = Integer.parseInt(System.getProperty(PROXY_PORT_PROP)); + } else if (System.getenv(HTTP_PROXY_ENV_VAR) != null) { + LOG.debug("Getting proxy info environment settings"); + + try { + URI proxyAddr = new URI(System.getenv(HTTP_PROXY_ENV_VAR)); + + proxyHost = proxyAddr.getHost(); + proxyPort = proxyAddr.getPort(); + } catch (URISyntaxException e) { + LOG.error("System proxy not set correctly", e); + } + } + + if (StringUtils.isNotEmpty(proxyHost) && proxyPort > 0) { + clientConfig = clientConfig.withProxyHost(proxyHost).withProxyPort(proxyPort); + } else { + LOG.debug("Not configuring proxy as none specified"); + } + + return clientConfig; + } + private void prepare(String processingLanguage) { // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints). java.security.Security.setProperty("networkaddress.cache.ttl", "60"); LOG.info("Using workerId: " + kinesisClientLibConfig.getWorkerIdentifier()); - LOG.info("Using credentials with access key id: " - + kinesisClientLibConfig.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId()); + LOG.info("Using credentials with access key id: " + kinesisClientLibConfig.getKinesisCredentialsProvider() + .getCredentials().getAWSAccessKeyId()); StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT); userAgent.append(" "); @@ -187,7 +232,6 @@ public class MultiLangDaemonConfig { } /** - * * @return A KinesisClientLibConfiguration object based on the properties file provided. */ public KinesisClientLibConfiguration getKinesisClientLibConfiguration() { @@ -195,7 +239,6 @@ public class MultiLangDaemonConfig { } /** - * * @return An executor service based on the properties file provided. */ public ExecutorService getExecutorService() { @@ -203,7 +246,6 @@ public class MultiLangDaemonConfig { } /** - * * @return A MultiLangRecordProcessorFactory based on the properties file provided. */ public MultiLangRecordProcessorFactory getRecordProcessorFactory() { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfigTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfigTest.java index 6a687577..1bd3580d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -18,6 +18,10 @@ import static org.junit.Assert.assertNotNull; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import junit.framework.Assert; @@ -39,23 +43,38 @@ public class MultiLangDaemonConfigTest { AWSCredentials creds = Mockito.mock(AWSCredentials.class); Mockito.doReturn(creds).when(credentialsProvider).getCredentials(); Mockito.doReturn("cool-user").when(creds).getAWSAccessKeyId(); - KinesisClientLibConfiguration kclConfig = - new KinesisClientLibConfiguration("cool-app", "cool-stream", credentialsProvider, "cool-worker"); + KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration("cool-app", "cool-stream", + credentialsProvider, "cool-worker"); KinesisClientLibConfigurator configurator = Mockito.mock(KinesisClientLibConfigurator.class); Mockito.doReturn(kclConfig).when(configurator).getConfiguration(Mockito.any(Properties.class)); return configurator; } + // Function to mock ENV variables + private void setEnv(Map newenv) throws Exception { + Class[] classes = Collections.class.getDeclaredClasses(); + Map env = System.getenv(); + for (Class cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map map = (Map) obj; + map.clear(); + map.putAll(newenv); + } + } + } + @Test public void constructorTest() throws IOException { - String PROPERTIES = + String properties = "executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; ClassLoader classLoader = Mockito.mock(ClassLoader.class); - Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())) - .when(classLoader) + Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader) .getResourceAsStream(FILENAME); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator()); @@ -67,14 +86,11 @@ public class MultiLangDaemonConfigTest { @Test public void propertyValidation() { - String PROPERTIES_NO_EXECUTABLE_NAME = - "applicationName = testApp \n" + "streamName = fakeStream \n" - + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" - + "processingLanguage = malbolge"; + String propertiesNoExecutableName = "applicationName = testApp \n" + "streamName = fakeStream \n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; ClassLoader classLoader = Mockito.mock(ClassLoader.class); - Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())) - .when(classLoader) + Mockito.doReturn(new ByteArrayInputStream(propertiesNoExecutableName.getBytes())).when(classLoader) .getResourceAsStream(FILENAME); MultiLangDaemonConfig config; @@ -88,4 +104,87 @@ public class MultiLangDaemonConfigTest { } } + @Test + public void testKinesisClientLibConfigurationShouldGetProxyInfoFromPropertiesFile() throws Exception { + String properties = + "executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "http.proxyHost = http://proxy.com\n" + "http.proxyPort = 1234\n" + + "processingLanguage = malbolge"; + ClassLoader classLoader = Mockito.mock(ClassLoader.class); + + Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader) + .getResourceAsStream(FILENAME); + + MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator()); + assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), "http://proxy.com", 1234); + } + + @Test + public void testKinesisClientLibConfigurationShouldGetProxyInfoFromSystemProperties() throws Exception { + String properties = + "executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "processingLanguage = malbolge"; + ClassLoader classLoader = Mockito.mock(ClassLoader.class); + + Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader) + .getResourceAsStream(FILENAME); + + System.setProperty(MultiLangDaemonConfig.PROXY_HOST_PROP, "http://proxy.com"); + System.setProperty(MultiLangDaemonConfig.PROXY_PORT_PROP, "1234"); + + MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator()); + assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), "http://proxy.com", 1234); + } + + @Test + public void testKinesisClientLibConfigurationShouldGetProxyInfoFromEnvVars() throws Exception { + String properties = + "executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "processingLanguage = malbolge"; + ClassLoader classLoader = Mockito.mock(ClassLoader.class); + + Map env = new HashMap<>(); + env.put(MultiLangDaemonConfig.HTTP_PROXY_ENV_VAR, "http://proxy.com:1234"); + + setEnv(env); + + Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader) + .getResourceAsStream(FILENAME); + + MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator()); + + assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), "http://proxy.com", 1234); + } + + @Test + public void testKinesisClientLibConfigurationShouldNotGetProxyInfo() throws Exception { + String properties = + "executableName = randomEXE \n" + "applicationName = testApp \n" + "streamName = fakeStream \n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "processingLanguage = malbolge"; + ClassLoader classLoader = Mockito.mock(ClassLoader.class); + + Map env = new HashMap<>(); + + //clear out any env vars loaded from system + setEnv(env); + + Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader) + .getResourceAsStream(FILENAME); + + MultiLangDaemonConfig config = new MultiLangDaemonConfig(FILENAME, classLoader, buildMockConfigurator()); + assertAgainstKclConfig(config.getKinesisClientLibConfiguration(), null, -1); + } + + private void assertAgainstKclConfig(KinesisClientLibConfiguration kclConfig, String host, int port) { + Assert.assertEquals(host, kclConfig.getKinesisClientConfiguration().getProxyHost()); + Assert.assertEquals(host, kclConfig.getDynamoDBClientConfiguration().getProxyHost()); + Assert.assertEquals(host, kclConfig.getCloudWatchClientConfiguration().getProxyHost()); + Assert.assertEquals(port, kclConfig.getKinesisClientConfiguration().getProxyPort()); + Assert.assertEquals(port, kclConfig.getDynamoDBClientConfiguration().getProxyPort()); + Assert.assertEquals(port, kclConfig.getCloudWatchClientConfiguration().getProxyPort()); + } }