ShardEnd sync persistence change

quick fix

Addressing comments
This commit is contained in:
Chunxue Yang 2020-04-15 17:57:38 -07:00
parent b1a3d215d0
commit 054cd88284
7 changed files with 87 additions and 4 deletions

View file

@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
@NoArgsConstructor @NoArgsConstructor
@Getter @Getter
@Accessors(fluent = true) @Accessors(fluent = true)
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"}) @EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds"})
@ToString @ToString
public class Lease { public class Lease {
/* /*
@ -87,6 +87,7 @@ public class Lease {
*/ */
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
/** /**
* Copy constructor, used by clone(). * Copy constructor, used by clone().
@ -96,13 +97,21 @@ public class Lease {
protected Lease(Lease lease) { protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds()); lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds());
} }
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos, final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) { final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>());
}
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds) {
this.leaseKey = leaseKey; this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner; this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter; this.leaseCounter = leaseCounter;
@ -114,6 +123,9 @@ public class Lease {
if (parentShardIds != null) { if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds); this.parentShardIds.addAll(parentShardIds);
} }
if (childShardIds != null) {
this.childShardIds.addAll(childShardIds);
}
} }
/** /**
@ -134,6 +146,7 @@ public class Lease {
checkpoint(lease.checkpoint); checkpoint(lease.checkpoint);
pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpoint(lease.pendingCheckpoint);
parentShardIds(lease.parentShardIds); parentShardIds(lease.parentShardIds);
childShardIds(lease.childShardIds());
} }
/** /**
@ -231,6 +244,15 @@ public class Lease {
this.parentShardIds.addAll(parentShardIds); this.parentShardIds.addAll(parentShardIds);
} }
/**
* Adds the given shard ids to this lease's childShardIds. Ids already held by the lease are
* kept: this method only adds, it does not clear or replace the existing set.
*
* @param childShardIds shard ids to add; may not be null
*/
public void childShardIds(@NonNull final Collection<String> childShardIds) {
this.childShardIds.addAll(childShardIds);
}
/** /**
* Sets leaseOwner. * Sets leaseOwner.
* *

View file

@ -30,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
@ -51,6 +52,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint"; private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_ID_KEY = "childShardId";
@Override @Override
public Map<String, AttributeValue> toDynamoRecord(final Lease lease) { public Map<String, AttributeValue> toDynamoRecord(final Lease lease) {
@ -69,6 +71,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
if (lease.parentShardIds() != null && !lease.parentShardIds().isEmpty()) { if (lease.parentShardIds() != null && !lease.parentShardIds().isEmpty()) {
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds())); result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds()));
} }
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.childShardIds()));
}
if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) { if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber())); result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber()));
@ -97,6 +102,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY)) DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
); );
leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_ID_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) { if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
leaseToUpdate.pendingCheckpoint( leaseToUpdate.pendingCheckpoint(
@ -220,6 +226,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
} }
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
}
return result; return result;
} }

View file

@ -45,7 +45,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function; import java.util.function.Function;
/** /**
@ -109,8 +112,13 @@ public class ShutdownTask implements ConsumerTask {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
if (reason == ShutdownReason.SHARD_END) { if (reason == ShutdownReason.SHARD_END) {
// Create new lease for the child shards if they don't exist. // Create new lease for the child shards if they don't exist.
// There is one valid scenario in which a shutdown task is created with SHARD_END reason and an empty list of childShards.
// This happens when KinesisDataFetcher (for polling mode) or FanOutRecordsPublisher (for StoS mode) catches a ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher send out a SHARD_END signal to trigger a shutdown task with an empty list of childShards.
// This scenario can happen when a customer deletes the stream while leaving the KCL application running.
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); createLeasesForChildShardsIfNotExist();
updateLeasesForChildShards();
} else { } else {
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo)); log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo));
} }
@ -182,6 +190,19 @@ public class ShutdownTask implements ConsumerTask {
} }
} }
/**
 * Records the ids of this shard's child shards on the lease currently held for the shard.
 *
 * The held lease is copied, the child shard ids are added to the copy, and the copy is
 * handed to the lease coordinator together with this task's concurrency token.
 *
 * @throws InvalidStateException if the lease coordinator reports no currently held lease for
 *         this shard, so there is nothing to record the child shards on
 * @throws DependencyException declared by the lease coordinator update call
 * @throws ProvisionedThroughputException declared by the lease coordinator update call
 */
private void updateLeasesForChildShards()
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final String leaseKey = shardInfoIdProvider.apply(shardInfo);
    final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
    // Fail with a descriptive, declared exception instead of a bare NullPointerException
    // when the lease is not (or no longer) held by this worker.
    if (currentLease == null) {
        throw new InvalidStateException("Failed to retrieve the currently held lease for shard "
                + leaseKey + "; unable to record its child shards.");
    }

    final Set<String> childShardIds = new HashSet<>();
    for (ChildShard childShard : childShards) {
        childShardIds.add(childShard.shardId());
    }

    // Work on a copy so the coordinator's held lease is only changed through updateLease.
    final Lease updatedLease = currentLease.copy();
    updatedLease.childShardIds(childShardIds);
    leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()),
            SHUTDOWN_TASK_OPERATION, leaseKey);
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *

View file

@ -34,9 +34,10 @@ public class LeaseBuilder {
private ExtendedSequenceNumber pendingCheckpoint; private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
public Lease build() { public Lease build() {
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds); checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds);
} }
} }

View file

@ -25,6 +25,7 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -239,6 +240,18 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
assertNull(newLease); assertNull(newLease);
} }
/**
 * Updates a copy of a harness-created lease with one child shard id and asserts that the
 * lease read back from the lease refresher reports exactly that child shard id.
 */
@Test
public void testUpdateLease() throws LeasingException {
    final TestHarnessBuilder harness = new TestHarnessBuilder(leaseRefresher);
    final Lease original = harness.withLease("1").build().get("1");

    // Mutate a copy so the harness-owned lease instance stays untouched.
    final Lease modified = original.copy();
    modified.childShardIds(Collections.singleton("updatedChildShardId"));
    leaseRefresher.updateLease(modified);

    final Lease reloaded = leaseRefresher.getLease(original.leaseKey());
    assertEquals(Collections.singleton("updatedChildShardId"), reloaded.childShardIds());
}
/** /**
* Tests deleteLease when a lease does not exist. * Tests deleteLease when a lease does not exist.
*/ */

View file

@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -72,6 +73,7 @@ public class TestHarnessBuilder {
lease.leaseCounter(0L); lease.leaseCounter(0L);
lease.leaseOwner(owner); lease.leaseOwner(owner);
lease.parentShardIds(Collections.singleton("parentShardId")); lease.parentShardIds(Collections.singleton("parentShardId"));
lease.childShardIds(new HashSet<>());
lease.leaseKey(shardId); lease.leaseKey(shardId);
return lease; return lease;

View file

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -77,7 +78,7 @@ public class ShutdownTaskTest {
private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST; private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private final String concurrencyToken = "testToken4398"; private final String concurrencyToken = "0-1-2-3-4";
private final String shardId = "shardId-0"; private final String shardId = "shardId-0";
private boolean cleanupLeasesOfCompletedShards = false; private boolean cleanupLeasesOfCompletedShards = false;
private boolean ignoreUnexpectedChildShards = false; private boolean ignoreUnexpectedChildShards = false;
@ -122,6 +123,7 @@ public class ShutdownTaskTest {
@Test @Test
public final void testCallWhenApplicationDoesNotCheckpoint() { public final void testCallWhenApplicationDoesNotCheckpoint() {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease());
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
final TaskResult result = task.call(); final TaskResult result = task.call();
@ -162,6 +164,7 @@ public class ShutdownTaskTest {
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards()); hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease());
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
final TaskResult result = task.call(); final TaskResult result = task.call();
@ -169,6 +172,7 @@ public class ShutdownTaskTest {
verify(recordsPublisher).shutdown(); verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
} }
@ -249,4 +253,13 @@ public class ShutdownTaskTest {
return childShards; return childShards;
} }
/**
 * Builds the lease fixture that the tests return from the mocked
 * {@code getCurrentlyHeldLease("shardId-0")} call.
 *
 * @return a lease keyed "shardId-0" with a fixed owner and a single parent shard id
 */
private Lease createLease() {
    final Lease heldLease = new Lease();
    heldLease.parentShardIds(Collections.singleton("parentShardIds"));
    heldLease.leaseOwner("leaseOwner");
    heldLease.leaseKey("shardId-0");
    return heldLease;
}
} }