From 054cd8828451f3349115ab075de88155dcc27de3 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 15 Apr 2020 17:57:38 -0700 Subject: [PATCH] ShardEnd sync persistence change quick fix Addressing comments --- .../software/amazon/kinesis/leases/Lease.java | 26 +++++++++++++++++-- .../dynamodb/DynamoDBLeaseSerializer.java | 11 ++++++++ .../kinesis/lifecycle/ShutdownTask.java | 21 +++++++++++++++ .../amazon/kinesis/leases/LeaseBuilder.java | 3 ++- ...DynamoDBLeaseRefresherIntegrationTest.java | 13 ++++++++++ .../leases/dynamodb/TestHarnessBuilder.java | 2 ++ .../kinesis/lifecycle/ShutdownTaskTest.java | 15 ++++++++++- 7 files changed, 87 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 682a6f9e..a04e2725 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; @NoArgsConstructor @Getter @Accessors(fluent = true) -@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"}) +@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds"}) @ToString public class Lease { /* @@ -87,6 +87,7 @@ public class Lease { */ private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); + private Set childShardIds = new HashSet<>(); /** * Copy constructor, used by clone(). @@ -96,13 +97,21 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds()); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { + this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, + ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>()); + } + + public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, + final UUID concurrencyToken, final Long lastCounterIncrementNanos, + final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, + final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -114,6 +123,9 @@ public class Lease { if (parentShardIds != null) { this.parentShardIds.addAll(parentShardIds); } + if (childShardIds != null) { + this.childShardIds.addAll(childShardIds); + } } /** @@ -134,6 +146,7 @@ public class Lease { checkpoint(lease.checkpoint); pendingCheckpoint(lease.pendingCheckpoint); parentShardIds(lease.parentShardIds); + childShardIds(lease.childShardIds()); } /** @@ -231,6 +244,15 @@ public class Lease { this.parentShardIds.addAll(parentShardIds); } + /** + * Sets childShardIds. + * + * @param childShardIds may not be null + */ + public void childShardIds(@NonNull final Collection childShardIds) { + this.childShardIds.addAll(childShardIds); + } + /** * Sets leaseOwner. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index a02e2a6e..52b4d014 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; @@ -51,6 +52,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint"; private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; + private static final String CHILD_SHARD_ID_KEY = "childShardId"; @Override public Map toDynamoRecord(final Lease lease) { @@ -69,6 +71,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { if (lease.parentShardIds() != null && !lease.parentShardIds().isEmpty()) { result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds())); } + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { + result.put(CHILD_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.childShardIds())); + } if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) { result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber())); @@ -97,6 +102,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY)) ); leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); + leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_ID_KEY)); if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) { leaseToUpdate.pendingCheckpoint( @@ -220,6 +226,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); } + + if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { + result.put(CHILD_SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); + } + return result; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 33eb4497..96b17b3c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -45,7 +45,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.function.Function; /** @@ -109,8 +112,13 @@ public class ShutdownTask implements ConsumerTask { final long startTime = System.currentTimeMillis(); if (reason == ShutdownReason.SHARD_END) { // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. + // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); + updateLeasesForChildShards(); } else { log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo)); } @@ -182,6 +190,19 @@ public class ShutdownTask implements ConsumerTask { } } + private void updateLeasesForChildShards() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo)); + Set childShardIds = new HashSet<>(); + for (ChildShard childShard : childShards) { + childShardIds.add(childShard.shardId()); + } + + final Lease upatedLease = currentLease.copy(); + upatedLease.childShardIds(childShardIds); + leaseCoordinator.updateLease(upatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo)); + } + /* * (non-Javadoc) * diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index ee38116f..8ab99a18 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -34,9 +34,10 @@ public class LeaseBuilder { private ExtendedSequenceNumber pendingCheckpoint; private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); + private Set childShardIds = new HashSet<>(); public Lease build() { return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, - checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds); + checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java index 414f7975..99dcd64d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -239,6 +240,18 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest assertNull(newLease); } + @Test + public void testUpdateLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + Lease lease = builder.withLease("1").build().get("1"); + Lease updatedLease = lease.copy(); + updatedLease.childShardIds(Collections.singleton("updatedChildShardId")); + + leaseRefresher.updateLease(updatedLease); + Lease newLease = leaseRefresher.getLease(lease.leaseKey()); + assertEquals(Collections.singleton("updatedChildShardId"), newLease.childShardIds()); + } + /** * Tests deleteLease when a lease does not exist. */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java index 6afa0045..677303d6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -72,6 +73,7 @@ public class TestHarnessBuilder { lease.leaseCounter(0L); lease.leaseOwner(owner); lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); lease.leaseKey(shardId); return lease; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index cbb9b834..9f1737fe 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import org.junit.Before; import org.junit.Test; @@ -77,7 +78,7 @@ public class ShutdownTaskTest { private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); - private final String concurrencyToken = "testToken4398"; + private final String concurrencyToken = "0-1-2-3-4"; private final String shardId = "shardId-0"; private boolean cleanupLeasesOfCompletedShards = false; private boolean ignoreUnexpectedChildShards = false; @@ -122,6 +123,7 @@ public class ShutdownTaskTest { @Test public final void testCallWhenApplicationDoesNotCheckpoint() { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease()); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); final TaskResult result = task.call(); @@ -162,6 +164,7 @@ public class ShutdownTaskTest { hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards()); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease()); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); final TaskResult result = task.call(); @@ -169,6 +172,7 @@ public class ShutdownTaskTest { verify(recordsPublisher).shutdown(); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); } @@ -249,4 +253,13 @@ public class ShutdownTaskTest { return childShards; } + private Lease createLease() { + Lease lease = new Lease(); + lease.leaseKey("shardId-0"); + lease.leaseOwner("leaseOwner"); + lease.parentShardIds(Collections.singleton("parentShardIds")); + + return lease; + } + }