Merge pull request #694 from ychunxue/v1.x

Stop passing latest shards to the next ShardSyncTask in ShardSyncTask…
This commit is contained in:
ychunxue 2020-03-02 10:18:24 -08:00 committed by GitHub
commit e9e64f8511
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 10 deletions

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.13.2</version> <version>1.13.3-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>
@ -25,7 +25,7 @@
</licenses> </licenses>
<properties> <properties>
<aws-java-sdk.version>1.11.655</aws-java-sdk.version> <aws-java-sdk.version>1.11.728</aws-java-sdk.version>
<sqlite4java.version>1.0.392</sqlite4java.version> <sqlite4java.version>1.0.392</sqlite4java.version>
<sqlite4java.native>libsqlite4java</sqlite4java.native> <sqlite4java.native>libsqlite4java</sqlite4java.native>
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>

View file

@ -117,7 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
shardSyncer, latestShards), metricsFactory); shardSyncer, latestShards), metricsFactory);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception)); .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
} }
@ -132,9 +132,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
return submittedTaskFuture; return submittedTaskFuture;
} }
private void handlePendingShardSyncs(List<Shard> latestShards, Throwable exception) { private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
if (exception != null) { if (exception != null || taskResult.getException() != null) {
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception); LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception != null ? exception : taskResult.getException());
} }
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes) // syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
@ -150,7 +150,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
// reset future to null, so next call creates a new one // reset future to null, so next call creates a new one
// without trying to get results from the old future. // without trying to get results from the old future.
future = null; future = null;
checkAndSubmitNextTask(latestShards); checkAndSubmitNextTask(null);
} }
} finally { } finally {
lock.unlock(); lock.unlock();

View file

@ -23,6 +23,8 @@ import java.util.concurrent.Executors;
import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.matches;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -91,10 +93,14 @@ public class ShardSyncTaskManagerTest {
pausableNoOpShardSyncer.blockShardSyncLatch.countDown(); pausableNoOpShardSyncer.blockShardSyncLatch.countDown();
// Wait for ShardSyncer to be initialized. // Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await(); pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
// There should be 1 more shardSyncer invocation after the previous shardSync completes. // There should be totally 2 invocation of shardSyncer. The first one should be triggered with an empty list the latestShards.
verify(mockShardSyncer, times(2)) // The second invocation should be the pending shard sync task, which should have null as the latestShards.
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any()); anyBoolean(), Matchers.eq(new ArrayList<>()));
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.eq(null));
} }
private static class PausableNoOpShardSyncer implements ShardSyncer { private static class PausableNoOpShardSyncer implements ShardSyncer {