From 5c497d87a958068e3616c742d3762ae0fad64969 Mon Sep 17 00:00:00 2001 From: Randy Findley Date: Fri, 14 Oct 2016 11:10:38 -0400 Subject: [PATCH 1/3] Feature/multiple cred providers (#111) * Add ability to specify different credential providers for Kinesis, DynamoDB, and CloudWatch. This is needed when accessing a cross-account Kineses stream using an assumed role. * Fix copy/paste mistake. * Update tests. Thanks to rgfindl@ --- .../config/KinesisClientLibConfigurator.java | 28 +++- .../KinesisClientLibConfiguratorTest.java | 152 ++++++++++++++++++ 2 files changed, 176 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java index 027dde22..e239f967 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java @@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator { // Required properties private static final String PROP_APP_NAME = "applicationName"; private static final String PROP_STREAM_NAME = "streamName"; - private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB"; + private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch"; private static final String PROP_WORKER_ID = "workerId"; private Map, IPropertyValueDecoder> classToDecoder; @@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator { String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME)); String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME)); AWSCredentialsProvider provider = - awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER)); + awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS)); if (applicationName == null || applicationName.isEmpty()) { throw new IllegalArgumentException("Value of applicationName should be explicitly provided."); @@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator { throw new IllegalArgumentException("Value of streamName should be explicitly provided."); } + // Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerDynamoDB; + String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB); + if (propCredentialsProviderDynamoDBValue == null) { + providerDynamoDB = provider; + } else { + providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue); + } + + // Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerCloudWatch; + String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH); + if (propCredentialsProviderCloudWatchValue == null) { + providerCloudWatch = provider; + } else { + providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue); + } + // Allow customer not to provide workerId or to provide empty worker id. String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID)); if (workerId == null || workerId.isEmpty()) { @@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator { } KinesisClientLibConfiguration config = - new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId); + new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId); Set requiredNames = new HashSet(Arrays.asList(PROP_STREAM_NAME, PROP_APP_NAME, PROP_WORKER_ID, - PROP_CREDENTIALS_PROVIDER)); + PROP_CREDENTIALS_PROVIDER_KINESIS)); // Set all the variables that are not used for constructor. for (Object keyObject : properties.keySet()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java index 1ccd4941..cbdd0a2d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java @@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest { "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider"; private String credentialName2 = "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider"; + private String credentialNameKinesis = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis"; + private String credentialNameDynamoDB = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB"; + private String credentialNameCloudWatch = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch"; private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); @Test @@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest { } } + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB, + "AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("DynamoDB credential providers should not fail."); + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("CloudWatch credential providers should not fail."); + } + } + + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialName1, + "AWSCredentialsProviderCloudWatch = " + credentialName1, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + fail("DynamoDB credential providers should fail."); + } catch (Exception e) { + // succeed + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + fail("CloudWatch credential providers should fail."); + } catch (Exception e) { + // succeed + } + } + /** * This credentials provider will always succeed */ @@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest { } + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + /** * This credentials provider will always fail */ From 56c59f685ae0c0ec2daa0ee5355a602b8c1c9c9c Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Fri, 14 Oct 2016 08:11:32 -0700 Subject: [PATCH 2/3] Allow Disabling Shard Sync at Startup (#102) Allow disabling the shard sync at startup if the lease table already contains leases. This will reduce the startup load for larger streams when restarted the Kinesis application. --- .../worker/KinesisClientLibConfiguration.java | 32 +++++++++++++++ .../clientlibrary/lib/worker/ProcessTask.java | 13 ++++-- .../lib/worker/ShardConsumer.java | 8 +++- .../clientlibrary/lib/worker/Worker.java | 40 +++++++++++++------ .../kinesis/leases/impl/LeaseCoordinator.java | 1 + .../kinesis/leases/impl/LeaseManager.java | 10 ++++- .../leases/interfaces/ILeaseManager.java | 13 +++++- .../worker/ExceptionThrowingLeaseManager.java | 6 +++ .../lib/worker/ProcessTaskTest.java | 3 +- .../lib/worker/ShardConsumerTest.java | 15 ++++--- .../clientlibrary/lib/worker/WorkerTest.java | 8 +++- 11 files changed, 121 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 0f930053..37e609b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /* + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. + * This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + */ + public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; + /** * Default Shard prioritization strategy. */ @@ -191,6 +198,8 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + // This is useful for optimizing deployments to large fleets working on a stable stream. + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; /** @@ -338,6 +347,7 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } @@ -561,6 +571,13 @@ public class KinesisClientLibConfiguration { return regionName; } + /** + * @return true if Worker should skip syncing shards and leases at startup if leases are present + */ + public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + /** * @return Max leases this Worker can handle at a time */ @@ -862,6 +879,21 @@ public class KinesisClientLibConfiguration { return this; } + /** + * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases + * in the lease table. This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + * + * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker + * initialization). + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist( + boolean skipShardSyncAtStartupIfLeasesExist) { + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist; + return this; + } + /** * * @param regionName The region name for the service diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index db0970d5..6ee34880 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -73,7 +73,8 @@ class ProcessTask implements ITask { IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,13 +83,19 @@ class ProcessTask implements ITask { this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - if (kinesisProxy instanceof IKinesisProxyExtended) { + // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for + // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will + // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if + // KPL is used for ingestion and KPL's aggregation feature is used. + if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) { this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId()); } else { + this.shard = null; + } + if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) { LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records " + "in the event of resharding will not be dropped during deaggregation of Amazon " + "Kinesis records."); - this.shard = null; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index b6cc76aa..b9e8d2df 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -58,6 +58,7 @@ class ShardConsumer { private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ITask currentTask; private long currentTaskSubmitTime; @@ -96,7 +97,8 @@ class ShardConsumer { boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; this.executorService = executorService; @@ -114,6 +116,7 @@ class ShardConsumer { this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; } /** @@ -262,7 +265,8 @@ class ShardConsumer { recordProcessor, recordProcessorCheckpointer, dataFetcher, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); break; case SHUTTING_DOWN: nextTask = diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 32efa442..09c31d87 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -70,7 +70,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -91,6 +91,8 @@ public class Worker implements Runnable { private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; + + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; /** * Constructor. @@ -208,7 +210,7 @@ public class Worker implements Runnable { ExecutorService execService) { this( config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -235,7 +237,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); + // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -292,6 +296,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; @@ -313,7 +318,8 @@ public class Worker implements Runnable { metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -395,16 +401,22 @@ public class Worker implements Runnable { LOG.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); - TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + TaskResult result = null; + if (!skipShardSyncAtWorkerInitializationIfLeasesExist + || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { + LOG.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + initialPosition, + cleanupLeasesUponShardCompletion, + 0L); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + } else { + LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); + } - if (result.getException() == null) { + if (result == null || result.getException() == null) { if (!leaseCoordinator.isRunning()) { LOG.info("Starting LeaseCoordinator"); leaseCoordinator.start(); @@ -567,7 +579,7 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis); + executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); } @@ -991,7 +1003,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index e356ac49..55c26674 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -70,6 +70,7 @@ public class LeaseCoordinator { // Package level access for testing. static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; + private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; private final long renewerIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 3a4d408c..226756eb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -214,6 +214,14 @@ public class LeaseManager implements ILeaseManager { return list(null); } + /** + * {@inheritDoc} + */ + @Override + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(1).isEmpty(); + } + /** * List with the given page size. Package access for integration testing. * diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java index 06d6d432..ab296cc1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -180,4 +180,15 @@ public interface ILeaseManager { public boolean updateLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index 765aaa44..2a07d1ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager leaseManager.deleteAll(); } + @Override + public boolean isLeaseTableEmpty() throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return false; + } + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index f1d908f0..4d32566e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -89,7 +89,8 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); + shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 26337381..755d08a4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -117,7 +117,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -167,7 +168,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, spyExecutorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -211,7 +213,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); @@ -300,7 +303,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -390,7 +394,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 0747f83d..53d89dc2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -224,6 +224,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); @@ -263,7 +264,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - shardPrioritization); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); Worker workerSpy = spy(worker); @@ -327,6 +328,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); @@ -380,6 +382,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); worker.run(); Assert.assertTrue(count > 0); @@ -823,8 +826,9 @@ public class WorkerTest { metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread; From db314b970b21b868f83ab0c4d49c3b19b261bafd Mon Sep 17 00:00:00 2001 From: Josh Date: Fri, 14 Oct 2016 11:05:44 -0600 Subject: [PATCH 3/3] Allow for credential providers with basic arg types (#99) * Add support for configuring DynamoDB endpoint Adding a new field named `dynamoDBEndpoint` to the .properties file that gets passed into the KCL multi-lang daemon. We need this ability to point the KCL worker at a local instance of DynamoDB rather than in AWS. * Added the ability to use AWSCredentialsProvider's that require non-empty contructor args e.g. ProfileCredentialsProvider where you provide the profile name to use from your ~/.aws/credentials file * Created a constructor without the dynamoDBEndpoint argument i.e. same arguments before the dynamo change, for backwards compatibility --- ...edentialsProviderPropertyValueDecoder.java | 34 ++++-- .../worker/KinesisClientLibConfiguration.java | 110 +++++++++++++++-- .../clientlibrary/lib/worker/Worker.java | 5 + ...tialsProviderPropertyValueDecoderTest.java | 115 ++++++++++++++++++ .../KinesisClientLibConfigurationTest.java | 8 +- 5 files changed, 250 insertions(+), 22 deletions(-) create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java index 1f508771..9976b071 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.config; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.lang.reflect.Constructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +31,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class); private static final String AUTH_PREFIX = "com.amazonaws.auth."; private static final String LIST_DELIMITER = ","; + private static final String ARG_DELIMITER = "|"; /** * Constructor. @@ -39,7 +41,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode /** * Get AWSCredentialsProvider property. - * + * * @param value property value as String * @return corresponding variable in correct type */ @@ -70,11 +72,25 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getValidCredentialsProviders(List providerNames) { List credentialsProviders = new ArrayList(); for (String providerName : providerNames) { - try { - Class className = Class.forName(providerName); - credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); - } catch (Exception e) { - LOG.debug("Can't find any credentials provider matching " + providerName + "."); + if (providerName.contains(ARG_DELIMITER)) { + String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER); + Class[] argTypes = new Class[nameAndArgs.length - 1]; + Arrays.fill(argTypes, String.class); + try { + Class className = Class.forName(nameAndArgs[0]); + Constructor c = className.getConstructor(argTypes); + credentialsProviders.add((AWSCredentialsProvider) c.newInstance( + Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length))); + } catch (Exception e) { + LOG.debug("Can't find any credentials provider matching " + providerName + "."); + } + } else { + try { + Class className = Class.forName(providerName); + credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); + } catch (Exception e) { + LOG.debug("Can't find any credentials provider matching " + providerName + "."); + } } } return credentialsProviders; @@ -97,13 +113,13 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getPossibleFullClassNames(String s) { /* * We take care of three cases : - * + * * 1. Customer provides a short name of common providers in com.amazonaws.auth package i.e. any classes * implementing the AWSCredentialsProvider interface: * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html - * + * * 2. Customer provides a full name of common providers e.g. com.amazonaws.auth.ClasspathFileCredentialsProvider - * + * * 3. Customer provides a custom credentials provider with full name of provider */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 37e609b5..174fb65e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -170,6 +170,7 @@ public class KinesisClientLibConfiguration { private String tableName; private String streamName; private String kinesisEndpoint; + private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider; @@ -204,7 +205,7 @@ public class KinesisClientLibConfiguration { /** * Constructor. - * + * * @param applicationName Name of the Amazon Kinesis application. * By default the application name is included in the user agent string used to make AWS requests. This * can assist with troubleshooting (e.g. distinguish requests made by separate applications). @@ -221,7 +222,7 @@ public class KinesisClientLibConfiguration { /** * Constructor. - * + * * @param applicationName Name of the Amazon Kinesis application * By default the application name is included in the user agent string used to make AWS requests. This * can assist with troubleshooting (e.g. distinguish requests made by separate applications). @@ -237,7 +238,7 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, + this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, @@ -305,6 +306,76 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName) { + this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, + dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, + maxRecords, idleTimeBetweenReadsInMillis, + callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, + shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, + kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, + taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, + validateSequenceNumberBeforeCheckpointing, regionName); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -319,6 +390,7 @@ public class KinesisClientLibConfiguration { this.tableName = applicationName; this.streamName = streamName; this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; this.initialPositionInStream = initialPositionInStream; this.kinesisCredentialsProvider = kinesisCredentialsProvider; this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; @@ -354,7 +426,7 @@ public class KinesisClientLibConfiguration { // Check if value is positive, otherwise throw an exception private void checkIsValuePositive(String key, long value) { if (value <= 0) { - throw new IllegalArgumentException("Value of " + key + throw new IllegalArgumentException("Value of " + key + " should be positive, but current value is " + value); } } @@ -373,11 +445,11 @@ public class KinesisClientLibConfiguration { config.setUserAgent(existingUserAgent); return config; } - + private void checkIsRegionNameValid(String regionNameToCheck) { if (regionNameToCheck != null && RegionUtils.getRegion(regionNameToCheck) == null) { throw new IllegalArgumentException("The specified region name is not valid"); - } + } } /** @@ -478,6 +550,13 @@ public class KinesisClientLibConfiguration { return kinesisEndpoint; } + /** + * @return DynamoDB endpoint + */ + public String getDynamoDBEndpoint() { + return dynamoDBEndpoint; + } + /** * @return the initialPositionInStream */ @@ -648,6 +727,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param dynamoDBEndpoint DynamoDB endpoint + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) { + this.dynamoDBEndpoint = dynamoDBEndpoint; + return this; + } + /** * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library * will start fetching records from this position when the application starts up if there are no checkpoints. @@ -784,7 +872,7 @@ public class KinesisClientLibConfiguration { /** * Override the default user agent (application name). - * + * * @param userAgent User agent to use in AWS requests * @return KinesisClientLibConfiguration */ @@ -840,7 +928,7 @@ public class KinesisClientLibConfiguration { * NONE * SUMMARY * DETAILED - * + * * @param metricsLevel Metrics level to enable. * @return KinesisClientLibConfiguration */ @@ -867,7 +955,7 @@ public class KinesisClientLibConfiguration { } /** - * + * * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)}. @@ -883,7 +971,7 @@ public class KinesisClientLibConfiguration { * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases * in the lease table. This assumes that the shards and leases are in-sync. * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). - * + * * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker * initialization). * @return KinesisClientLibConfiguration @@ -895,7 +983,7 @@ public class KinesisClientLibConfiguration { } /** - * + * * @param regionName The region name for the service * @return KinesisClientLibConfiguration */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 09c31d87..b644a790 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -248,6 +248,11 @@ public class Worker implements Runnable { dynamoDBClient.setRegion(region); LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } + // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. + if (config.getDynamoDBEndpoint() != null) { + dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); + LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); + } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { kinesisClient.setEndpoint(config.getKinesisEndpoint()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java new file mode 100644 index 00000000..cddd837a --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.config; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoder; + +public class AWSCredentialsProviderPropertyValueDecoderTest { + + private static final String TEST_ACCESS_KEY_ID = "123"; + private static final String TEST_SECRET_KEY = "456"; + + private String credentialName1 = + "com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider"; + private String credentialName2 = + "com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider"; + private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder(); + + @Test + public void testSingleProvider() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName1); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID); + assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY); + } + + @Test + public void testTwoProviders() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName1 + "," + credentialName1); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID); + assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY); + } + + @Test + public void testProfileProviderWithOneArg() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + "|arg"); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg"); + assertEquals(provider.getCredentials().getAWSSecretKey(), "blank"); + } + + @Test + public void testProfileProviderWithTwoArgs() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + + "|arg1|arg2"); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg1"); + assertEquals(provider.getCredentials().getAWSSecretKey(), "arg2"); + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProvider implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials(TEST_ACCESS_KEY_ID, TEST_SECRET_KEY); + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider needs a constructor call to instantiate it + */ + public static class ConstructorCredentialsProvider implements AWSCredentialsProvider { + + private String arg1; + private String arg2; + + public ConstructorCredentialsProvider(String arg1) { + this.arg1 = arg1; + this.arg2 = "blank"; + } + + public ConstructorCredentialsProvider(String arg1, String arg2) { + this.arg1 = arg1; + this.arg2 = arg2; + } + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials(arg1, arg2); + } + + @Override + public void refresh() { + } + + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index e71948c9..4874a164 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -62,6 +62,7 @@ public class KinesisClientLibConfigurationTest { // Test constructor with all valid arguments. config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -99,6 +100,7 @@ public class KinesisClientLibConfigurationTest { try { config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -132,6 +134,7 @@ public class KinesisClientLibConfigurationTest { try { config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -209,7 +212,7 @@ public class KinesisClientLibConfigurationTest { AmazonDynamoDBClient dclient = Mockito.mock(AmazonDynamoDBClient.class); AmazonCloudWatchClient cclient = Mockito.mock(AmazonCloudWatchClient.class); Region region = RegionUtils.getRegion("us-west-2"); - + AWSCredentialsProvider credentialsProvider = Mockito.mock(AWSCredentialsProvider.class); KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") @@ -262,7 +265,7 @@ public class KinesisClientLibConfigurationTest { Mockito.verify(kclConfig, Mockito.times(9)).getRegionName(); Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint(); - + kclConfig = Mockito.spy( new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") .withKinesisEndpoint("https://kinesis.eu-west-1.amazonaws.com")); @@ -294,6 +297,7 @@ public class KinesisClientLibConfigurationTest { Mockito.mock(AWSCredentialsProvider.class); try { new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, null,