Merge pull request #16 from ychunxue/ltr_1_merge

ShardEnd child shard persistence change
This commit is contained in:
ychunxue 2020-04-22 15:02:46 -07:00 committed by GitHub
commit 3de44dc4eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 91 additions and 10 deletions

View file

@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
@NoArgsConstructor
@Getter
@Accessors(fluent = true)
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "pendingCheckpointState"})
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"})
@ToString
public class Lease {
/*
@ -95,6 +95,7 @@ public class Lease {
*/
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
/**
* Copy constructor, used by clone().
@ -104,7 +105,7 @@ public class Lease {
protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.pendingCheckpointState());
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState());
}
@Deprecated
@ -113,13 +114,14 @@ public class Lease {
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, null);
ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null);
}
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final byte[] pendingCheckpointState) {
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds,
final byte[] pendingCheckpointState) {
this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter;
@ -131,6 +133,9 @@ public class Lease {
if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds);
}
if (childShardIds != null) {
this.childShardIds.addAll(childShardIds);
}
this.pendingCheckpointState = pendingCheckpointState;
}
@ -153,6 +158,7 @@ public class Lease {
pendingCheckpoint(lease.pendingCheckpoint);
pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds);
childShardIds(lease.childShardIds());
}
/**
@ -259,6 +265,15 @@ public class Lease {
this.parentShardIds.addAll(parentShardIds);
}
/**
* Sets childShardIds.
*
* @param childShardIds may not be null
*/
public void childShardIds(@NonNull final Collection<String> childShardIds) {
this.childShardIds.addAll(childShardIds);
}
/**
* Sets leaseOwner.
*

View file

@ -30,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease;
@ -52,6 +53,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
@Override
public Map<String, AttributeValue> toDynamoRecord(final Lease lease) {
@ -70,6 +72,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
if (lease.parentShardIds() != null && !lease.parentShardIds().isEmpty()) {
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds()));
}
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.childShardIds()));
}
if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber()));
@ -102,6 +107,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
);
leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
leaseToUpdate.pendingCheckpoint(
@ -235,6 +241,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
}
return result;
}

View file

@ -46,6 +46,14 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Task for invoking the ShardRecordProcessor shutdown() callback.
*/
@ -107,8 +115,13 @@ public class ShutdownTask implements ConsumerTask {
final long startTime = System.currentTimeMillis();
if (reason == ShutdownReason.SHARD_END) {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist();
updateLeasesForChildShards();
} else {
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo));
}
@ -180,6 +193,16 @@ public class ShutdownTask implements ConsumerTask {
}
}
private void updateLeasesForChildShards()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo));
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds);
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo));
}
/*
* (non-Javadoc)
*

View file

@ -433,7 +433,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
.records(records)
.millisBehindLatest(getRecordsResult.millisBehindLatest())
.cacheEntryTime(lastSuccessfulCall)
.isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached())
.isAtShardEnd(getRecordsRetrievalStrategy.dataFetcher().isShardEndReached())
.childShards(getRecordsResult.childShards())
.build();

View file

@ -969,7 +969,7 @@ public class HierarchicalShardSyncerTest {
parentShardIds.add(shard.adjacentParentShardId());
}
return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L,
parentShardIds, null);
parentShardIds, new HashSet<>(), null);
}).collect(Collectors.toList());
}

View file

@ -34,10 +34,11 @@ public class LeaseBuilder {
private ExtendedSequenceNumber pendingCheckpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
private byte[] pendingCheckpointState;
public Lease build() {
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, pendingCheckpointState);
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint,
pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState);
}
}

View file

@ -25,6 +25,7 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -239,6 +240,18 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
assertNull(newLease);
}
@Test
public void testUpdateLease() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
Lease lease = builder.withLease("1").build().get("1");
Lease updatedLease = lease.copy();
updatedLease.childShardIds(Collections.singleton("updatedChildShardId"));
leaseRefresher.updateLease(updatedLease);
Lease newLease = leaseRefresher.getLease(lease.leaseKey());
assertEquals(Collections.singleton("updatedChildShardId"), newLease.childShardIds());
}
/**
* Tests deleteLease when a lease does not exist.
*/

View file

@ -55,7 +55,7 @@ public class DynamoDBLeaseRenewerTest {
private LeaseRefresher leaseRefresher;
private static Lease newLease(String leaseKey) {
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), null);
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null);
}
@Before

View file

@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@ -72,6 +73,7 @@ public class TestHarnessBuilder {
lease.leaseCounter(0L);
lease.leaseOwner(owner);
lease.parentShardIds(Collections.singleton("parentShardId"));
lease.childShardIds(new HashSet<>());
lease.leaseKey(shardId);
return lease;

View file

@ -28,8 +28,10 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
@ -77,7 +79,7 @@ public class ShutdownTaskTest {
private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private final String concurrencyToken = "testToken4398";
private final String concurrencyToken = "0-1-2-3-4";
private final String shardId = "shardId-0";
private boolean cleanupLeasesOfCompletedShards = false;
private boolean ignoreUnexpectedChildShards = false;
@ -122,6 +124,8 @@ public class ShutdownTaskTest {
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
final TaskResult result = task.call();
@ -162,6 +166,8 @@ public class ShutdownTaskTest {
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
final TaskResult result = task.call();
@ -169,6 +175,7 @@ public class ShutdownTaskTest {
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
}
@ -249,4 +256,13 @@ public class ShutdownTaskTest {
return childShards;
}
private Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
Lease lease = new Lease();
lease.leaseKey(leaseKey);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(parentShardIds);
return lease;
}
}