minor cleanup of todos
This commit is contained in:
parent
a9cc9bfa6f
commit
68cafc4dce
7 changed files with 13 additions and 13 deletions
|
|
@ -531,7 +531,7 @@ class ConsumerStates {
|
||||||
consumer.getLeaseManager(),
|
consumer.getLeaseManager(),
|
||||||
consumer.getTaskBackoffTimeMillis(),
|
consumer.getTaskBackoffTimeMillis(),
|
||||||
consumer.getGetRecordsCache(),
|
consumer.getGetRecordsCache(),
|
||||||
consumer.isSuppressMissingIncompleteLeasesException()); // change here
|
consumer.isSuppressMissingIncompleteLeasesException());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -404,7 +404,7 @@ class ShardConsumer {
|
||||||
* @return Return next task to run
|
* @return Return next task to run
|
||||||
*/
|
*/
|
||||||
private ITask getNextTask() {
|
private ITask getNextTask() {
|
||||||
ITask nextTask = currentState.createTask(this); // change here (add property into consumer)
|
ITask nextTask = currentState.createTask(this);
|
||||||
|
|
||||||
if (nextTask == null) {
|
if (nextTask == null) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ class ShardSyncTask implements ITask {
|
||||||
boolean cleanupLeasesUponShardCompletion,
|
boolean cleanupLeasesUponShardCompletion,
|
||||||
boolean ignoreUnexpectedChildShards,
|
boolean ignoreUnexpectedChildShards,
|
||||||
long shardSyncTaskIdleTimeMillis,
|
long shardSyncTaskIdleTimeMillis,
|
||||||
boolean suppressMissingIncompleteLeasesException) { // change here
|
boolean suppressMissingIncompleteLeasesException) {
|
||||||
this.kinesisProxy = kinesisProxy;
|
this.kinesisProxy = kinesisProxy;
|
||||||
this.leaseManager = leaseManager;
|
this.leaseManager = leaseManager;
|
||||||
this.initialPosition = initialPositionInStream;
|
this.initialPosition = initialPositionInStream;
|
||||||
|
|
@ -77,7 +77,7 @@ class ShardSyncTask implements ITask {
|
||||||
initialPosition,
|
initialPosition,
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
ignoreUnexpectedChildShards,
|
ignoreUnexpectedChildShards,
|
||||||
suppressMissingIncompleteLeasesException); // change here
|
suppressMissingIncompleteLeasesException);
|
||||||
if (shardSyncTaskIdleTimeMillis > 0) {
|
if (shardSyncTaskIdleTimeMillis > 0) {
|
||||||
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,7 @@ class ShardSyncTaskManager {
|
||||||
cleanupLeasesUponShardCompletion,
|
cleanupLeasesUponShardCompletion,
|
||||||
ignoreUnexpectedChildShards,
|
ignoreUnexpectedChildShards,
|
||||||
shardSyncIdleTimeMillis,
|
shardSyncIdleTimeMillis,
|
||||||
suppressMissingIncompleteLeasesException), metricsFactory); // change here
|
suppressMissingIncompleteLeasesException), metricsFactory);
|
||||||
future = executorService.submit(currentTask);
|
future = executorService.submit(currentTask);
|
||||||
submittedNewTask = true;
|
submittedNewTask = true;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ class ShardSyncer {
|
||||||
boolean cleanupLeasesOfCompletedShards,
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
boolean ignoreUnexpectedChildShards,
|
boolean ignoreUnexpectedChildShards,
|
||||||
boolean suppressMissingIncompleteLeasesException)
|
boolean suppressMissingIncompleteLeasesException)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // change here
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, suppressMissingIncompleteLeasesException);
|
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, suppressMissingIncompleteLeasesException);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -152,7 +152,7 @@ class ShardSyncer {
|
||||||
trackedLeases.addAll(currentLeases);
|
trackedLeases.addAll(currentLeases);
|
||||||
}
|
}
|
||||||
trackedLeases.addAll(newLeasesToCreate);
|
trackedLeases.addAll(newLeasesToCreate);
|
||||||
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager, suppressMissingIncompleteLeasesException); // change here
|
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager, suppressMissingIncompleteLeasesException);
|
||||||
if (cleanupLeasesOfCompletedShards) {
|
if (cleanupLeasesOfCompletedShards) {
|
||||||
cleanupLeasesOfFinishedShards(currentLeases,
|
cleanupLeasesOfFinishedShards(currentLeases,
|
||||||
shardIdToShardMap,
|
shardIdToShardMap,
|
||||||
|
|
@ -629,7 +629,7 @@ class ShardSyncer {
|
||||||
|
|
||||||
for (KinesisClientLease lease : garbageLeases) {
|
for (KinesisClientLease lease : garbageLeases) {
|
||||||
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
|
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
|
||||||
if (lease.isComplete() || suppressMissingIncompleteLeasesException) { // change here
|
if (lease.isComplete() || suppressMissingIncompleteLeasesException) {
|
||||||
LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey()
|
LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey()
|
||||||
+ " as it is not present in Kinesis stream.");
|
+ " as it is not present in Kinesis stream.");
|
||||||
leaseManager.deleteLease(lease);
|
leaseManager.deleteLease(lease);
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ class ShutdownTask implements ITask {
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
long backoffTimeMillis,
|
long backoffTimeMillis,
|
||||||
GetRecordsCache getRecordsCache,
|
GetRecordsCache getRecordsCache,
|
||||||
boolean isSuppressMissingIncompleteLeasesException) { // change here
|
boolean isSuppressMissingIncompleteLeasesException) {
|
||||||
this.shardInfo = shardInfo;
|
this.shardInfo = shardInfo;
|
||||||
this.recordProcessor = recordProcessor;
|
this.recordProcessor = recordProcessor;
|
||||||
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
|
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
|
||||||
|
|
@ -135,7 +135,7 @@ class ShutdownTask implements ITask {
|
||||||
initialPositionInStream,
|
initialPositionInStream,
|
||||||
cleanupLeasesOfCompletedShards,
|
cleanupLeasesOfCompletedShards,
|
||||||
ignoreUnexpectedChildShards,
|
ignoreUnexpectedChildShards,
|
||||||
isSuppressMissingIncompleteLeasesException); // change here
|
isSuppressMissingIncompleteLeasesException);
|
||||||
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -487,7 +487,7 @@ public class Worker implements Runnable {
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||||
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
|
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
|
||||||
shardSyncIdleTimeMillis, metricsFactory, executorService, suppressMissingIncompleteLeasesException); // change here
|
shardSyncIdleTimeMillis, metricsFactory, executorService, suppressMissingIncompleteLeasesException);
|
||||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||||
this.failoverTimeMillis = failoverTimeMillis;
|
this.failoverTimeMillis = failoverTimeMillis;
|
||||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
@ -640,7 +640,7 @@ public class Worker implements Runnable {
|
||||||
LOG.info("Syncing Kinesis shard info");
|
LOG.info("Syncing Kinesis shard info");
|
||||||
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||||
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
|
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
|
||||||
config.shouldIgnoreUnexpectedChildShards(), 0L, suppressMissingIncompleteLeasesException); // change here
|
config.shouldIgnoreUnexpectedChildShards(), 0L, suppressMissingIncompleteLeasesException);
|
||||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
||||||
|
|
@ -1007,7 +1007,7 @@ public class Worker implements Runnable {
|
||||||
retryGetRecordsInSeconds,
|
retryGetRecordsInSeconds,
|
||||||
maxGetRecordsThreadPool,
|
maxGetRecordsThreadPool,
|
||||||
config,
|
config,
|
||||||
suppressMissingIncompleteLeasesException); // change here
|
suppressMissingIncompleteLeasesException);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue