diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 0f930053..37e609b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /* + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. + * This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + */ + public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; + /** * Default Shard prioritization strategy. */ @@ -191,6 +198,8 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + // This is useful for optimizing deployments to large fleets working on a stable stream. + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; /** @@ -338,6 +347,7 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } @@ -561,6 +571,13 @@ public class KinesisClientLibConfiguration { return regionName; } + /** + * @return true if Worker should skip syncing shards and leases at startup if leases are present + */ + public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + /** * @return Max leases this Worker can handle at a time */ @@ -862,6 +879,21 @@ public class KinesisClientLibConfiguration { return this; } + /** + * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases + * in the lease table. This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + * + * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker + * initialization). + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist( + boolean skipShardSyncAtStartupIfLeasesExist) { + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist; + return this; + } + /** * * @param regionName The region name for the service diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index db0970d5..6ee34880 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -73,7 +73,8 @@ class ProcessTask implements ITask { IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,13 +83,19 @@ class ProcessTask implements ITask { this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - if (kinesisProxy instanceof IKinesisProxyExtended) { + // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for + // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will + // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if + // KPL is used for ingestion and KPL's aggregation feature is used. + if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) { this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId()); } else { + this.shard = null; + } + if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) { LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records " + "in the event of resharding will not be dropped during deaggregation of Amazon " + "Kinesis records."); - this.shard = null; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index b6cc76aa..b9e8d2df 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -58,6 +58,7 @@ class ShardConsumer { private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ITask currentTask; private long currentTaskSubmitTime; @@ -96,7 +97,8 @@ class ShardConsumer { boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; this.executorService = executorService; @@ -114,6 +116,7 @@ class ShardConsumer { this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; } /** @@ -262,7 +265,8 @@ class ShardConsumer { recordProcessor, recordProcessorCheckpointer, dataFetcher, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); break; case SHUTTING_DOWN: nextTask = diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 32efa442..09c31d87 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -70,7 +70,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -91,6 +91,8 @@ public class Worker implements Runnable { private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; + + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; /** * Constructor. @@ -208,7 +210,7 @@ public class Worker implements Runnable { ExecutorService execService) { this( config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -235,7 +237,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); + // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -292,6 +296,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; @@ -313,7 +318,8 @@ public class Worker implements Runnable { metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -395,16 +401,22 @@ public class Worker implements Runnable { LOG.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); - TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + TaskResult result = null; + if (!skipShardSyncAtWorkerInitializationIfLeasesExist + || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { + LOG.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + initialPosition, + cleanupLeasesUponShardCompletion, + 0L); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + } else { + LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); + } - if (result.getException() == null) { + if (result == null || result.getException() == null) { if (!leaseCoordinator.isRunning()) { LOG.info("Starting LeaseCoordinator"); leaseCoordinator.start(); @@ -567,7 +579,7 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis); + executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); } @@ -991,7 +1003,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index e356ac49..55c26674 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -70,6 +70,7 @@ public class LeaseCoordinator { // Package level access for testing. static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; + private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; private final long renewerIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 3a4d408c..226756eb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -214,6 +214,14 @@ public class LeaseManager implements ILeaseManager { return list(null); } + /** + * {@inheritDoc} + */ + @Override + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(1).isEmpty(); + } + /** * List with the given page size. Package access for integration testing. * diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java index 06d6d432..ab296cc1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -180,4 +180,15 @@ public interface ILeaseManager { public boolean updateLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index 765aaa44..2a07d1ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager leaseManager.deleteAll(); } + @Override + public boolean isLeaseTableEmpty() throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return false; + } + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index f1d908f0..4d32566e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -89,7 +89,8 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); + shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 26337381..755d08a4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -117,7 +117,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -167,7 +168,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, spyExecutorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -211,7 +213,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); @@ -300,7 +303,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -390,7 +394,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 0747f83d..53d89dc2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -224,6 +224,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); @@ -263,7 +264,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - shardPrioritization); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); Worker workerSpy = spy(worker); @@ -327,6 +328,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); @@ -380,6 +382,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); worker.run(); Assert.assertTrue(count > 0); @@ -823,8 +826,9 @@ public class WorkerTest { metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;