Removed Automatic Indents

This commit is contained in:
Nicholas Gutierrez 2022-08-10 11:53:03 -07:00
parent 03c78fd15e
commit 366d275447
10 changed files with 162 additions and 178 deletions

View file

@ -46,8 +46,8 @@ public class BlockOnParentShardTask implements ITask {
* @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing * @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing
*/ */
public BlockOnParentShardTask(ShardInfo shardInfo, public BlockOnParentShardTask(ShardInfo shardInfo,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis) { long parentShardPollIntervalMillis) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;

View file

@ -50,13 +50,13 @@ public class InitializeTask implements ITask {
* Constructor. * Constructor.
*/ */
public InitializeTask(ShardInfo shardInfo, public InitializeTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
ICheckpoint checkpoint, ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
IDataFetcher dataFetcher, IDataFetcher dataFetcher,
long backoffTimeMillis, long backoffTimeMillis,
StreamConfig streamConfig, StreamConfig streamConfig,
GetRecordsCache getRecordsCache) { GetRecordsCache getRecordsCache) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;

View file

@ -20,7 +20,6 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
@ -92,7 +91,7 @@ public class KinesisClientLibConfiguration {
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true; public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
/** /**
* Interval to run lease cleanup thread in {@link LeaseCleanupManager}. * Interval to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}.
*/ */
private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
@ -628,7 +627,7 @@ public class KinesisClientLibConfiguration {
* @param billingMode The DDB Billing mode to set for lease table creation. * @param billingMode The DDB Billing mode to set for lease table creation.
* @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard.
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
* {@link LeaseCleanupManager} * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases
* (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up.
* @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases
@ -927,7 +926,7 @@ public class KinesisClientLibConfiguration {
} }
/** /**
* @return Interval in millis at which to run lease cleanup thread in {@link LeaseCleanupManager} * @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
*/ */
public long leaseCleanupIntervalMillis() { public long leaseCleanupIntervalMillis() {
return leaseCleanupIntervalMillis; return leaseCleanupIntervalMillis;
@ -1624,7 +1623,7 @@ public class KinesisClientLibConfiguration {
/** /**
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
* {@link LeaseCleanupManager} * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* @return * @return
*/ */
public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) { public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) {

View file

@ -14,20 +14,6 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -35,6 +21,21 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
/** /**
* Responsible for consuming data records of a (specified) shard. * Responsible for consuming data records of a (specified) shard.
* The instance should be shutdown when we lose the primary responsibility for a shard. * The instance should be shutdown when we lose the primary responsibility for a shard.
@ -61,7 +62,7 @@ public class KinesisShardConsumer implements IShardConsumer{
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
//@Getter @Getter
private final ShardSyncer shardSyncer; private final ShardSyncer shardSyncer;
private ITask currentTask; private ITask currentTask;
@ -69,28 +70,16 @@ public class KinesisShardConsumer implements IShardConsumer{
private Future<TaskResult> future; private Future<TaskResult> future;
private ShardSyncStrategy shardSyncStrategy; private ShardSyncStrategy shardSyncStrategy;
//@Getter @Getter
private List<ChildShard> childShards; private List<ChildShard> childShards;
//@Getter @Getter
private final GetRecordsCache getRecordsCache; private final GetRecordsCache getRecordsCache;
public List<ChildShard> getChildShards() {
return childShards;
}
public GetRecordsCache getGetRecordsCache() {
return getRecordsCache;
}
public ShardSyncer getShardSyncer() {
return shardSyncer;
}
private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher, private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) { ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max -> maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
@ -126,17 +115,17 @@ public class KinesisShardConsumer implements IShardConsumer{
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated @Deprecated
KinesisShardConsumer(ShardInfo shardInfo, KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
KinesisClientLibLeaseCoordinator leaseCoordinator, KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo, this(shardInfo,
streamConfig, streamConfig,
@ -172,19 +161,19 @@ public class KinesisShardConsumer implements IShardConsumer{
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated @Deprecated
KinesisShardConsumer(ShardInfo shardInfo, KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
KinesisClientLibLeaseCoordinator leaseCoordinator, KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this( this(
shardInfo, shardInfo,
streamConfig, streamConfig,
@ -233,21 +222,21 @@ public class KinesisShardConsumer implements IShardConsumer{
*/ */
@Deprecated @Deprecated
KinesisShardConsumer(ShardInfo shardInfo, KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator, KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher, KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
@ -279,22 +268,22 @@ public class KinesisShardConsumer implements IShardConsumer{
* @param leaseCleanupManager used to clean up leases in lease table. * @param leaseCleanupManager used to clean up leases in lease table.
*/ */
KinesisShardConsumer(ShardInfo shardInfo, KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator, KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher, KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager) { LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
@ -382,10 +371,6 @@ public class KinesisShardConsumer implements IShardConsumer{
return skipShardSyncAtWorkerInitializationIfLeasesExist; return skipShardSyncAtWorkerInitializationIfLeasesExist;
} }
/*public enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
}*/
private TaskOutcome determineTaskOutcome() { private TaskOutcome determineTaskOutcome() {
try { try {
TaskResult result = future.get(); TaskResult result = future.get();

View file

@ -72,18 +72,18 @@ public class KinesisShutdownTask implements ITask {
*/ */
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
KinesisShutdownTask(ShardInfo shardInfo, KinesisShutdownTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason, ShutdownReason reason,
IKinesisProxy kinesisProxy, IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards, boolean ignoreUnexpectedChildShards,
KinesisClientLibLeaseCoordinator leaseCoordinator, KinesisClientLibLeaseCoordinator leaseCoordinator,
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards, ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards,
LeaseCleanupManager leaseCleanupManager) { LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;

View file

@ -98,9 +98,9 @@ class PeriodicShardSyncManager {
boolean isAuditorMode, boolean isAuditorMode,
long leasesRecoveryAuditorExecutionFrequencyMillis, long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory,
leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis,
leasesRecoveryAuditorInconsistencyConfidenceThreshold); leasesRecoveryAuditorInconsistencyConfidenceThreshold);
} }
PeriodicShardSyncManager(String workerId, PeriodicShardSyncManager(String workerId,
@ -231,7 +231,7 @@ class PeriodicShardSyncManager {
return new ShardSyncResponse(hasHoleWithHighConfidence, true, return new ShardSyncResponse(hasHoleWithHighConfidence, true,
"Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " + "Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " +
"Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); "Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold);
} else { } else {
// If hole is not present, clear any previous hole tracking and return false. // If hole is not present, clear any previous hole tracking and return false.
hashRangeHoleTracker.reset(); hashRangeHoleTracker.reset();
@ -296,7 +296,7 @@ class PeriodicShardSyncManager {
final KinesisClientLease maxHashKeyLease = final KinesisClientLease maxHashKeyLease =
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1); sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1);
if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) || if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) ||
!maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { !maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) {
LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease); LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease);
return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange())); return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange()));
} }

View file

@ -63,9 +63,9 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer
* @param validator Used for validating sequence numbers * @param validator Used for validating sequence numbers
*/ */
public RecordProcessorCheckpointer(ShardInfo shardInfo, public RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint, ICheckpoint checkpoint,
SequenceNumberValidator validator, SequenceNumberValidator validator,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.sequenceNumberValidator = validator; this.sequenceNumberValidator = validator;

View file

@ -38,11 +38,11 @@ public class StreamConfig {
* @param initialPositionInStream Initial position in stream * @param initialPositionInStream Initial position in stream
*/ */
StreamConfig(IKinesisProxy proxy, StreamConfig(IKinesisProxy proxy,
int maxRecords, int maxRecords,
long idleTimeInMilliseconds, long idleTimeInMilliseconds,
boolean callProcessRecordsEvenForEmptyRecordList, boolean callProcessRecordsEvenForEmptyRecordList,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
InitialPositionInStreamExtended initialPositionInStream) { InitialPositionInStreamExtended initialPositionInStream) {
this.streamProxy = proxy; this.streamProxy = proxy;
this.maxRecords = maxRecords; this.maxRecords = maxRecords;
this.idleTimeInMilliseconds = idleTimeInMilliseconds; this.idleTimeInMilliseconds = idleTimeInMilliseconds;

View file

@ -473,11 +473,11 @@ public class Worker implements Runnable {
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
@ -530,13 +530,13 @@ public class Worker implements Runnable {
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
@ -546,14 +546,14 @@ public class Worker implements Runnable {
} }
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion,
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider,
PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) { PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config; this.config = config;
@ -606,7 +606,7 @@ public class Worker implements Runnable {
default: default:
if (leaderDecider != null) { if (leaderDecider != null) {
LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using " + LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using " +
"default LeaderDecider."); "default LeaderDecider.");
} }
this.leaderDecider = getOrCreateLeaderDecider(null); this.leaderDecider = getOrCreateLeaderDecider(null);
this.leaderElectedPeriodicShardSyncManager = this.leaderElectedPeriodicShardSyncManager =
@ -618,7 +618,7 @@ public class Worker implements Runnable {
} }
private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config,
AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) {
return new KinesisClientLibLeaseCoordinator( return new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()), DEFAULT_LEASE_SELECTOR, new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()), DEFAULT_LEASE_SELECTOR,
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
@ -1233,7 +1233,7 @@ public class Worker implements Runnable {
* @return Returns metrics factory based on the config. * @return Returns metrics factory based on the config.
*/ */
public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient,
KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config) {
IMetricsFactory metricsFactory; IMetricsFactory metricsFactory;
if (config.getMetricsLevel() == MetricsLevel.NONE) { if (config.getMetricsLevel() == MetricsLevel.NONE) {
metricsFactory = new NullMetricsFactory(); metricsFactory = new NullMetricsFactory();
@ -1287,27 +1287,27 @@ public class Worker implements Runnable {
/** A non-null PeriodicShardSyncManager can only provided from unit tests. Any application code will create the /** A non-null PeriodicShardSyncManager can only provided from unit tests. Any application code will create the
* PeriodicShardSyncManager for the first time here. */ * PeriodicShardSyncManager for the first time here. */
private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager, private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager,
boolean isAuditorMode) { boolean isAuditorMode) {
if (periodicShardSyncManager != null) { if (periodicShardSyncManager != null) {
return periodicShardSyncManager; return periodicShardSyncManager;
} }
return new PeriodicShardSyncManager(config.getWorkerIdentifier(), return new PeriodicShardSyncManager(config.getWorkerIdentifier(),
leaderDecider, leaderDecider,
new ShardSyncTask(streamConfig.getStreamProxy(), new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), leaseCoordinator.getLeaseManager(),
config.getInitialPositionInStreamExtended(), config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(), config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(), config.shouldIgnoreUnexpectedChildShards(),
SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer, shardSyncer,
null), null),
metricsFactory, metricsFactory,
leaseCoordinator.getLeaseManager(), leaseCoordinator.getLeaseManager(),
streamConfig.getStreamProxy(), streamConfig.getStreamProxy(),
isAuditorMode, isAuditorMode,
config.getLeasesRecoveryAuditorExecutionFrequencyMillis(), config.getLeasesRecoveryAuditorExecutionFrequencyMillis(),
config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold()); config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold());
} }
/** /**
@ -1317,7 +1317,7 @@ public class Worker implements Runnable {
static class WorkerCWMetricsFactory extends CWMetricsFactory { static class WorkerCWMetricsFactory extends CWMetricsFactory {
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis,
int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) { int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions);
} }
} }
@ -1524,7 +1524,7 @@ public class Worker implements Runnable {
if (leaderDecider == null) { if (leaderDecider == null) {
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
} }
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
@ -1564,10 +1564,10 @@ public class Worker implements Runnable {
} }
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder, <R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
final AWSCredentialsProvider credentialsProvider, final AWSCredentialsProvider credentialsProvider,
final ClientConfiguration clientConfiguration, final ClientConfiguration clientConfiguration,
final String endpointUrl, final String endpointUrl,
final String region) { final String region) {
if (credentialsProvider != null) { if (credentialsProvider != null) {
builder.withCredentials(credentialsProvider); builder.withCredentials(credentialsProvider);
} }

View file

@ -114,7 +114,7 @@ public class LeaseCleanupManager {
completedLeaseStopwatch.start(); completedLeaseStopwatch.start();
garbageLeaseStopwatch.start(); garbageLeaseStopwatch.start();
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
isRunning = true; isRunning = true;
} else { } else {
LOG.info("Lease cleanup thread already running, no need to start."); LOG.info("Lease cleanup thread already running, no need to start.");
@ -241,7 +241,7 @@ public class LeaseCleanupManager {
if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
// throws ResourceNotFoundException // throws ResourceNotFoundException
wereChildShardsPresent = !CollectionUtils wereChildShardsPresent = !CollectionUtils
.isNullOrEmpty(getChildShardsFromService(shardInfo)); .isNullOrEmpty(getChildShardsFromService(shardInfo));
} }
} catch (ResourceNotFoundException e) { } catch (ResourceNotFoundException e) {
wasResourceNotFound = true; wasResourceNotFound = true;
@ -296,7 +296,7 @@ public class LeaseCleanupManager {
for (String childShardLeaseKey : childShardLeaseKeys) { for (String childShardLeaseKey : childShardLeaseKeys) {
final KinesisClientLease childShardLease = Optional.ofNullable( final KinesisClientLease childShardLease = Optional.ofNullable(
leaseManager.getLease(childShardLeaseKey)) leaseManager.getLease(childShardLeaseKey))
.orElseThrow(() -> new IllegalStateException( .orElseThrow(() -> new IllegalStateException(
"Child lease " + childShardLeaseKey + " for completed shard not found in " "Child lease " + childShardLeaseKey + " for completed shard not found in "
+ "lease table - not cleaning up lease " + lease)); + "lease table - not cleaning up lease " + lease));