diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java index c9f9e613..2f001b2e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java @@ -46,8 +46,8 @@ public class BlockOnParentShardTask implements ITask { * @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing */ public BlockOnParentShardTask(ShardInfo shardInfo, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis) { + ILeaseManager leaseManager, + long parentShardPollIntervalMillis) { this.shardInfo = shardInfo; this.leaseManager = leaseManager; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 7ee4ab27..0b4fa651 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -50,13 +50,13 @@ public class InitializeTask implements ITask { * Constructor. */ public InitializeTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - ICheckpoint checkpoint, - RecordProcessorCheckpointer recordProcessorCheckpointer, - IDataFetcher dataFetcher, - long backoffTimeMillis, - StreamConfig streamConfig, - GetRecordsCache getRecordsCache) { + IRecordProcessor recordProcessor, + ICheckpoint checkpoint, + RecordProcessorCheckpointer recordProcessorCheckpointer, + IDataFetcher dataFetcher, + long backoffTimeMillis, + StreamConfig streamConfig, + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 10441234..7aa2f885 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -20,7 +20,6 @@ import java.util.Optional; import java.util.Set; import com.amazonaws.services.dynamodbv2.model.BillingMode; -import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; @@ -92,7 +91,7 @@ public class KinesisClientLibConfiguration { public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true; /** - * Interval to run lease cleanup thread in {@link LeaseCleanupManager}. + * Interval to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}. */ private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); @@ -628,7 +627,7 @@ public class KinesisClientLibConfiguration { * @param billingMode The DDB Billing mode to set for lease table creation. * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in - * {@link LeaseCleanupManager} + * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases @@ -927,7 +926,7 @@ public class KinesisClientLibConfiguration { } /** - * @return Interval in millis at which to run lease cleanup thread in {@link LeaseCleanupManager} + * @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} */ public long leaseCleanupIntervalMillis() { return leaseCleanupIntervalMillis; @@ -1624,7 +1623,7 @@ public class KinesisClientLibConfiguration { /** * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in - * {@link LeaseCleanupManager} + * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} * @return */ public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java index 6039ad3b..b8e227c4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java @@ -14,20 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; -import com.amazonaws.services.kinesis.model.ChildShard; -import com.amazonaws.util.CollectionUtils; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -35,6 +21,21 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.util.CollectionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; + /** * Responsible for consuming data records of a (specified) shard. * The instance should be shutdown when we lose the primary responsibility for a shard. @@ -61,7 +62,7 @@ public class KinesisShardConsumer implements IShardConsumer{ private final long taskBackoffTimeMillis; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; - //@Getter + @Getter private final ShardSyncer shardSyncer; private ITask currentTask; @@ -69,28 +70,16 @@ public class KinesisShardConsumer implements IShardConsumer{ private Future future; private ShardSyncStrategy shardSyncStrategy; - //@Getter + @Getter private List childShards; - //@Getter + @Getter private final GetRecordsCache getRecordsCache; - public List getChildShards() { - return childShards; - } - - public GetRecordsCache getGetRecordsCache() { - return getRecordsCache; - } - - public ShardSyncer getShardSyncer() { - return shardSyncer; - } - private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - ShardInfo shardInfo) { + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + ShardInfo shardInfo) { Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> maxGetRecordsThreadPool.map(max -> new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); @@ -126,17 +115,17 @@ public class KinesisShardConsumer implements IShardConsumer{ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @Deprecated KinesisShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this(shardInfo, streamConfig, @@ -172,19 +161,19 @@ public class KinesisShardConsumer implements IShardConsumer{ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @Deprecated KinesisShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this( shardInfo, streamConfig, @@ -233,21 +222,21 @@ public class KinesisShardConsumer implements IShardConsumer{ */ @Deprecated KinesisShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, @@ -279,22 +268,22 @@ public class KinesisShardConsumer implements IShardConsumer{ * @param leaseCleanupManager used to clean up leases in lease table. */ KinesisShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, - LeaseCleanupManager leaseCleanupManager) { + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.streamConfig = streamConfig; this.checkpoint = checkpoint; @@ -382,10 +371,6 @@ public class KinesisShardConsumer implements IShardConsumer{ return skipShardSyncAtWorkerInitializationIfLeasesExist; } - /*public enum TaskOutcome { - SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND - }*/ - private TaskOutcome determineTaskOutcome() { try { TaskResult result = future.get(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java index 85c4aa7e..a444f26f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java @@ -72,18 +72,18 @@ public class KinesisShutdownTask implements ITask { */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES KinesisShutdownTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - ShutdownReason reason, - IKinesisProxy kinesisProxy, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long backoffTimeMillis, - GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, - ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager) { + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ShutdownReason reason, + IKinesisProxy kinesisProxy, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long backoffTimeMillis, + GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, + ShardSyncStrategy shardSyncStrategy, List childShards, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java index 4e0e8434..35b07b9d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -98,9 +98,9 @@ class PeriodicShardSyncManager { boolean isAuditorMode, long leasesRecoveryAuditorExecutionFrequencyMillis, int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { - this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, - leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, - leasesRecoveryAuditorInconsistencyConfidenceThreshold); + this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, + leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, + leasesRecoveryAuditorInconsistencyConfidenceThreshold); } PeriodicShardSyncManager(String workerId, @@ -231,7 +231,7 @@ class PeriodicShardSyncManager { return new ShardSyncResponse(hasHoleWithHighConfidence, true, "Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " + - "Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); + "Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); } else { // If hole is not present, clear any previous hole tracking and return false. hashRangeHoleTracker.reset(); @@ -296,7 +296,7 @@ class PeriodicShardSyncManager { final KinesisClientLease maxHashKeyLease = sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1); if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) || - !maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { + !maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease); return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange())); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 16e2b317..49c1b0e5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -63,9 +63,9 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer * @param validator Used for validating sequence numbers */ public RecordProcessorCheckpointer(ShardInfo shardInfo, - ICheckpoint checkpoint, - SequenceNumberValidator validator, - IMetricsFactory metricsFactory) { + ICheckpoint checkpoint, + SequenceNumberValidator validator, + IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; this.sequenceNumberValidator = validator; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java index 547f1d12..7ef5be9c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java @@ -38,11 +38,11 @@ public class StreamConfig { * @param initialPositionInStream Initial position in stream */ StreamConfig(IKinesisProxy proxy, - int maxRecords, - long idleTimeInMilliseconds, - boolean callProcessRecordsEvenForEmptyRecordList, - boolean validateSequenceNumberBeforeCheckpointing, - InitialPositionInStreamExtended initialPositionInStream) { + int maxRecords, + long idleTimeInMilliseconds, + boolean callProcessRecordsEvenForEmptyRecordList, + boolean validateSequenceNumberBeforeCheckpointing, + InitialPositionInStreamExtended initialPositionInStream) { this.streamProxy = proxy; this.maxRecords = maxRecords; this.idleTimeInMilliseconds = idleTimeInMilliseconds; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 6a98962f..33faa996 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -473,11 +473,11 @@ public class Worker implements Runnable { // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, @@ -530,13 +530,13 @@ public class Worker implements Runnable { // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, + LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, @@ -546,14 +546,14 @@ public class Worker implements Runnable { } Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, - long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, - ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, - PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) { + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, + ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, + WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, + PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -606,7 +606,7 @@ public class Worker implements Runnable { default: if (leaderDecider != null) { LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using " + - "default LeaderDecider."); + "default LeaderDecider."); } this.leaderDecider = getOrCreateLeaderDecider(null); this.leaderElectedPeriodicShardSyncManager = @@ -618,7 +618,7 @@ public class Worker implements Runnable { } private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, - AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { + AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { return new KinesisClientLibLeaseCoordinator( new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()), DEFAULT_LEASE_SELECTOR, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), @@ -1233,7 +1233,7 @@ public class Worker implements Runnable { * @return Returns metrics factory based on the config. */ public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, - KinesisClientLibConfiguration config) { + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -1287,27 +1287,27 @@ public class Worker implements Runnable { /** A non-null PeriodicShardSyncManager can only provided from unit tests. Any application code will create the * PeriodicShardSyncManager for the first time here. */ private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager, - boolean isAuditorMode) { + boolean isAuditorMode) { if (periodicShardSyncManager != null) { return periodicShardSyncManager; } return new PeriodicShardSyncManager(config.getWorkerIdentifier(), - leaderDecider, - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - config.getInitialPositionInStreamExtended(), - config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), - SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer, - null), - metricsFactory, - leaseCoordinator.getLeaseManager(), - streamConfig.getStreamProxy(), - isAuditorMode, - config.getLeasesRecoveryAuditorExecutionFrequencyMillis(), - config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold()); + leaderDecider, + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + config.getInitialPositionInStreamExtended(), + config.shouldCleanupLeasesUponShardCompletion(), + config.shouldIgnoreUnexpectedChildShards(), + SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, + shardSyncer, + null), + metricsFactory, + leaseCoordinator.getLeaseManager(), + streamConfig.getStreamProxy(), + isAuditorMode, + config.getLeasesRecoveryAuditorExecutionFrequencyMillis(), + config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold()); } /** @@ -1317,7 +1317,7 @@ public class Worker implements Runnable { static class WorkerCWMetricsFactory extends CWMetricsFactory { WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, - int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { + int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); } } @@ -1524,7 +1524,7 @@ public class Worker implements Runnable { if (leaderDecider == null) { leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, - Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } return new Worker(config.getApplicationName(), recordProcessorFactory, @@ -1564,10 +1564,10 @@ public class Worker implements Runnable { } > R createClient(final T builder, - final AWSCredentialsProvider credentialsProvider, - final ClientConfiguration clientConfiguration, - final String endpointUrl, - final String region) { + final AWSCredentialsProvider credentialsProvider, + final ClientConfiguration clientConfiguration, + final String endpointUrl, + final String region) { if (credentialsProvider != null) { builder.withCredentials(credentialsProvider); } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index dfb44cf8..ab254961 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -114,7 +114,7 @@ public class LeaseCleanupManager { completedLeaseStopwatch.start(); garbageLeaseStopwatch.start(); deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); isRunning = true; } else { LOG.info("Lease cleanup thread already running, no need to start."); @@ -241,7 +241,7 @@ public class LeaseCleanupManager { if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { // throws ResourceNotFoundException wereChildShardsPresent = !CollectionUtils - .isNullOrEmpty(getChildShardsFromService(shardInfo)); + .isNullOrEmpty(getChildShardsFromService(shardInfo)); } } catch (ResourceNotFoundException e) { wasResourceNotFound = true; @@ -296,7 +296,7 @@ public class LeaseCleanupManager { for (String childShardLeaseKey : childShardLeaseKeys) { final KinesisClientLease childShardLease = Optional.ofNullable( - leaseManager.getLease(childShardLeaseKey)) + leaseManager.getLease(childShardLeaseKey)) .orElseThrow(() -> new IllegalStateException( "Child lease " + childShardLeaseKey + " for completed shard not found in " + "lease table - not cleaning up lease " + lease));