diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 15f79739..2d0ce8c2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; @NoArgsConstructor @Getter @Accessors(fluent = true) -@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "pendingCheckpointState"}) +@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"}) @ToString public class Lease { /* @@ -95,6 +95,7 @@ public class Lease { */ private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); + private Set childShardIds = new HashSet<>(); /** * Copy constructor, used by clone(). @@ -104,7 +105,7 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); } @Deprecated @@ -113,13 +114,14 @@ public class Lease { final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, - ownerSwitchesSinceCheckpoint, parentShardIds, null); + ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, - final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final byte[] pendingCheckpointState) { + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds, + final byte[] pendingCheckpointState) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -131,6 +133,9 @@ public class Lease { if (parentShardIds != null) { this.parentShardIds.addAll(parentShardIds); } + if (childShardIds != null) { + this.childShardIds.addAll(childShardIds); + } this.pendingCheckpointState = pendingCheckpointState; } @@ -153,6 +158,7 @@ public class Lease { pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); + childShardIds(lease.childShardIds()); } /** @@ -259,6 +265,15 @@ public class Lease { this.parentShardIds.addAll(parentShardIds); } + /** + * Sets childShardIds. + * + * @param childShardIds may not be null + */ + public void childShardIds(@NonNull final Collection childShardIds) { + this.childShardIds.addAll(childShardIds); + } + /** * Sets leaseOwner. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index f42bafcf..85632500 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; @@ -52,6 +53,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; + private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; @Override public Map toDynamoRecord(final Lease lease) { @@ -70,6 +72,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { if (lease.parentShardIds() != null && !lease.parentShardIds().isEmpty()) { result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds())); } + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { + result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.childShardIds())); + } if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) { result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber())); @@ -102,6 +107,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY)) ); leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); + leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY)); if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) { leaseToUpdate.pendingCheckpoint( @@ -235,6 +241,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); } + + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { + result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); + } + return result; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index aab984eb..bf07f6e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -46,6 +46,14 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + /** * Task for invoking the ShardRecordProcessor shutdown() callback. */ @@ -107,8 +115,13 @@ public class ShutdownTask implements ConsumerTask { final long startTime = System.currentTimeMillis(); if (reason == ShutdownReason.SHARD_END) { // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. + // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); + updateLeasesForChildShards(); } else { log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo)); } @@ -180,6 +193,16 @@ public class ShutdownTask implements ConsumerTask { } } + private void updateLeasesForChildShards() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo)); + Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); + + final Lease updatedLease = currentLease.copy(); + updatedLease.childShardIds(childShardIds); + leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo)); + } + /* * (non-Javadoc) * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index d9e00669..4a0c75eb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -433,7 +433,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { .records(records) .millisBehindLatest(getRecordsResult.millisBehindLatest()) .cacheEntryTime(lastSuccessfulCall) - .isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()) + .isAtShardEnd(getRecordsRetrievalStrategy.dataFetcher().isShardEndReached()) .childShards(getRecordsResult.childShards()) .build(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 44f6acf4..cbf7add7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -969,7 +969,7 @@ public class HierarchicalShardSyncerTest { parentShardIds.add(shard.adjacentParentShardId()); } return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, - parentShardIds, null); + parentShardIds, new HashSet<>(), null); }).collect(Collectors.toList()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index 591e7db0..cf06f586 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -34,10 +34,11 @@ public class LeaseBuilder { private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); + private Set childShardIds = new HashSet<>(); private byte[] pendingCheckpointState; public Lease build() { - return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, pendingCheckpointState); + return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, + pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java index c9df9106..75431866 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -239,6 +240,18 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest assertNull(newLease); } + @Test + public void testUpdateLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + Lease lease = builder.withLease("1").build().get("1"); + Lease updatedLease = lease.copy(); + updatedLease.childShardIds(Collections.singleton("updatedChildShardId")); + + leaseRefresher.updateLease(updatedLease); + Lease newLease = leaseRefresher.getLease(lease.leaseKey()); + assertEquals(Collections.singleton("updatedChildShardId"), newLease.childShardIds()); + } + /** * Tests deleteLease when a lease does not exist. */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index f22e6e4d..1daf85b8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -55,7 +55,7 @@ public class DynamoDBLeaseRenewerTest { private LeaseRefresher leaseRefresher; private static Lease newLease(String leaseKey) { - return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), null); + return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null); } @Before diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java index 6afa0045..677303d6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -72,6 +73,7 @@ public class TestHarnessBuilder { lease.leaseCounter(0L); lease.leaseOwner(owner); lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); lease.leaseKey(shardId); return lease; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index cbb9b834..c94a3266 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -28,8 +28,10 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; import org.junit.Before; import org.junit.Test; @@ -77,7 +79,7 @@ public class ShutdownTaskTest { private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); - private final String concurrencyToken = "testToken4398"; + private final String concurrencyToken = "0-1-2-3-4"; private final String shardId = "shardId-0"; private boolean cleanupLeasesOfCompletedShards = false; private boolean ignoreUnexpectedChildShards = false; @@ -122,6 +124,8 @@ public class ShutdownTaskTest { @Test public final void testCallWhenApplicationDoesNotCheckpoint() { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); + Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); final TaskResult result = task.call(); @@ -162,6 +166,8 @@ public class ShutdownTaskTest { hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards()); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); final TaskResult result = task.call(); @@ -169,6 +175,7 @@ public class ShutdownTaskTest { verify(recordsPublisher).shutdown(); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); } @@ -249,4 +256,13 @@ public class ShutdownTaskTest { return childShards; } + private Lease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { + Lease lease = new Lease(); + lease.leaseKey(leaseKey); + lease.leaseOwner(leaseOwner); + lease.parentShardIds(parentShardIds); + + return lease; + } + }