From bc44df9c10cbf02b328fcefba9810429f7f49d5d Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 1 Sep 2016 07:15:32 -0700 Subject: [PATCH] Allow Disabling Shard Sync at Startup Allow disabling the shard sync at startup if the lease table already contains leases. This will reduce the startup load for larger streams when restarting the Kinesis application. --- .../worker/KinesisClientLibConfiguration.java | 32 +++++++++++++++ .../clientlibrary/lib/worker/ProcessTask.java | 13 ++++-- .../lib/worker/ShardConsumer.java | 8 +++- .../clientlibrary/lib/worker/Worker.java | 40 +++++++++++++------ .../kinesis/leases/impl/LeaseCoordinator.java | 1 + .../kinesis/leases/impl/LeaseManager.java | 10 ++++- .../leases/interfaces/ILeaseManager.java | 13 +++++- .../worker/ExceptionThrowingLeaseManager.java | 6 +++ .../lib/worker/ProcessTaskTest.java | 3 +- .../lib/worker/ShardConsumerTest.java | 15 ++++--- .../clientlibrary/lib/worker/WorkerTest.java | 8 +++- 11 files changed, 121 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 0f930053..37e609b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /* + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. + * This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). 
+ */ + public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; + /** * Default Shard prioritization strategy. */ @@ -191,6 +198,8 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + // This is useful for optimizing deployments to large fleets working on a stable stream. + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; /** @@ -338,6 +347,7 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } @@ -561,6 +571,13 @@ public class KinesisClientLibConfiguration { return regionName; } + /** + * @return true if Worker should skip syncing shards and leases at startup if leases are present + */ + public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + /** * @return Max leases this Worker can handle at a time */ @@ -862,6 +879,21 @@ public class KinesisClientLibConfiguration { return this; } + /** + * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases + * in the lease table. This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + * + * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker + * initialization). 
+ * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist( + boolean skipShardSyncAtStartupIfLeasesExist) { + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist; + return this; + } + /** * * @param regionName The region name for the service diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index db0970d5..6ee34880 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -73,7 +73,8 @@ class ProcessTask implements ITask { IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,13 +83,19 @@ class ProcessTask implements ITask { this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - if (kinesisProxy instanceof IKinesisProxyExtended) { + // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for + // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will + // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if + // KPL is used for ingestion and KPL's aggregation feature is used. 
+ if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) { this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId()); } else { + this.shard = null; + } + if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) { LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records " + "in the event of resharding will not be dropped during deaggregation of Amazon " + "Kinesis records."); - this.shard = null; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index b6cc76aa..b9e8d2df 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -58,6 +58,7 @@ class ShardConsumer { private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ITask currentTask; private long currentTaskSubmitTime; @@ -96,7 +97,8 @@ class ShardConsumer { boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; this.executorService = executorService; @@ -114,6 +116,7 @@ class ShardConsumer { this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; } /** @@ -262,7 
+265,8 @@ class ShardConsumer { recordProcessor, recordProcessorCheckpointer, dataFetcher, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); break; case SHUTTING_DOWN: nextTask = diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 32efa442..09c31d87 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -70,7 +70,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -91,6 +91,8 @@ public class Worker implements Runnable { private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; + + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; /** * Constructor. 
@@ -208,7 +210,7 @@ public class Worker implements Runnable { ExecutorService execService) { this( config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -235,7 +237,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); + // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -292,6 +296,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; @@ -313,7 +318,8 @@ public class Worker implements Runnable { metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -395,16 +401,22 @@ public class Worker implements Runnable { LOG.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); - TaskResult result = new 
MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + TaskResult result = null; + if (!skipShardSyncAtWorkerInitializationIfLeasesExist + || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { + LOG.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + initialPosition, + cleanupLeasesUponShardCompletion, + 0L); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + } else { + LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); + } - if (result.getException() == null) { + if (result == null || result.getException() == null) { if (!leaseCoordinator.isRunning()) { LOG.info("Starting LeaseCoordinator"); leaseCoordinator.start(); @@ -567,7 +579,7 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis); + executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); } @@ -991,7 +1003,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index e356ac49..55c26674 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -70,6 +70,7 @@ public class LeaseCoordinator { // Package level access for testing. 
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; + private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; private final long renewerIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 3a4d408c..226756eb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -214,6 +214,14 @@ public class LeaseManager implements ILeaseManager { return list(null); } + /** + * {@inheritDoc} + */ + @Override + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(1).isEmpty(); + } + /** * List with the given page size. Package access for integration testing. * diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java index 06d6d432..ab296cc1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. 
@@ -180,4 +180,15 @@ public interface ILeaseManager { public boolean updateLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index 765aaa44..2a07d1ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager leaseManager.deleteAll(); } + @Override + public boolean isLeaseTableEmpty() throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return false; + } + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index f1d908f0..4d32566e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -89,7 +89,8 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); 
processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); + shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 26337381..755d08a4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -117,7 +117,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -167,7 +168,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, spyExecutorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -211,7 +213,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); @@ -300,7 +303,8 @@ public class ShardConsumerTest { 
cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -390,7 +394,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 0747f83d..53d89dc2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -224,6 +224,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); @@ -263,7 +264,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - shardPrioritization); + 
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); Worker workerSpy = spy(worker); @@ -327,6 +328,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); @@ -380,6 +382,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); worker.run(); Assert.assertTrue(count > 0); @@ -823,8 +826,9 @@ public class WorkerTest { metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;