From 5760a279621591380c322d8c9f40841eb545bace Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 25 Feb 2020 16:18:56 -0800 Subject: [PATCH] Stop passing latest shards to the next ShardSyncTask in ShardSyncTaskManager. --- pom.xml | 4 ++-- .../clientlibrary/lib/worker/ShardSyncTaskManager.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index 325f82e7..6ed0afcf 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.13.2 + 1.13.3-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -25,7 +25,7 @@ - 1.11.655 + 1.11.728 1.0.392 libsqlite4java ${project.build.directory}/test-lib diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index da238e4b..76db0c6f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -117,7 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, shardSyncer, latestShards), metricsFactory); future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) - .whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception)); + .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); } @@ -132,9 +132,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; return submittedTaskFuture; } - private void handlePendingShardSyncs(List latestShards, Throwable exception) { - if (exception != null) { - LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); + private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) { + if (exception != null || taskResult.getException() != null) { + LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception != null ? exception : taskResult.getException()); } // Acquire lock here. If shardSyncRequestPending is false in this completionStage and // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) @@ -150,7 +150,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; // reset future to null, so next call creates a new one // without trying to get results from the old future. future = null; - checkAndSubmitNextTask(latestShards); + checkAndSubmitNextTask(null); } } finally { lock.unlock();