From 6fa9bab66e33e8f7fa165f1908879d8474df2bef Mon Sep 17 00:00:00 2001
From: Micah Jaffe <31011877+micah-jaffe@users.noreply.github.com>
Date: Fri, 4 Oct 2019 13:53:14 -0700
Subject: [PATCH 1/2] Update Sonatype to use dedicated AWS endpoint (#619)
---
pom.xml | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index 04bcaf7c..b847f18a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,11 +63,11 @@
ossrh
- https://oss.sonatype.org/content/repositories/snapshots
+ https://aws.oss.sonatype.org/content/repositories/snapshots
ossrh
- https://oss.sonatype.org/service/local/staging/deploy/maven2/
+ https://aws.oss.sonatype.org/service/local/staging/deploy/maven2/
@@ -114,7 +114,7 @@
true
sonatype-nexus-staging
- https://oss.sonatype.org
+ https://aws.oss.sonatype.org
false
From 1f686488c7f1095304dcc7092ef72c3b0a946059 Mon Sep 17 00:00:00 2001
From: ychunxue <52723802+ychunxue@users.noreply.github.com>
Date: Wed, 23 Oct 2019 09:33:07 -0700
Subject: [PATCH 2/2] Shard end v2 (#624)
* Revalidate if current shard is closed before shutting down the ShardConsumer
* Renaming Method
* Force Lease to be lost before shutting down with Zombi state
* Adding comments for ShardEnd related unit tests
---
.../amazon/kinesis/coordinator/Scheduler.java | 2 +-
.../leases/HierarchicalShardSyncer.java | 27 ++-
.../kinesis/lifecycle/ConsumerStates.java | 4 +-
.../lifecycle/ShardConsumerArgument.java | 3 +-
.../kinesis/lifecycle/ShutdownTask.java | 66 +++++++-
.../leases/HierarchicalShardSyncerTest.java | 77 ++++++++-
.../kinesis/lifecycle/ConsumerStatesTest.java | 10 +-
.../kinesis/lifecycle/ShutdownTaskTest.java | 157 ++++++++++++++++--
8 files changed, 312 insertions(+), 34 deletions(-)
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index 0f26c3d6..fd8dfcb1 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -588,7 +588,7 @@ public class Scheduler implements Runnable {
checkpoint);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamName,
- leaseRefresher,
+ leaseCoordinator,
executorService,
cache,
shardRecordProcessorFactory.shardRecordProcessor(),
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
index 5998ea62..578af465 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
@@ -78,10 +78,21 @@ public class HierarchicalShardSyncer {
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
- final List shards = getShardList(shardDetector);
- log.debug("Num shards: {}", shards.size());
+ final List latestShards = getShardList(shardDetector);
+ checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards, scope, latestShards);
+ }
- final Map shardIdToShardMap = constructShardIdToShardMap(shards);
+ //Provide a pre-collcted list of shards to avoid calling ListShards API
+ public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
+ final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
+ final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards)throws DependencyException, InvalidStateException,
+ ProvisionedThroughputException, KinesisClientLibIOException {
+ if (!CollectionUtils.isNullOrEmpty(latestShards)) {
+ log.debug("Num shards: {}", latestShards.size());
+ }
+
+ final Map shardIdToShardMap = constructShardIdToShardMap(latestShards);
final Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(
shardIdToShardMap);
final Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
@@ -91,8 +102,7 @@ public class HierarchicalShardSyncer {
final List currentLeases = leaseRefresher.listLeases();
- final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
- inconsistentShardIds);
+ final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
@@ -104,14 +114,13 @@ public class HierarchicalShardSyncer {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
}
}
-
final List trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
- cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher);
+ cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher);
if (cleanupLeasesOfCompletedShards) {
- cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
- leaseRefresher);
+ cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
}
+
}
// CHECKSTYLE:ON CyclomaticComplexity
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
index df7ea6aa..bb1788b2 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
@@ -135,7 +135,7 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) {
return new BlockOnParentShardTask(consumerArgument.shardInfo(),
- consumerArgument.leaseRefresher(),
+ consumerArgument.leaseCoordinator().leaseRefresher(),
consumerArgument.parentShardPollIntervalMillis());
}
@@ -492,7 +492,7 @@ class ConsumerStates {
argument.initialPositionInStream(),
argument.cleanupLeasesOfCompletedShards(),
argument.ignoreUnexpectedChildShards(),
- argument.leaseRefresher(),
+ argument.leaseCoordinator(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
index fcc49f63..4f1db733 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
@@ -21,6 +21,7 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@@ -42,7 +43,7 @@ public class ShardConsumerArgument {
@NonNull
private final String streamName;
@NonNull
- private final LeaseRefresher leaseRefresher;
+ private final LeaseCoordinator leaseCoordinator;
@NonNull
private final ExecutorService executorService;
@NonNull
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
index edb69649..2bfcd358 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
@@ -16,12 +16,18 @@ package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting;
+import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@@ -36,6 +42,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
* Task for invoking the ShardRecordProcessor shutdown() callback.
*/
@@ -61,7 +71,7 @@ public class ShutdownTask implements ConsumerTask {
private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards;
@NonNull
- private final LeaseRefresher leaseRefresher;
+ private final LeaseCoordinator leaseCoordinator;
private final long backoffTimeMillis;
@NonNull
private final RecordsPublisher recordsPublisher;
@@ -88,20 +98,38 @@ public class ShutdownTask implements ConsumerTask {
try {
try {
+ ShutdownReason localReason = reason;
+ List latestShards = null;
+ /*
+ * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
+ * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows
+ * active workers to contend for the lease of this shard.
+ */
+ if (localReason == ShutdownReason.SHARD_END) {
+ latestShards = shardDetector.listShards();
+
+ //If latestShards is empty, should also shutdown the ShardConsumer without checkpoint with SHARD_END
+ if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
+ localReason = ShutdownReason.LEASE_LOST;
+ dropLease();
+ log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
+ }
+ }
+
// If we reached end of the shard, set sequence number to SHARD_END.
- if (reason == ShutdownReason.SHARD_END) {
+ if (localReason == ShutdownReason.SHARD_END) {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
}
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
- shardInfo.shardId(), shardInfo.concurrencyToken(), reason);
- final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(reason)
+ shardInfo.shardId(), shardInfo.concurrencyToken(), localReason);
+ final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason)
.checkpointer(recordProcessorCheckpointer).build();
final long startTime = System.currentTimeMillis();
try {
- if (reason == ShutdownReason.SHARD_END) {
+ if (localReason == ShutdownReason.SHARD_END) {
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null
@@ -123,11 +151,11 @@ public class ShutdownTask implements ConsumerTask {
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
}
- if (reason == ShutdownReason.SHARD_END) {
+ if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards
- hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
- initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
+ hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
+ initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
}
@@ -169,4 +197,26 @@ public class ShutdownTask implements ConsumerTask {
return reason;
}
+ private boolean isShardInContextParentOfAny(List shards) {
+ for(Shard shard : shards) {
+ if (isChildShardOfShardInContext(shard)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isChildShardOfShardInContext(Shard shard) {
+ return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
+ || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
+ }
+
+ private void dropLease() {
+ Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId());
+ leaseCoordinator.dropLease(currentLease);
+ if(currentLease != null) {
+ log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
+ }
+ }
+
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
index 85230c0d..23d2e423 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
@@ -171,6 +171,9 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST);
}
+ /**
+ * Test checkAndCreateLeaseForNewShards while not providing a pre-fetched list of shards
+ */
@Test
public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception {
final List shards = constructShardListForGraphA();
@@ -205,6 +208,74 @@ public class HierarchicalShardSyncerTest {
}
+ /**
+ * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
+ * should never be called.
+ */
+ @Test
+ public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
+ final List latestShards = constructShardListForGraphA();
+
+ final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
+ when(shardDetector.listShards()).thenReturn(latestShards);
+ when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
+ when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
+
+ hierarchicalShardSyncer
+ .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
+ cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
+
+ final Set expectedShardIds = new HashSet<>(
+ Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
+
+ final List requestLeases = leaseCaptor.getAllValues();
+ final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
+ final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
+ .collect(Collectors.toSet());
+
+ assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
+ assertThat(requestLeaseKeys, equalTo(expectedShardIds));
+ assertThat(extendedSequenceNumbers.size(), equalTo(1));
+
+ extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
+
+ verify(shardDetector, never()).listShards();
+ verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
+ verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
+ }
+
+ /**
+ * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
+ * should never be called.
+ */
+ @Test
+ public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception {
+ final List shards = constructShardListForGraphA();
+
+ final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
+ when(shardDetector.listShards()).thenReturn(shards);
+ when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
+ when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
+
+ hierarchicalShardSyncer
+ .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
+ cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList());
+
+ final Set expectedShardIds = new HashSet<>();
+
+ final List requestLeases = leaseCaptor.getAllValues();
+ final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
+ final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
+ .collect(Collectors.toSet());
+
+ assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
+ assertThat(extendedSequenceNumbers.size(), equalTo(0));
+
+ verify(shardDetector, never()).listShards();
+ verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
+ verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
+ }
+
@Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception {
testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON);
@@ -1035,7 +1106,11 @@ public class HierarchicalShardSyncerTest {
/*
* Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is
- * epochs): 0 1 2 3 4 5- shards till epoch 102 \ / \ / | | 6 7 4 5- shards from epoch 103 - 205 \ / | /\ 8 4 9 10 -
+ * epochs): 0 1 2 3 4 5- shards till
+ * \ / \ / | |
+ * 6 7 4 5- shards from epoch 103 - 205
+ * \ / | /\
+ * 8 4 9 10 -
* shards from epoch 206 (open - no ending sequenceNumber)
*/
private List constructShardListForGraphA() {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
index f9287701..16f5e9a4 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
@@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@@ -55,6 +56,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
+import javax.swing.*;
+
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
private static final String STREAM_NAME = "TestStream";
@@ -73,6 +76,8 @@ public class ConsumerStatesTest {
@Mock
private ShardInfo shardInfo;
@Mock
+ private LeaseCoordinator leaseCoordinator;
+ @Mock
private LeaseRefresher leaseRefresher;
@Mock
private Checkpointer checkpointer;
@@ -109,7 +114,7 @@ public class ConsumerStatesTest {
@Before
public void setup() {
- argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher,
+ argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
@@ -127,6 +132,7 @@ public class ConsumerStatesTest {
@Test
public void blockOnParentStateTest() {
ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState();
+ when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
ConsumerTask task = state.createTask(argument, consumer, null);
@@ -309,7 +315,7 @@ public class ConsumerStatesTest {
assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
- assertThat(task, shutdownTask(LEASE_REFRESHER_CLASS, "leaseRefresher", equalTo(leaseRefresher)));
+ assertThat(task, shutdownTask(LeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream)));
assertThat(task,
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
index 50f905f0..6af62edb 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
@@ -16,13 +16,18 @@ package software.amazon.kinesis.lifecycle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.junit.Before;
import org.junit.Test;
@@ -30,21 +35,27 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
+import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
+import software.amazon.kinesis.leases.ShardObjectHelper;
+import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
+import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
-import software.amazon.kinesis.utils.TestStreamlet;
/**
*
@@ -54,14 +65,14 @@ public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
- private static final ShutdownReason TERMINATE_SHUTDOWN_REASON = ShutdownReason.SHARD_END;
+ private static final ShutdownReason SHARD_END_SHUTDOWN_REASON = ShutdownReason.SHARD_END;
+ private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private final String concurrencyToken = "testToken4398";
- private final String shardId = "shardId-0000397840";
+ private final String shardId = "shardId-0";
private boolean cleanupLeasesOfCompletedShards = false;
private boolean ignoreUnexpectedChildShards = false;
- private ShardRecordProcessor shardRecordProcessor;
private ShardInfo shardInfo;
private ShutdownTask task;
@@ -74,9 +85,13 @@ public class ShutdownTaskTest {
@Mock
private LeaseRefresher leaseRefresher;
@Mock
+ private LeaseCoordinator leaseCoordinator;
+ @Mock
private ShardDetector shardDetector;
@Mock
private HierarchicalShardSyncer hierarchicalShardSyncer;
+ @Mock
+ private ShardRecordProcessor shardRecordProcessor;
@Before
public void setUp() throws Exception {
@@ -85,20 +100,22 @@ public class ShutdownTaskTest {
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
- shardRecordProcessor = new TestStreamlet();
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
- TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
- ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
+ SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
}
/**
* Test method for {@link ShutdownTask#call()}.
+ * This test is for the scenario that customer doesn't implement checkpoint in their implementation
*/
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() {
+ when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
+
final TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof IllegalArgumentException);
@@ -106,22 +123,101 @@ public class ShutdownTaskTest {
/**
* Test method for {@link ShutdownTask#call()}.
+ * This test is for the scenario that checkAndCreateLeaseForNewShards throws an exception.
*/
@Test
public final void testCallWhenSyncingShardsThrows() throws Exception {
+ List latestShards = constructShardListGraphA();
+ when(shardDetector.listShards()).thenReturn(latestShards);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
- when(shardDetector.listShards()).thenReturn(null);
+ when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
+
doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
- NULL_METRICS_FACTORY.createMetrics());
+ NULL_METRICS_FACTORY.createMetrics(), latestShards);
- TaskResult result = task.call();
+ final TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(recordsPublisher).shutdown();
+ verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
+ verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
+ }
+
+ /**
+ * Test method for {@link ShutdownTask#call()}.
+ * This test is for the scenario that ShutdownTask is created for ShardConsumer reaching the Shard End.
+ */
+ @Test
+ public final void testCallWhenTrueShardEnd() {
+ shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
+ ExtendedSequenceNumber.LATEST);
+ task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
+ SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
+ hierarchicalShardSyncer, NULL_METRICS_FACTORY);
+
+ when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
+ when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
+
+ final TaskResult result = task.call();
+ assertNull(result.getException());
+ verify(recordsPublisher).shutdown();
+ verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
+ verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
+ verify(shardDetector, times(1)).listShards();
+ verify(leaseCoordinator, never()).getAssignments();
+ }
+
+ /**
+ * Test method for {@link ShutdownTask#call()}.
+ * This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
+ */
+ @Test
+ public final void testCallWhenFalseShardEnd() {
+ shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
+ ExtendedSequenceNumber.LATEST);
+ task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
+ SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
+ hierarchicalShardSyncer, NULL_METRICS_FACTORY);
+
+ when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
+
+ final TaskResult result = task.call();
+ assertNull(result.getException());
+ verify(recordsPublisher).shutdown();
+ verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
+ verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
+ verify(shardDetector, times(1)).listShards();
+ verify(leaseCoordinator).getCurrentlyHeldLease(shardInfo.shardId());
+ }
+
+ /**
+ * Test method for {@link ShutdownTask#call()}.
+ * This test is for the scenario that a ShutdownTask is created for the ShardConsumer losing the lease.
+ */
+ @Test
+ public final void testCallWhenLeaseLost() {
+ shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
+ ExtendedSequenceNumber.LATEST);
+ task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
+ LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
+ ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
+ hierarchicalShardSyncer, NULL_METRICS_FACTORY);
+
+ when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
+
+ final TaskResult result = task.call();
+ assertNull(result.getException());
+ verify(recordsPublisher).shutdown();
+ verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
+ verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
+ verify(shardDetector, never()).listShards();
+ verify(leaseCoordinator, never()).getAssignments();
}
/**
@@ -132,4 +228,45 @@ public class ShutdownTaskTest {
assertEquals(TaskType.SHUTDOWN, task.taskType());
}
+
+ /*
+ * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is
+ * epochs): 0 1 2 3 4 5 - shards till
+ * \ / \ / | |
+ * 6 7 4 5 - shards from epoch 103 - 205
+ * \ / | /\
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ */
+ private List constructShardListGraphA() {
+ final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102");
+ final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null);
+ final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "205");
+ final SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "205");
+ final SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("206", null);
+
+ return Arrays.asList(
+ ShardObjectHelper.newShard("shardId-0", null, null, range0,
+ ShardObjectHelper.newHashKeyRange("0", "99")),
+ ShardObjectHelper.newShard("shardId-1", null, null, range0,
+ ShardObjectHelper.newHashKeyRange("100", "199")),
+ ShardObjectHelper.newShard("shardId-2", null, null, range0,
+ ShardObjectHelper.newHashKeyRange("200", "299")),
+ ShardObjectHelper.newShard("shardId-3", null, null, range0,
+ ShardObjectHelper.newHashKeyRange("300", "399")),
+ ShardObjectHelper.newShard("shardId-4", null, null, range1,
+ ShardObjectHelper.newHashKeyRange("400", "499")),
+ ShardObjectHelper.newShard("shardId-5", null, null, range2,
+ ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY)),
+ ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3,
+ ShardObjectHelper.newHashKeyRange("0", "199")),
+ ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3,
+ ShardObjectHelper.newHashKeyRange("200", "399")),
+ ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4,
+ ShardObjectHelper.newHashKeyRange("0", "399")),
+ ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4,
+ ShardObjectHelper.newHashKeyRange("500", "799")),
+ ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4,
+ ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY)));
+ }
+
}