feature(dynamo): Provide option to suppress exceptions associated with missing data

This commit is contained in:
Shiva Vanamala 2021-08-09 08:12:18 -07:00
parent 5bed718bc4
commit a9cc9bfa6f
11 changed files with 131 additions and 45 deletions

View file

@ -530,7 +530,8 @@ class ConsumerStates {
consumer.isIgnoreUnexpectedChildShards(), consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(), consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache()); consumer.getGetRecordsCache(),
consumer.isSuppressMissingIncompleteLeasesException()); // change here
} }
@Override @Override

View file

@ -63,6 +63,8 @@ class ShardConsumer {
private final GetRecordsCache getRecordsCache; private final GetRecordsCache getRecordsCache;
private final boolean suppressMissingIncompleteLeasesException;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
@ -124,7 +126,8 @@ class ShardConsumer {
skipShardSyncAtWorkerInitializationIfLeasesExist, skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
config); config,
Worker.DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION);
} }
/** /**
@ -155,7 +158,8 @@ class ShardConsumer {
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config,
boolean suppressMissingIncompleteLeasesException) {
this( this(
shardInfo, shardInfo,
@ -180,7 +184,8 @@ class ShardConsumer {
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, maxGetRecordsThreadPool,
config config,
suppressMissingIncompleteLeasesException
); );
} }
@ -217,7 +222,8 @@ class ShardConsumer {
KinesisDataFetcher kinesisDataFetcher, KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config,
boolean suppressMissingIncompleteLeasesException) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
@ -235,6 +241,7 @@ class ShardConsumer {
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException;
} }
/** /**
@ -397,7 +404,7 @@ class ShardConsumer {
* @return Return next task to run * @return Return next task to run
*/ */
private ITask getNextTask() { private ITask getNextTask() {
ITask nextTask = currentState.createTask(this); ITask nextTask = currentState.createTask(this); // change here (add property into consumer)
if (nextTask == null) { if (nextTask == null) {
return null; return null;
@ -503,4 +510,8 @@ class ShardConsumer {
ShutdownNotification getShutdownNotification() { ShutdownNotification getShutdownNotification() {
return shutdownNotification; return shutdownNotification;
} }
public boolean isSuppressMissingIncompleteLeasesException() {
return suppressMissingIncompleteLeasesException;
}
} }

View file

@ -39,6 +39,8 @@ class ShardSyncTask implements ITask {
private final long shardSyncTaskIdleTimeMillis; private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC; private final TaskType taskType = TaskType.SHARDSYNC;
private final boolean suppressMissingIncompleteLeasesException;
/** /**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list) * @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
* @param leaseManager Used to fetch and create leases * @param leaseManager Used to fetch and create leases
@ -51,13 +53,15 @@ class ShardSyncTask implements ITask {
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion, boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards, boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis) { long shardSyncTaskIdleTimeMillis,
boolean suppressMissingIncompleteLeasesException) { // change here
this.kinesisProxy = kinesisProxy; this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream; this.initialPosition = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis; this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException;
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -72,7 +76,8 @@ class ShardSyncTask implements ITask {
leaseManager, leaseManager,
initialPosition, initialPosition,
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards); ignoreUnexpectedChildShards,
suppressMissingIncompleteLeasesException); // change here
if (shardSyncTaskIdleTimeMillis > 0) { if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis); Thread.sleep(shardSyncTaskIdleTimeMillis);
} }

View file

@ -47,10 +47,11 @@ class ShardSyncTaskManager {
private boolean ignoreUnexpectedChildShards; private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis; private final long shardSyncIdleTimeMillis;
private final boolean suppressMissingIncompleteLeasesException;
/** /**
* Constructor. * Constructor.
*
* @param kinesisProxy Proxy used to fetch streamInfo (shards) * @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards) * @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream * @param initialPositionInStream Initial position in stream
@ -60,6 +61,7 @@ class ShardSyncTaskManager {
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory * @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks * @param executorService ExecutorService to execute the shard sync tasks
* @param suppressMissingIncompleteLeasesException
*/ */
ShardSyncTaskManager(final IKinesisProxy kinesisProxy, ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager<KinesisClientLease> leaseManager, final ILeaseManager<KinesisClientLease> leaseManager,
@ -68,7 +70,8 @@ class ShardSyncTaskManager {
final boolean ignoreUnexpectedChildShards, final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis, final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory, final IMetricsFactory metricsFactory,
ExecutorService executorService) { ExecutorService executorService,
boolean suppressMissingIncompleteLeasesException) {
this.kinesisProxy = kinesisProxy; this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
@ -77,6 +80,7 @@ class ShardSyncTaskManager {
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService; this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream; this.initialPositionInStream = initialPositionInStream;
this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException;
} }
synchronized boolean syncShardAndLeaseInfo(Set<String> closedShardIds) { synchronized boolean syncShardAndLeaseInfo(Set<String> closedShardIds) {
@ -104,7 +108,8 @@ class ShardSyncTaskManager {
initialPositionInStream, initialPositionInStream,
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis), metricsFactory); shardSyncIdleTimeMillis,
suppressMissingIncompleteLeasesException), metricsFactory); // change here
future = executorService.submit(currentTask); future = executorService.submit(currentTask);
submittedNewTask = true; submittedNewTask = true;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View file

@ -64,9 +64,9 @@ class ShardSyncer {
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards) boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // ignore
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards); ignoreUnexpectedChildShards, Worker.DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION);
} }
/** /**
@ -86,9 +86,10 @@ class ShardSyncer {
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards) boolean ignoreUnexpectedChildShards,
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { boolean suppressMissingIncompleteLeasesException)
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // change here
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, suppressMissingIncompleteLeasesException);
} }
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
@ -96,7 +97,7 @@ class ShardSyncer {
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards) boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false); checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false, Worker.DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION);
} }
/** /**
@ -117,7 +118,8 @@ class ShardSyncer {
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards) boolean ignoreUnexpectedChildShards,
boolean suppressMissingIncompleteLeasesException)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy); List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size()); LOG.debug("Num shards: " + shards.size());
@ -150,7 +152,7 @@ class ShardSyncer {
trackedLeases.addAll(currentLeases); trackedLeases.addAll(currentLeases);
} }
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager); cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager, suppressMissingIncompleteLeasesException); // change here
if (cleanupLeasesOfCompletedShards) { if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, cleanupLeasesOfFinishedShards(currentLeases,
shardIdToShardMap, shardIdToShardMap,
@ -597,7 +599,9 @@ class ShardSyncer {
private static void cleanupGarbageLeases(List<Shard> shards, private static void cleanupGarbageLeases(List<Shard> shards,
List<KinesisClientLease> trackedLeases, List<KinesisClientLease> trackedLeases,
IKinesisProxy kinesisProxy, IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager) ILeaseManager<KinesisClientLease> leaseManager,
boolean suppressMissingIncompleteLeasesException)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
Set<String> kinesisShards = new HashSet<>(); Set<String> kinesisShards = new HashSet<>();
for (Shard shard : shards) { for (Shard shard : shards) {
@ -625,7 +629,7 @@ class ShardSyncer {
for (KinesisClientLease lease : garbageLeases) { for (KinesisClientLease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds)) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
if (lease.isComplete()) { if (lease.isComplete() || suppressMissingIncompleteLeasesException) { // change here
LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey()
+ " as it is not present in Kinesis stream."); + " as it is not present in Kinesis stream.");
leaseManager.deleteLease(lease); leaseManager.deleteLease(lease);

View file

@ -48,6 +48,7 @@ class ShutdownTask implements ITask {
private final TaskType taskType = TaskType.SHUTDOWN; private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final GetRecordsCache getRecordsCache; private final GetRecordsCache getRecordsCache;
private final boolean isSuppressMissingIncompleteLeasesException;
/** /**
* Constructor. * Constructor.
@ -63,7 +64,8 @@ class ShutdownTask implements ITask {
boolean ignoreUnexpectedChildShards, boolean ignoreUnexpectedChildShards,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsCache getRecordsCache) { GetRecordsCache getRecordsCache,
boolean isSuppressMissingIncompleteLeasesException) { // change here
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -75,6 +77,7 @@ class ShutdownTask implements ITask {
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache; this.getRecordsCache = getRecordsCache;
this.isSuppressMissingIncompleteLeasesException = isSuppressMissingIncompleteLeasesException;
} }
/* /*
@ -131,7 +134,8 @@ class ShutdownTask implements ITask {
leaseManager, leaseManager,
initialPositionInStream, initialPositionInStream,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards); ignoreUnexpectedChildShards,
isSuppressMissingIncompleteLeasesException); // change here
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
} }

View file

@ -81,6 +81,7 @@ public class Worker implements Runnable {
private static final int MAX_RETRIES = 4; private static final int MAX_RETRIES = 4;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final boolean DEFAULT_EXITS_ON_FAILURE = false; private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
public static final boolean DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION = false;
private WorkerLog wlog = new WorkerLog(); private WorkerLog wlog = new WorkerLog();
@ -123,6 +124,8 @@ public class Worker implements Runnable {
// fivetran configurables // fivetran configurables
private final boolean exitOnFailure; private final boolean exitOnFailure;
private final boolean suppressMissingIncompleteLeasesException;
/** /**
* Used to ensure that only one requestedShutdown is in progress at a time. * Used to ensure that only one requestedShutdown is in progress at a time.
@ -434,6 +437,42 @@ public class Worker implements Runnable {
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
boolean exitOnFailure) { boolean exitOnFailure) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, exitOnFailure, DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION);
}
/**
* @param applicationName Name of the Kinesis application
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Library Configuration
* @param streamConfig Stream configuration
* @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* this location in the stream when an application starts up for the first time and there are no
* checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param checkpoint Used to get/set checkpoints
* @param leaseCoordinator Lease coordinator (coordinates currently owned leases)
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory Metrics factory used to emit metrics
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param shardPrioritization Provides prioritization logic to decide which available shards process first
* @param retryGetRecordsInSeconds Time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool Max number of threads in the getRecords thread pool.
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
boolean exitOnFailure, boolean suppressMissingIncompleteLeasesException) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config; this.config = config;
@ -448,7 +487,7 @@ public class Worker implements Runnable {
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
shardSyncIdleTimeMillis, metricsFactory, executorService); shardSyncIdleTimeMillis, metricsFactory, executorService, suppressMissingIncompleteLeasesException); // change here
this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis; this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -457,6 +496,7 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener; this.workerStateChangeListener = workerStateChangeListener;
this.exitOnFailure = exitOnFailure; this.exitOnFailure = exitOnFailure;
this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
} }
@ -600,7 +640,7 @@ public class Worker implements Runnable {
LOG.info("Syncing Kinesis shard info"); LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
config.shouldIgnoreUnexpectedChildShards(), 0L); config.shouldIgnoreUnexpectedChildShards(), 0L, suppressMissingIncompleteLeasesException); // change here
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else { } else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@ -966,7 +1006,8 @@ public class Worker implements Runnable {
skipShardSyncAtWorkerInitializationIfLeasesExist, skipShardSyncAtWorkerInitializationIfLeasesExist,
retryGetRecordsInSeconds, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, maxGetRecordsThreadPool,
config); config,
suppressMissingIncompleteLeasesException); // change here
} }
@ -1120,6 +1161,7 @@ public class Worker implements Runnable {
// fivetran additions // fivetran additions
boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE; boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE;
private boolean suppressMissingIncompleteLeasesException = DEFAULT_SUPPRESS_MISSING_INCOMPLETE_LEASES_EXCEPTION;
@VisibleForTesting @VisibleForTesting
AmazonKinesis getKinesisClient() { AmazonKinesis getKinesisClient() {
@ -1265,7 +1307,8 @@ public class Worker implements Runnable {
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(), config.getMaxGetRecordsThreadPool(),
workerStateChangeListener, workerStateChangeListener,
exitOnFailure); exitOnFailure,
suppressMissingIncompleteLeasesException);
} }
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder, <R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
@ -1303,6 +1346,11 @@ public class Worker implements Runnable {
return this; return this;
} }
public Builder suppressMissingIncompleteLeasesException(boolean suppressMissingIncompleteLeasesException) {
this.suppressMissingIncompleteLeasesException = suppressMissingIncompleteLeasesException;
return this;
}
public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
this.dynamoDBClient = dynamoDBClient; this.dynamoDBClient = dynamoDBClient;
return this; return this;

View file

@ -370,7 +370,8 @@ public class ShardConsumerTest {
dataFetcher, dataFetcher,
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
config); config,
false);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -514,7 +515,8 @@ public class ShardConsumerTest {
dataFetcher, dataFetcher,
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
config); config,
false);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -651,7 +653,8 @@ public class ShardConsumerTest {
dataFetcher, dataFetcher,
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
config); config,
false);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -774,7 +777,8 @@ public class ShardConsumerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
config); config,
false);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
SynchronousGetRecordsRetrievalStrategy.class); SynchronousGetRecordsRetrievalStrategy.class);
@ -804,7 +808,8 @@ public class ShardConsumerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1), Optional.of(1),
Optional.of(2), Optional.of(2),
config); config,
false);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class); AsynchronousGetRecordsRetrievalStrategy.class);

View file

@ -125,7 +125,8 @@ public class ShardSyncTaskIntegrationTest {
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false, false,
false, false,
0L); 0L,
false);
syncTask.call(); syncTask.call();
List<KinesisClientLease> leases = leaseManager.listLeases(); List<KinesisClientLease> leases = leaseManager.listLeases();
Set<String> leaseKeys = new HashSet<String>(); Set<String> leaseKeys = new HashSet<String>();

View file

@ -350,7 +350,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit(); dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true); cleanupLeasesOfCompletedShards, true, false);
List<KinesisClientLease> newLeases = leaseManager.listLeases(); List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>(); Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add("shardId-4"); expectedLeaseShardIds.add("shardId-4");

View file

@ -111,7 +111,8 @@ public class ShutdownTaskTest {
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS, TASK_BACKOFF_TIME_MILLIS,
getRecordsCache); getRecordsCache,
false);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException); Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -139,7 +140,8 @@ public class ShutdownTaskTest {
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS, TASK_BACKOFF_TIME_MILLIS,
getRecordsCache); getRecordsCache,
false);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
@ -151,7 +153,7 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testGetTaskType() { public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache); ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, false);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }