diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 5998ea62..6aaaff60 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -79,6 +79,18 @@ public class HierarchicalShardSyncer { final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { final List shards = getShardList(shardDetector); + checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, scope); + } + + //Provide a pre-collcted list of shards to avoid calling ListShards API + public synchronized void checkAndCreateLeaseForNewShards(List shards, @NonNull final ShardDetector shardDetector, + final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, + final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, + ProvisionedThroughputException, KinesisClientLibIOException { + if(CollectionUtils.isNullOrEmpty(shards)) { + shards = getShardList(shardDetector); + } log.debug("Num shards: {}", shards.size()); final Map shardIdToShardMap = constructShardIdToShardMap(shards); @@ -92,7 +104,7 @@ public class HierarchicalShardSyncer { final List currentLeases = leaseRefresher.listLeases(); final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, - inconsistentShardIds); + inconsistentShardIds); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -104,14 +116,15 @@ public class HierarchicalShardSyncer { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); } } - + final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); if (cleanupLeasesOfCompletedShards) { cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseRefresher); + leaseRefresher); } + } // CHECKSTYLE:ON CyclomaticComplexity diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index edb69649..a5cbdf64 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,9 +16,11 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; +import com.sun.org.apache.bcel.internal.generic.LUSHR; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -36,6 +38,9 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.ArrayList; +import java.util.List; + /** * Task for invoking the ShardRecordProcessor shutdown() callback. */ @@ -55,7 +60,7 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; @NonNull - private final ShutdownReason reason; + private ShutdownReason reason; @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; @@ -88,6 +93,15 @@ public class ShutdownTask implements ConsumerTask { try { try { + List allShards = new ArrayList<>(); + if(reason == ShutdownReason.SHARD_END) { + allShards = shardDetector.listShards(); + + if(!isRealShardEnd(allShards)) { + reason = ShutdownReason.LEASE_LOST; + } + } + // If we reached end of the shard, set sequence number to SHARD_END. if (reason == ShutdownReason.SHARD_END) { recordProcessorCheckpointer @@ -126,7 +140,7 @@ public class ShutdownTask implements ConsumerTask { if (reason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } @@ -169,4 +183,17 @@ public class ShutdownTask implements ConsumerTask { return reason; } + private boolean isRealShardEnd(List shards) { + boolean realShardEnd = false; + + for(Shard shard : shards) { + if(shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) + || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())) { + realShardEnd = true; + break; + } + } + return realShardEnd; + } + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 85230c0d..5d4ac902 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -205,6 +205,70 @@ public class HierarchicalShardSyncerTest { } + @Test + public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shards, shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE); + + final Set expectedShardIds = new HashSet<>( + Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + + @Test + public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(new ArrayList(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE); + + final Set expectedShardIds = new HashSet<>( + Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); @@ -1035,7 +1099,11 @@ public class HierarchicalShardSyncerTest { /* * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is - * epochs): 0 1 2 3 4 5- shards till epoch 102 \ / \ / | | 6 7 4 5- shards from epoch 103 - 205 \ / | /\ 8 4 9 10 - + * epochs): 0 1 2 3 4 5- shards till + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - * shards from epoch 206 (open - no ending sequenceNumber) */ private List constructShardListForGraphA() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 50f905f0..8a62bb6c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -16,13 +16,18 @@ package software.amazon.kinesis.lifecycle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -30,6 +35,8 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -38,13 +45,15 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardObjectHelper; +import software.amazon.kinesis.lifecycle.events.LeaseLostInput; +import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.utils.TestStreamlet; /** * @@ -54,14 +63,14 @@ public class ShutdownTaskTest { private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); - private static final ShutdownReason TERMINATE_SHUTDOWN_REASON = ShutdownReason.SHARD_END; + private static final ShutdownReason SHARD_END_SHUTDOWN_REASON = ShutdownReason.SHARD_END; + private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private final String concurrencyToken = "testToken4398"; - private final String shardId = "shardId-0000397840"; + private final String shardId = "shardId-0"; private boolean cleanupLeasesOfCompletedShards = false; private boolean ignoreUnexpectedChildShards = false; - private ShardRecordProcessor shardRecordProcessor; private ShardInfo shardInfo; private ShutdownTask task; @@ -77,6 +86,8 @@ public class ShutdownTaskTest { private ShardDetector shardDetector; @Mock private HierarchicalShardSyncer hierarchicalShardSyncer; + @Mock + private ShardRecordProcessor shardRecordProcessor; @Before public void setUp() throws Exception { @@ -85,10 +96,9 @@ public class ShutdownTaskTest { shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); - shardRecordProcessor = new TestStreamlet(); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); } @@ -98,7 +108,9 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); + final TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof IllegalArgumentException); @@ -109,19 +121,90 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenSyncingShardsThrows() throws Exception { + List shards = constructShardListGraphA(); + when(shardDetector.listShards()).thenReturn(shards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - when(shardDetector.listShards()).thenReturn(null); + doAnswer((invocation) -> { throw new KinesisClientLibIOException("KinesisClientLibIOException"); }).when(hierarchicalShardSyncer) - .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, + .checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, NULL_METRICS_FACTORY.createMetrics()); - TaskResult result = task.call(); + final TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof KinesisClientLibIOException); verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + } + + /** + * Test method for {@link ShutdownTask#call()}. + */ + @Test + public final void testCallWhenTrueShardEnd() { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY); + + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); + when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + + final TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(shardDetector, times(1)).listShards(); + } + + /** + * Test method for {@link ShutdownTask#call()}. + */ + @Test + public final void testCallWhenFalseShardEnd() { + shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY); + + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); + + final TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); + verify(shardDetector, times(1)).listShards(); + } + + /** + * Test method for {@link ShutdownTask#call()}. + */ + @Test + public final void testCallWhenLeaseLost() { + shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY); + + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); + + final TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); + verify(shardDetector, never()).listShards(); } /** @@ -132,4 +215,45 @@ public class ShutdownTaskTest { assertEquals(TaskType.SHUTDOWN, task.taskType()); } + + /* + * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is + * epochs): 0 1 2 3 4 5 - shards till + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + */ + private List constructShardListGraphA() { + final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); + final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); + final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "205"); + final SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "205"); + final SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("206", null); + + return Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, range0, + ShardObjectHelper.newHashKeyRange("0", "99")), + ShardObjectHelper.newShard("shardId-1", null, null, range0, + ShardObjectHelper.newHashKeyRange("100", "199")), + ShardObjectHelper.newShard("shardId-2", null, null, range0, + ShardObjectHelper.newHashKeyRange("200", "299")), + ShardObjectHelper.newShard("shardId-3", null, null, range0, + ShardObjectHelper.newHashKeyRange("300", "399")), + ShardObjectHelper.newShard("shardId-4", null, null, range1, + ShardObjectHelper.newHashKeyRange("400", "499")), + ShardObjectHelper.newShard("shardId-5", null, null, range2, + ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY)), + ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3, + ShardObjectHelper.newHashKeyRange("0", "199")), + ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3, + ShardObjectHelper.newHashKeyRange("200", "399")), + ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4, + ShardObjectHelper.newHashKeyRange("0", "399")), + ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, + ShardObjectHelper.newHashKeyRange("500", "799")), + ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4, + ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY))); + } + }