From a9cc9bfa6fa4529417e769100d3794fb0c6388cc Mon Sep 17 00:00:00 2001 From: Shiva Vanamala Date: Mon, 9 Aug 2021 08:12:18 -0700 Subject: [PATCH] feature(dynamo): Provide option to suppress exceptions associated with missing data --- .../lib/worker/ConsumerStates.java | 3 +- .../lib/worker/ShardConsumer.java | 21 +++++-- .../lib/worker/ShardSyncTask.java | 9 ++- .../lib/worker/ShardSyncTaskManager.java | 27 +++++---- .../clientlibrary/lib/worker/ShardSyncer.java | 24 ++++---- .../lib/worker/ShutdownTask.java | 8 ++- .../clientlibrary/lib/worker/Worker.java | 56 +++++++++++++++++-- .../lib/worker/ShardConsumerTest.java | 15 +++-- .../worker/ShardSyncTaskIntegrationTest.java | 3 +- .../lib/worker/ShardSyncerTest.java | 2 +- .../lib/worker/ShutdownTaskTest.java | 8 ++- 11 files changed, 131 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index f16f1d81..c092dab8 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -530,7 +530,8 @@ class ConsumerStates { consumer.isIgnoreUnexpectedChildShards(), consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsCache()); + consumer.getGetRecordsCache(), + consumer.isSuppressMissingIncompleteLeasesException()); // change here } @Override diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 59254143..bbe6ef0b 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -63,6 +63,8 @@ class ShardConsumer { private final GetRecordsCache getRecordsCache; + private final boolean suppressMissingIncompleteLeasesException; + private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, @@ -124,7 +126,8 @@ class ShardConsumer { skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), - config); + config, + Worker.DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION); } /** @@ -155,7 +158,8 @@ class ShardConsumer { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config) { + KinesisClientLibConfiguration config, + boolean suppressMissingIncompleteLeasesException) { this( shardInfo, @@ -180,7 +184,8 @@ class ShardConsumer { new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, - config + config, + suppressMissingIncompleteLeasesException ); } @@ -217,7 +222,8 @@ class ShardConsumer { KinesisDataFetcher kinesisDataFetcher, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config) { + KinesisClientLibConfiguration config, + boolean suppressMissingIncompleteLeasesException) { this.shardInfo = shardInfo; this.streamConfig = streamConfig; this.checkpoint = checkpoint; @@ -235,6 +241,7 @@ class ShardConsumer { this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); + this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException; } /** @@ -397,7 +404,7 @@ class ShardConsumer { * @return Return next task to run */ private ITask getNextTask() { - ITask nextTask = currentState.createTask(this); + ITask nextTask = currentState.createTask(this); // change here (add property into consumer) if (nextTask == null) { return null; @@ -503,4 +510,8 @@ class ShardConsumer { ShutdownNotification getShutdownNotification() { return shutdownNotification; } + + public boolean isSuppressMissingIncompleteLeasesException() { + return suppressMissingIncompleteLeasesException; + } } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index 1af33a7f..bbc1f181 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -39,6 +39,8 @@ class ShardSyncTask implements ITask { private final long shardSyncTaskIdleTimeMillis; private final TaskType taskType = TaskType.SHARDSYNC; + private final boolean suppressMissingIncompleteLeasesException; + /** * @param kinesisProxy Used to fetch information about the stream (e.g. shard list) * @param leaseManager Used to fetch and create leases @@ -51,13 +53,15 @@ class ShardSyncTask implements ITask { InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, - long shardSyncTaskIdleTimeMillis) { + long shardSyncTaskIdleTimeMillis, + boolean suppressMissingIncompleteLeasesException) { // change here this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.initialPosition = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis; + this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException; } /* (non-Javadoc) @@ -72,7 +76,8 @@ class ShardSyncTask implements ITask { leaseManager, initialPosition, cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards); + ignoreUnexpectedChildShards, + suppressMissingIncompleteLeasesException); // change here if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index a00c41e6..0003640c 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -47,28 +47,31 @@ class ShardSyncTaskManager { private boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; + private final boolean suppressMissingIncompleteLeasesException; + /** * Constructor. - * - * @param kinesisProxy Proxy used to fetch streamInfo (shards) + * @param kinesisProxy Proxy used to fetch streamInfo (shards) * @param leaseManager Lease manager (used to list and create leases for shards) * @param initialPositionInStream Initial position in stream * @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait - * until they expire) +* until they expire) * @param ignoreUnexpectedChildShards Ignore child shards with open parents * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * @param metricsFactory Metrics factory * @param executorService ExecutorService to execute the shard sync tasks + * @param suppressMissingIncompleteLeasesException */ ShardSyncTaskManager(final IKinesisProxy kinesisProxy, - final ILeaseManager leaseManager, - final InitialPositionInStreamExtended initialPositionInStream, - final boolean cleanupLeasesUponShardCompletion, - final boolean ignoreUnexpectedChildShards, - final long shardSyncIdleTimeMillis, - final IMetricsFactory metricsFactory, - ExecutorService executorService) { + final ILeaseManager leaseManager, + final InitialPositionInStreamExtended initialPositionInStream, + final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, + final long shardSyncIdleTimeMillis, + final IMetricsFactory metricsFactory, + ExecutorService executorService, + boolean suppressMissingIncompleteLeasesException) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.metricsFactory = metricsFactory; @@ -77,6 +80,7 @@ class ShardSyncTaskManager { this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; this.initialPositionInStream = initialPositionInStream; + this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException; } synchronized boolean syncShardAndLeaseInfo(Set closedShardIds) { @@ -104,7 +108,8 @@ class ShardSyncTaskManager { initialPositionInStream, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, - shardSyncIdleTimeMillis), metricsFactory); + shardSyncIdleTimeMillis, + suppressMissingIncompleteLeasesException), metricsFactory); // change here future = executorService.submit(currentTask); submittedNewTask = true; if (LOG.isDebugEnabled()) { diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 59a9a505..7b5c1597 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -64,9 +64,9 @@ class ShardSyncer { InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // ignore syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards); + ignoreUnexpectedChildShards, Worker.DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION); } /** @@ -86,9 +86,10 @@ class ShardSyncer { ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + boolean ignoreUnexpectedChildShards, + boolean suppressMissingIncompleteLeasesException) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // change here + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, suppressMissingIncompleteLeasesException); } static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, @@ -96,7 +97,7 @@ class ShardSyncer { InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false); + checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false, Worker.DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION); } /** @@ -117,7 +118,8 @@ class ShardSyncer { ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards) + boolean ignoreUnexpectedChildShards, + boolean suppressMissingIncompleteLeasesException) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { List shards = getShardList(kinesisProxy); LOG.debug("Num shards: " + shards.size()); @@ -150,7 +152,7 @@ class ShardSyncer { trackedLeases.addAll(currentLeases); } trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager); + cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager, suppressMissingIncompleteLeasesException); // change here if (cleanupLeasesOfCompletedShards) { cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, @@ -597,7 +599,9 @@ class ShardSyncer { private static void cleanupGarbageLeases(List shards, List trackedLeases, IKinesisProxy kinesisProxy, - ILeaseManager leaseManager) + ILeaseManager leaseManager, + boolean suppressMissingIncompleteLeasesException) + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { Set kinesisShards = new HashSet<>(); for (Shard shard : shards) { @@ -625,7 +629,7 @@ class ShardSyncer { for (KinesisClientLease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) { - if (lease.isComplete()) { + if (lease.isComplete() || suppressMissingIncompleteLeasesException) { // change here LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() + " as it is not present in Kinesis stream."); leaseManager.deleteLease(lease); diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index b0d1d182..5ce6fa20 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -48,6 +48,7 @@ class ShutdownTask implements ITask { private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; private final GetRecordsCache getRecordsCache; + private final boolean isSuppressMissingIncompleteLeasesException; /** * Constructor. @@ -63,7 +64,8 @@ class ShutdownTask implements ITask { boolean ignoreUnexpectedChildShards, ILeaseManager leaseManager, long backoffTimeMillis, - GetRecordsCache getRecordsCache) { + GetRecordsCache getRecordsCache, + boolean isSuppressMissingIncompleteLeasesException) { // change here this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -75,6 +77,7 @@ class ShutdownTask implements ITask { this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; this.getRecordsCache = getRecordsCache; + this.isSuppressMissingIncompleteLeasesException = isSuppressMissingIncompleteLeasesException; } /* @@ -131,7 +134,8 @@ class ShutdownTask implements ITask { leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards); + ignoreUnexpectedChildShards, + isSuppressMissingIncompleteLeasesException); // change here LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 83af9e2c..bab754a9 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -81,6 +81,7 @@ public class Worker implements Runnable { private static final int MAX_RETRIES = 4; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final boolean DEFAULT_EXITS_ON_FAILURE = false; + public static final boolean DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION = false; private WorkerLog wlog = new WorkerLog(); @@ -123,6 +124,8 @@ public class Worker implements Runnable { // fivetran configurables private final boolean exitOnFailure; + private final boolean suppressMissingIncompleteLeasesException; + /** * Used to ensure that only one requestedShutdown is in progress at a time. @@ -434,6 +437,42 @@ public class Worker implements Runnable { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, boolean exitOnFailure) { + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, + metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, + shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, exitOnFailure, DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION); + } + + /** + * @param applicationName Name of the Kinesis application + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Library Configuration + * @param streamConfig Stream configuration + * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint Used to get/set checkpoints + * @param leaseCoordinator Lease coordinator (coordinates currently owned leases) + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory Metrics factory used to emit metrics + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param shardPrioritization Provides prioritization logic to decide which available shards process first + * @param retryGetRecordsInSeconds Time in seconds to wait before the worker retries to get a record. + * @param maxGetRecordsThreadPool Max number of threads in the getRecords thread pool. + */ + // NOTE: This has package level access solely for testing + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, + boolean exitOnFailure, boolean suppressMissingIncompleteLeasesException) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -448,7 +487,7 @@ public class Worker implements Runnable { this.metricsFactory = metricsFactory; this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), - shardSyncIdleTimeMillis, metricsFactory, executorService); + shardSyncIdleTimeMillis, metricsFactory, executorService, suppressMissingIncompleteLeasesException); // change here this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -457,6 +496,7 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; this.exitOnFailure = exitOnFailure; + this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } @@ -600,7 +640,7 @@ public class Worker implements Runnable { LOG.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, - config.shouldIgnoreUnexpectedChildShards(), 0L); + config.shouldIgnoreUnexpectedChildShards(), 0L, suppressMissingIncompleteLeasesException); // change here result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -966,7 +1006,8 @@ public class Worker implements Runnable { skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool, - config); + config, + suppressMissingIncompleteLeasesException); // change here } @@ -1120,6 +1161,7 @@ public class Worker implements Runnable { // fivetran additions boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE; + private boolean suppressMissingIncompleteLeasesException = DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION; @VisibleForTesting AmazonKinesis getKinesisClient() { @@ -1265,7 +1307,8 @@ public class Worker implements Runnable { config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), workerStateChangeListener, - exitOnFailure); + exitOnFailure, + suppressMissingIncompleteLeasesException); } > R createClient(final T builder, @@ -1303,6 +1346,11 @@ public class Worker implements Runnable { return this; } + public Builder suppressMissingIncompleteLeasesException(boolean suppressMissingIncompleteLeasesException) { + this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException; + return this; + } + public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { this.dynamoDBClient = dynamoDBClient; return this; diff --git a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 0b369f6a..1d5911d6 100644 --- a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -370,7 +370,8 @@ public class ShardConsumerTest { dataFetcher, Optional.empty(), Optional.empty(), - config); + config, + false); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -514,7 +515,8 @@ public class ShardConsumerTest { dataFetcher, Optional.empty(), Optional.empty(), - config); + config, + false); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -651,7 +653,8 @@ public class ShardConsumerTest { dataFetcher, Optional.empty(), Optional.empty(), - config); + config, + false); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -774,7 +777,8 @@ public class ShardConsumerTest { KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.empty(), Optional.empty(), - config); + config, + false); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); @@ -804,7 +808,8 @@ public class ShardConsumerTest { KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.of(1), Optional.of(2), - config); + config, + false); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); diff --git a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 34f7da09..148a9f29 100644 --- a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -125,7 +125,8 @@ public class ShardSyncTaskIntegrationTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, - 0L); + 0L, + false); syncTask.call(); List leases = leaseManager.listLeases(); Set leaseKeys = new HashSet(); diff --git a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 5ff63ab1..d231c9d4 100644 --- a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -350,7 +350,7 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, true); + cleanupLeasesOfCompletedShards, true, false); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add("shardId-4"); diff --git a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index a6c6cbaf..9d0ad6bc 100644 --- a/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -111,7 +111,8 @@ public class ShutdownTaskTest { ignoreUnexpectedChildShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsCache); + getRecordsCache, + false); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -139,7 +140,8 @@ public class ShutdownTaskTest { ignoreUnexpectedChildShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsCache); + getRecordsCache, + false); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); @@ -151,7 +153,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, false); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); }