diff --git a/pom.xml b/pom.xml
index 325f82e7..6ed0afcf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.13.2
+ 1.13.3-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
@@ -25,7 +25,7 @@
- 1.11.655
+ 1.11.728
1.0.392
libsqlite4java
${project.build.directory}/test-lib
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
index da238e4b..76db0c6f 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
@@ -117,7 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
shardSyncer, latestShards), metricsFactory);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
- .whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception));
+ .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
}
@@ -132,9 +132,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
return submittedTaskFuture;
}
- private void handlePendingShardSyncs(List latestShards, Throwable exception) {
- if (exception != null) {
- LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception);
+ private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
+ if (exception != null || taskResult.getException() != null) {
+ LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception != null ? exception : taskResult.getException());
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
@@ -150,7 +150,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
- checkAndSubmitNextTask(latestShards);
+ checkAndSubmitNextTask(null);
}
} finally {
lock.unlock();