diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index c092dab8..c3371c3d 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -531,7 +531,7 @@ class ConsumerStates { consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), - consumer.isSuppressMissingIncompleteLeasesException()); // change here + consumer.isSuppressMissingIncompleteLeasesException()); } @Override diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index bbe6ef0b..0675dff9 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -404,7 +404,7 @@ class ShardConsumer { * @return Return next task to run */ private ITask getNextTask() { - ITask nextTask = currentState.createTask(this); // change here (add property into consumer) + ITask nextTask = currentState.createTask(this); if (nextTask == null) { return null; diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index bbc1f181..678bf9a1 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -54,7 +54,7 @@ class ShardSyncTask implements ITask { boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncTaskIdleTimeMillis, - boolean suppressMissingIncompleteLeasesException) { // change here + boolean suppressMissingIncompleteLeasesException) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.initialPosition = initialPositionInStream; @@ -77,7 +77,7 @@ class ShardSyncTask implements ITask { initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, - suppressMissingIncompleteLeasesException); // change here + suppressMissingIncompleteLeasesException); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index 0003640c..d960c19f 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -109,7 +109,7 @@ class ShardSyncTaskManager { cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, - suppressMissingIncompleteLeasesException), metricsFactory); // change here + suppressMissingIncompleteLeasesException), metricsFactory); future = executorService.submit(currentTask); submittedNewTask = true; if (LOG.isDebugEnabled()) { diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 7b5c1597..b3ddd2d9 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -88,7 +88,7 @@ class ShardSyncer { boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, boolean suppressMissingIncompleteLeasesException) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // change here + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, suppressMissingIncompleteLeasesException); } @@ -152,7 +152,7 @@ class ShardSyncer { trackedLeases.addAll(currentLeases); } trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager, suppressMissingIncompleteLeasesException); // change here + cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager, suppressMissingIncompleteLeasesException); if (cleanupLeasesOfCompletedShards) { cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, @@ -629,7 +629,7 @@ class ShardSyncer { for (KinesisClientLease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) { - if (lease.isComplete() || suppressMissingIncompleteLeasesException) { // change here + if (lease.isComplete() || suppressMissingIncompleteLeasesException) { LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() + " as it is not present in Kinesis stream."); leaseManager.deleteLease(lease); diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 5ce6fa20..886ac00a 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -65,7 +65,7 @@ class ShutdownTask implements ITask { ILeaseManager leaseManager, long backoffTimeMillis, GetRecordsCache getRecordsCache, - boolean isSuppressMissingIncompleteLeasesException) { // change here + boolean isSuppressMissingIncompleteLeasesException) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -135,7 +135,7 @@ class ShutdownTask implements ITask { initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - isSuppressMissingIncompleteLeasesException); // change here + isSuppressMissingIncompleteLeasesException); LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index bab754a9..37b75ddf 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -487,7 +487,7 @@ public class Worker implements Runnable { this.metricsFactory = metricsFactory; this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), - shardSyncIdleTimeMillis, metricsFactory, executorService, suppressMissingIncompleteLeasesException); // change here + shardSyncIdleTimeMillis, metricsFactory, executorService, suppressMissingIncompleteLeasesException); this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -640,7 +640,7 @@ public class Worker implements Runnable { LOG.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, - config.shouldIgnoreUnexpectedChildShards(), 0L, suppressMissingIncompleteLeasesException); // change here + config.shouldIgnoreUnexpectedChildShards(), 0L, suppressMissingIncompleteLeasesException); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -1007,7 +1007,7 @@ public class Worker implements Runnable { retryGetRecordsInSeconds, maxGetRecordsThreadPool, config, - suppressMissingIncompleteLeasesException); // change here + suppressMissingIncompleteLeasesException); }