Merge pull request #26 from ychunxue/ltr_1_lease_parameter
leaseKey parameter fix and logging change for shutdownTask
This commit is contained in:
commit
8e4b8d789b
4 changed files with 19 additions and 4 deletions
|
|
@ -149,9 +149,13 @@ public class ShardInfo {
|
||||||
* @return lease key
|
* @return lease key
|
||||||
*/
|
*/
|
||||||
public static String getLeaseKey(ShardInfo shardInfo) {
|
public static String getLeaseKey(ShardInfo shardInfo) {
|
||||||
|
return getLeaseKey(shardInfo, shardInfo.shardId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getLeaseKey(ShardInfo shardInfo, String shardId) {
|
||||||
return shardInfo.streamIdentifierSerOpt().isPresent() ?
|
return shardInfo.streamIdentifierSerOpt().isPresent() ?
|
||||||
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardInfo.shardId()) :
|
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardId) :
|
||||||
shardInfo.shardId();
|
shardId;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,8 @@ import software.amazon.kinesis.leases.LeaseRefresher;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Task to block until processing of all data records in the parent shard(s) is completed.
|
* Task to block until processing of all data records in the parent shard(s) is completed.
|
||||||
* We check if we have checkpoint(s) for the parent shard(s).
|
* We check if we have checkpoint(s) for the parent shard(s).
|
||||||
|
|
@ -59,7 +61,8 @@ public class BlockOnParentShardTask implements ConsumerTask {
|
||||||
try {
|
try {
|
||||||
boolean blockedOnParentShard = false;
|
boolean blockedOnParentShard = false;
|
||||||
for (String shardId : shardInfo.parentShardIds()) {
|
for (String shardId : shardInfo.parentShardIds()) {
|
||||||
Lease lease = leaseRefresher.getLease(shardId);
|
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, shardId);
|
||||||
|
final Lease lease = leaseRefresher.getLease(leaseKey);
|
||||||
if (lease != null) {
|
if (lease != null) {
|
||||||
ExtendedSequenceNumber checkpoint = lease.checkpoint();
|
ExtendedSequenceNumber checkpoint = lease.checkpoint();
|
||||||
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
|
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
|
||||||
|
|
|
||||||
|
|
@ -183,9 +183,11 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
private void createLeasesForChildShardsIfNotExist()
|
private void createLeasesForChildShardsIfNotExist()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
for(ChildShard childShard : childShards) {
|
for(ChildShard childShard : childShards) {
|
||||||
if(leaseCoordinator.getCurrentlyHeldLease(childShard.shardId()) == null) {
|
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
|
||||||
|
if(leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null) {
|
||||||
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
||||||
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
|
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
|
||||||
|
log.info("Shard {}: Created child shard lease: {}", shardInfo.shardId(), leaseToCreate.leaseKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -198,6 +200,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
final Lease updatedLease = currentLease.copy();
|
final Lease updatedLease = currentLease.copy();
|
||||||
updatedLease.childShardIds(childShardIds);
|
updatedLease.childShardIds(childShardIds);
|
||||||
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
|
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
|
||||||
|
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
|
||||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
||||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
|
@ -107,6 +108,10 @@ public class ShutdownTaskTest {
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
doNothing().when(recordsPublisher).shutdown();
|
doNothing().when(recordsPublisher).shutdown();
|
||||||
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
|
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
|
||||||
|
final Lease childLease = new Lease();
|
||||||
|
childLease.leaseKey("childShardLeaseKey");
|
||||||
|
when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class)))
|
||||||
|
.thenReturn(childLease);
|
||||||
|
|
||||||
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
|
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue