Allow Disabling Shard Sync at Startup (#102)

Allow disabling the shard sync at startup if the lease table already
contains leases.  This will reduce the startup load for larger streams
when restarting the Kinesis application.
This commit is contained in:
Justin Pfifer 2016-10-14 08:11:32 -07:00 committed by GitHub
parent 5c497d87a9
commit 56c59f685a
11 changed files with 121 additions and 28 deletions

View file

@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration {
*/
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
/*
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table.
* This assumes that the shards and leases are in-sync.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
*/
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
/**
* Default Shard prioritization strategy.
*/
@ -191,6 +198,8 @@ public class KinesisClientLibConfiguration {
private int initialLeaseTableReadCapacity;
private int initialLeaseTableWriteCapacity;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
// This is useful for optimizing deployments to large fleets working on a stable stream.
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization;
/**
@ -338,6 +347,7 @@ public class KinesisClientLibConfiguration {
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
}
@ -561,6 +571,13 @@ public class KinesisClientLibConfiguration {
return regionName;
}
/**
 * Indicates whether this configuration directs the Worker to bypass the
 * shard/lease sync during initialization.
 *
 * @return true if Worker should skip syncing shards and leases at startup if leases are present
 */
public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() {
    return this.skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/**
* @return Max leases this Worker can handle at a time
*/
@ -862,6 +879,21 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
 * Configures whether the Worker bypasses the shard/lease sync performed during
 * initialization. When enabled and the lease table already holds one or more
 * leases, the sync is skipped on the assumption that shards and leases are
 * already consistent. This lets customers trade the sync for faster startup
 * (e.g. during incremental deployments of an application).
 *
 * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker
 *        initialization).
 * @return KinesisClientLibConfiguration
 */
public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist(
        boolean skipShardSyncAtStartupIfLeasesExist) {
    this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist;
    return this;
}
/**
*
* @param regionName The region name for the service

View file

@ -73,7 +73,8 @@ class ProcessTask implements ITask {
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
long backoffTimeMillis) {
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
super();
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
@ -82,13 +83,19 @@ class ProcessTask implements ITask {
this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
if (kinesisProxy instanceof IKinesisProxyExtended) {
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
// KPL is used for ingestion and KPL's aggregation feature is used.
if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) {
this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId());
} else {
this.shard = null;
}
if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) {
LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records "
+ "in the event of resharding will not be dropped during deaggregation of Amazon "
+ "Kinesis records.");
this.shard = null;
}
}

View file

@ -58,6 +58,7 @@ class ShardConsumer {
private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards;
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ITask currentTask;
private long currentTaskSubmitTime;
@ -96,7 +97,8 @@ class ShardConsumer {
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis) {
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor;
this.executorService = executorService;
@ -114,6 +116,7 @@ class ShardConsumer {
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/**
@ -262,7 +265,8 @@ class ShardConsumer {
recordProcessor,
recordProcessorCheckpointer,
dataFetcher,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist);
break;
case SHUTTING_DOWN:
nextTask =

View file

@ -92,6 +92,8 @@ public class Worker implements Runnable {
new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
/**
* Constructor.
*
@ -235,7 +237,9 @@ public class Worker implements Runnable {
metricsFactory,
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy());
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName());
@ -292,6 +296,7 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardPrioritization shardPrioritization) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
@ -314,6 +319,7 @@ public class Worker implements Runnable {
executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.shardPrioritization = shardPrioritization;
}
@ -395,6 +401,9 @@ public class Worker implements Runnable {
LOG.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
TaskResult result = null;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask =
new ShardSyncTask(streamConfig.getStreamProxy(),
@ -402,9 +411,12 @@ public class Worker implements Runnable {
initialPosition,
cleanupLeasesUponShardCompletion,
0L);
TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
}
if (result.getException() == null) {
if (result == null || result.getException() == null) {
if (!leaseCoordinator.isRunning()) {
LOG.info("Starting LeaseCoordinator");
leaseCoordinator.start();
@ -567,7 +579,7 @@ public class Worker implements Runnable {
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis);
executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist);
}
@ -991,7 +1003,9 @@ public class Worker implements Runnable {
metricsFactory,
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization);
}
}

View file

@ -70,6 +70,7 @@ public class LeaseCoordinator<T extends Lease> {
// Package level access for testing.
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
private final ILeaseRenewer<T> leaseRenewer;
private final ILeaseTaker<T> leaseTaker;
private final long renewerIntervalMillis;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -214,6 +214,14 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
return list(null);
}
/**
 * {@inheritDoc}
 */
@Override
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    // A page size of 1 is sufficient to decide emptiness and avoids scanning
    // the whole lease table.
    return list(1).size() == 0;
}
/**
* List with the given page size. Package access for integration testing.
*

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -180,4 +180,15 @@ public interface ILeaseManager<T extends Lease> {
public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
 * Check (synchronously) if there are any leases in the lease table. Used by the
 * Worker at startup to decide whether the initial shard sync can be skipped.
 *
 * @return true if there are no leases in the lease table
 *
 * @throws DependencyException if DynamoDB scan fails in an unexpected way
 * @throws InvalidStateException if lease table does not exist
 * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
 */
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
}

View file

@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
leaseManager.deleteAll();
}
@Override
public boolean isLeaseTableEmpty() throws DependencyException,
InvalidStateException, ProvisionedThroughputException {
// NOTE(review): hard-codes false instead of delegating to the wrapped
// leaseManager (unlike deleteAll above) — presumably so tests using this
// double never take the "skip shard sync" path; confirm that is intentional.
return false;
}
}

View file

@ -89,7 +89,8 @@ public class ProcessTaskTest {
INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis);
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
}
@Test

View file

@ -117,7 +117,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -167,7 +168,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
spyExecutorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -211,7 +213,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123"));
@ -300,7 +303,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -390,7 +394,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards

View file

@ -224,6 +224,7 @@ public class WorkerTest {
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
@ -263,7 +264,7 @@ public class WorkerTest {
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
shardPrioritization);
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
Worker workerSpy = spy(worker);
@ -327,6 +328,7 @@ public class WorkerTest {
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
@ -380,6 +382,7 @@ public class WorkerTest {
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
worker.run();
Assert.assertTrue(count > 0);
@ -823,6 +826,7 @@ public class WorkerTest {
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
WorkerThread workerThread = new WorkerThread(worker);