leaseKey parameter fix and logging change for shutdownTask

This commit is contained in:
Chunxue Yang 2020-05-05 15:44:45 -07:00
parent c479984fb4
commit 1036006bb4
3 changed files with 15 additions and 2 deletions

View file

@ -25,6 +25,8 @@ import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.function.Function;
/**
* Task to block until processing of all data records in the parent shard(s) is completed.
* We check if we have checkpoint(s) for the parent shard(s).
@ -59,7 +61,8 @@ public class BlockOnParentShardTask implements ConsumerTask {
try {
boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) {
Lease lease = leaseRefresher.getLease(shardId);
final String leaseKey = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardId).orElse(shardId);
final Lease lease = leaseRefresher.getLease(leaseKey);
if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.checkpoint();
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {

View file

@ -183,9 +183,13 @@ public class ShutdownTask implements ConsumerTask {
private void createLeasesForChildShardsIfNotExist()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
for(ChildShard childShard : childShards) {
if(leaseCoordinator.getCurrentlyHeldLease(childShard.shardId()) == null) {
final String leaseKey = shardInfo.streamIdentifierSerOpt()
.map(s -> s + ":" + childShard.shardId())
.orElse(childShard.shardId());
if(leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null) {
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
log.info("Shard {}: Created child shard lease: {}", shardInfo.shardId(), leaseToCreate.leaseKey());
}
}
}
@ -198,6 +202,7 @@ public class ShutdownTask implements ConsumerTask {
final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds);
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
}
/*

View file

@ -46,6 +46,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
@ -107,6 +108,10 @@ public class ShutdownTaskTest {
public void setUp() throws Exception {
doNothing().when(recordsPublisher).shutdown();
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
final Lease childLease = new Lease();
childLease.leaseKey("childShardLeaseKey");
when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class)))
.thenReturn(childLease);
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);