From 5c497d87a958068e3616c742d3762ae0fad64969 Mon Sep 17 00:00:00 2001 From: Randy Findley Date: Fri, 14 Oct 2016 11:10:38 -0400 Subject: [PATCH 1/2] Feature/multiple cred providers (#111) * Add ability to specify different credential providers for Kinesis, DynamoDB, and CloudWatch. This is needed when accessing a cross-account Kineses stream using an assumed role. * Fix copy/paste mistake. * Update tests. Thanks to rgfindl@ --- .../config/KinesisClientLibConfigurator.java | 28 +++- .../KinesisClientLibConfiguratorTest.java | 152 ++++++++++++++++++ 2 files changed, 176 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java index 027dde22..e239f967 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java @@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator { // Required properties private static final String PROP_APP_NAME = "applicationName"; private static final String PROP_STREAM_NAME = "streamName"; - private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB"; + private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch"; private static final String PROP_WORKER_ID = "workerId"; private Map, IPropertyValueDecoder> classToDecoder; @@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator { String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME)); String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME)); AWSCredentialsProvider provider = - awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER)); + awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS)); if (applicationName == null || applicationName.isEmpty()) { throw new IllegalArgumentException("Value of applicationName should be explicitly provided."); @@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator { throw new IllegalArgumentException("Value of streamName should be explicitly provided."); } + // Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerDynamoDB; + String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB); + if (propCredentialsProviderDynamoDBValue == null) { + providerDynamoDB = provider; + } else { + providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue); + } + + // Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerCloudWatch; + String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH); + if (propCredentialsProviderCloudWatchValue == null) { + providerCloudWatch = provider; + } else { + providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue); + } + // Allow customer not to provide workerId or to provide empty worker id. String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID)); if (workerId == null || workerId.isEmpty()) { @@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator { } KinesisClientLibConfiguration config = - new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId); + new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId); Set requiredNames = new HashSet(Arrays.asList(PROP_STREAM_NAME, PROP_APP_NAME, PROP_WORKER_ID, - PROP_CREDENTIALS_PROVIDER)); + PROP_CREDENTIALS_PROVIDER_KINESIS)); // Set all the variables that are not used for constructor. for (Object keyObject : properties.keySet()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java index 1ccd4941..cbdd0a2d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java @@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest { "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider"; private String credentialName2 = "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider"; + private String credentialNameKinesis = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis"; + private String credentialNameDynamoDB = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB"; + private String credentialNameCloudWatch = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch"; private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); @Test @@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest { } } + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB, + "AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("DynamoDB credential providers should not fail."); + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("CloudWatch credential providers should not fail."); + } + } + + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialName1, + "AWSCredentialsProviderCloudWatch = " + credentialName1, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + fail("DynamoDB credential providers should fail."); + } catch (Exception e) { + // succeed + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + fail("CloudWatch credential providers should fail."); + } catch (Exception e) { + // succeed + } + } + /** * This credentials provider will always succeed */ @@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest { } + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + /** * This credentials provider will always fail */ From 56c59f685ae0c0ec2daa0ee5355a602b8c1c9c9c Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Fri, 14 Oct 2016 08:11:32 -0700 Subject: [PATCH 2/2] Allow Disabling Shard Sync at Startup (#102) Allow disabling the shard sync at startup if the lease table already contains leases. This will reduce the startup load for larger streams when restarted the Kinesis application. --- .../worker/KinesisClientLibConfiguration.java | 32 +++++++++++++++ .../clientlibrary/lib/worker/ProcessTask.java | 13 ++++-- .../lib/worker/ShardConsumer.java | 8 +++- .../clientlibrary/lib/worker/Worker.java | 40 +++++++++++++------ .../kinesis/leases/impl/LeaseCoordinator.java | 1 + .../kinesis/leases/impl/LeaseManager.java | 10 ++++- .../leases/interfaces/ILeaseManager.java | 13 +++++- .../worker/ExceptionThrowingLeaseManager.java | 6 +++ .../lib/worker/ProcessTaskTest.java | 3 +- .../lib/worker/ShardConsumerTest.java | 15 ++++--- .../clientlibrary/lib/worker/WorkerTest.java | 8 +++- 11 files changed, 121 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 0f930053..37e609b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /* + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. + * This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + */ + public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; + /** * Default Shard prioritization strategy. */ @@ -191,6 +198,8 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + // This is useful for optimizing deployments to large fleets working on a stable stream. + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; /** @@ -338,6 +347,7 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } @@ -561,6 +571,13 @@ public class KinesisClientLibConfiguration { return regionName; } + /** + * @return true if Worker should skip syncing shards and leases at startup if leases are present + */ + public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + /** * @return Max leases this Worker can handle at a time */ @@ -862,6 +879,21 @@ public class KinesisClientLibConfiguration { return this; } + /** + * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases + * in the lease table. This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + * + * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker + * initialization). + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist( + boolean skipShardSyncAtStartupIfLeasesExist) { + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist; + return this; + } + /** * * @param regionName The region name for the service diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index db0970d5..6ee34880 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -73,7 +73,8 @@ class ProcessTask implements ITask { IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,13 +83,19 @@ class ProcessTask implements ITask { this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - if (kinesisProxy instanceof IKinesisProxyExtended) { + // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for + // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will + // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if + // KPL is used for ingestion and KPL's aggregation feature is used. + if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) { this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId()); } else { + this.shard = null; + } + if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) { LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records " + "in the event of resharding will not be dropped during deaggregation of Amazon " + "Kinesis records."); - this.shard = null; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index b6cc76aa..b9e8d2df 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -58,6 +58,7 @@ class ShardConsumer { private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ITask currentTask; private long currentTaskSubmitTime; @@ -96,7 +97,8 @@ class ShardConsumer { boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; this.executorService = executorService; @@ -114,6 +116,7 @@ class ShardConsumer { this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; } /** @@ -262,7 +265,8 @@ class ShardConsumer { recordProcessor, recordProcessorCheckpointer, dataFetcher, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); break; case SHUTTING_DOWN: nextTask = diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 32efa442..09c31d87 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -70,7 +70,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -91,6 +91,8 @@ public class Worker implements Runnable { private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; + + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; /** * Constructor. @@ -208,7 +210,7 @@ public class Worker implements Runnable { ExecutorService execService) { this( config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -235,7 +237,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); + // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -292,6 +296,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; @@ -313,7 +318,8 @@ public class Worker implements Runnable { metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -395,16 +401,22 @@ public class Worker implements Runnable { LOG.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); - TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + TaskResult result = null; + if (!skipShardSyncAtWorkerInitializationIfLeasesExist + || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { + LOG.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + initialPosition, + cleanupLeasesUponShardCompletion, + 0L); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + } else { + LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); + } - if (result.getException() == null) { + if (result == null || result.getException() == null) { if (!leaseCoordinator.isRunning()) { LOG.info("Starting LeaseCoordinator"); leaseCoordinator.start(); @@ -567,7 +579,7 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis); + executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); } @@ -991,7 +1003,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index e356ac49..55c26674 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -70,6 +70,7 @@ public class LeaseCoordinator { // Package level access for testing. static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; + private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; private final long renewerIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 3a4d408c..226756eb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -214,6 +214,14 @@ public class LeaseManager implements ILeaseManager { return list(null); } + /** + * {@inheritDoc} + */ + @Override + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(1).isEmpty(); + } + /** * List with the given page size. Package access for integration testing. * diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java index 06d6d432..ab296cc1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -180,4 +180,15 @@ public interface ILeaseManager { public boolean updateLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index 765aaa44..2a07d1ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager leaseManager.deleteAll(); } + @Override + public boolean isLeaseTableEmpty() throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return false; + } + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index f1d908f0..4d32566e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -89,7 +89,8 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); + shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 26337381..755d08a4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -117,7 +117,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -167,7 +168,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, spyExecutorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -211,7 +213,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); @@ -300,7 +303,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -390,7 +394,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 0747f83d..53d89dc2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -224,6 +224,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); @@ -263,7 +264,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - shardPrioritization); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); Worker workerSpy = spy(worker); @@ -327,6 +328,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); @@ -380,6 +382,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); worker.run(); Assert.assertTrue(count > 0); @@ -823,8 +826,9 @@ public class WorkerTest { metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;