Stop passing latest shards to the next ShardSyncTask in ShardSyncTaskManager.
This commit is contained in:
parent
de52856b45
commit
5760a27962
2 changed files with 7 additions and 7 deletions
4
pom.xml
4
pom.xml
|
|
@ -6,7 +6,7 @@
|
||||||
<artifactId>amazon-kinesis-client</artifactId>
|
<artifactId>amazon-kinesis-client</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<name>Amazon Kinesis Client Library for Java</name>
|
<name>Amazon Kinesis Client Library for Java</name>
|
||||||
<version>1.13.2</version>
|
<version>1.13.3-SNAPSHOT</version>
|
||||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
||||||
from Amazon Kinesis.
|
from Amazon Kinesis.
|
||||||
</description>
|
</description>
|
||||||
|
|
@ -25,7 +25,7 @@
|
||||||
</licenses>
|
</licenses>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<aws-java-sdk.version>1.11.655</aws-java-sdk.version>
|
<aws-java-sdk.version>1.11.728</aws-java-sdk.version>
|
||||||
<sqlite4java.version>1.0.392</sqlite4java.version>
|
<sqlite4java.version>1.0.392</sqlite4java.version>
|
||||||
<sqlite4java.native>libsqlite4java</sqlite4java.native>
|
<sqlite4java.native>libsqlite4java</sqlite4java.native>
|
||||||
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
|
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
|
||||||
|
|
|
||||||
|
|
@ -117,7 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
|
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
|
||||||
shardSyncer, latestShards), metricsFactory);
|
shardSyncer, latestShards), metricsFactory);
|
||||||
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
|
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
|
||||||
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception));
|
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
|
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
|
||||||
}
|
}
|
||||||
|
|
@ -132,9 +132,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
return submittedTaskFuture;
|
return submittedTaskFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handlePendingShardSyncs(List<Shard> latestShards, Throwable exception) {
|
private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
|
||||||
if (exception != null) {
|
if (exception != null || taskResult.getException() != null) {
|
||||||
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception);
|
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception != null ? exception : taskResult.getException());
|
||||||
}
|
}
|
||||||
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
|
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
|
||||||
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
|
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
|
||||||
|
|
@ -150,7 +150,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
// reset future to null, so next call creates a new one
|
// reset future to null, so next call creates a new one
|
||||||
// without trying to get results from the old future.
|
// without trying to get results from the old future.
|
||||||
future = null;
|
future = null;
|
||||||
checkAndSubmitNextTask(latestShards);
|
checkAndSubmitNextTask(null);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue