Addressing comments from V1 version
This commit is contained in:
parent
89d4a07e0f
commit
2db1f068f7
4 changed files with 31 additions and 34 deletions
|
|
@ -78,21 +78,21 @@ public class HierarchicalShardSyncer {
|
||||||
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
||||||
final MetricsScope scope) throws DependencyException, InvalidStateException,
|
final MetricsScope scope) throws DependencyException, InvalidStateException,
|
||||||
ProvisionedThroughputException, KinesisClientLibIOException {
|
ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
final List<Shard> shards = getShardList(shardDetector);
|
final List<Shard> latestShards = getShardList(shardDetector);
|
||||||
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
|
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
|
||||||
ignoreUnexpectedChildShards, scope, shards);
|
ignoreUnexpectedChildShards, scope, latestShards);
|
||||||
}
|
}
|
||||||
|
|
||||||
//Provide a pre-collcted list of shards to avoid calling ListShards API
|
//Provide a pre-collcted list of shards to avoid calling ListShards API
|
||||||
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
|
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
|
||||||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
|
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
|
||||||
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> shards)throws DependencyException, InvalidStateException,
|
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)throws DependencyException, InvalidStateException,
|
||||||
ProvisionedThroughputException, KinesisClientLibIOException {
|
ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
if (!CollectionUtils.isNullOrEmpty(shards)) {
|
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
|
||||||
log.debug("Num shards: {}", shards.size());
|
log.debug("Num shards: {}", latestShards.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards);
|
||||||
final Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(
|
final Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(
|
||||||
shardIdToShardMap);
|
shardIdToShardMap);
|
||||||
final Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
|
final Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
|
||||||
|
|
@ -102,7 +102,7 @@ public class HierarchicalShardSyncer {
|
||||||
|
|
||||||
final List<Lease> currentLeases = leaseRefresher.listLeases();
|
final List<Lease> currentLeases = leaseRefresher.listLeases();
|
||||||
|
|
||||||
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
|
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds);
|
||||||
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
|
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
|
||||||
for (Lease lease : newLeasesToCreate) {
|
for (Lease lease : newLeasesToCreate) {
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
@ -116,7 +116,7 @@ public class HierarchicalShardSyncer {
|
||||||
}
|
}
|
||||||
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
|
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
|
||||||
trackedLeases.addAll(newLeasesToCreate);
|
trackedLeases.addAll(newLeasesToCreate);
|
||||||
cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher);
|
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher);
|
||||||
if (cleanupLeasesOfCompletedShards) {
|
if (cleanupLeasesOfCompletedShards) {
|
||||||
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
|
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -99,19 +99,20 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
ShutdownReason localReason = reason;
|
ShutdownReason localReason = reason;
|
||||||
List<Shard> allShards = new ArrayList<>();
|
List<Shard> latestShards = null;
|
||||||
/*
|
/*
|
||||||
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
|
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
|
||||||
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
|
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows
|
||||||
* workers to contend for the lease of this shard.
|
* active workers to contend for the lease of this shard.
|
||||||
*/
|
*/
|
||||||
if (localReason == ShutdownReason.SHARD_END) {
|
if (localReason == ShutdownReason.SHARD_END) {
|
||||||
allShards = shardDetector.listShards();
|
latestShards = shardDetector.listShards();
|
||||||
|
|
||||||
if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) {
|
//If latestShards is empty, should also shutdown the ShardConsumer without checkpoint with SHARD_END
|
||||||
|
if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
|
||||||
localReason = ShutdownReason.LEASE_LOST;
|
localReason = ShutdownReason.LEASE_LOST;
|
||||||
dropLease();
|
dropLease();
|
||||||
log.info("Force the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
|
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -154,7 +155,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
||||||
// create leases for the child shards
|
// create leases for the child shards
|
||||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
|
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
|
||||||
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, allShards);
|
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
|
||||||
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
|
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -196,30 +197,25 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return reason;
|
return reason;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateShardEnd(List<Shard> shards) {
|
private boolean isShardInContextParentOfAny(List<Shard> shards) {
|
||||||
for(Shard shard : shards) {
|
for(Shard shard : shards) {
|
||||||
if (isChildShardOfCurrentShard(shard)) {
|
if (isChildShardOfShardInContext(shard)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isChildShardOfCurrentShard(Shard shard) {
|
private boolean isChildShardOfShardInContext(Shard shard) {
|
||||||
return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
|
return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
|
||||||
|| StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
|
|| StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dropLease() {
|
private void dropLease() {
|
||||||
Collection<Lease> leases = leaseCoordinator.getAssignments();
|
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId());
|
||||||
if(leases != null && !leases.isEmpty()) {
|
leaseCoordinator.dropLease(currentLease);
|
||||||
for(Lease lease : leases) {
|
if(currentLease != null) {
|
||||||
if(lease.leaseKey().equals(shardInfo.shardId())) {
|
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
|
||||||
leaseCoordinator.dropLease(lease);
|
|
||||||
log.warn("Dropped lease for shutting down ShardConsumer: " + lease.leaseKey());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -207,16 +207,16 @@ public class HierarchicalShardSyncerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
|
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
|
||||||
final List<Shard> shards = constructShardListForGraphA();
|
final List<Shard> latestShards = constructShardListForGraphA();
|
||||||
|
|
||||||
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
|
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
|
||||||
when(shardDetector.listShards()).thenReturn(shards);
|
when(shardDetector.listShards()).thenReturn(latestShards);
|
||||||
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
|
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
|
||||||
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
|
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
|
||||||
|
|
||||||
hierarchicalShardSyncer
|
hierarchicalShardSyncer
|
||||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||||
cleanupLeasesOfCompletedShards, false, SCOPE, shards);
|
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
|
||||||
|
|
||||||
final Set<String> expectedShardIds = new HashSet<>(
|
final Set<String> expectedShardIds = new HashSet<>(
|
||||||
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
|
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
||||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||||
|
import software.amazon.kinesis.leases.Lease;
|
||||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||||
import software.amazon.kinesis.leases.ShardDetector;
|
import software.amazon.kinesis.leases.ShardDetector;
|
||||||
|
|
@ -124,8 +125,8 @@ public class ShutdownTaskTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public final void testCallWhenSyncingShardsThrows() throws Exception {
|
public final void testCallWhenSyncingShardsThrows() throws Exception {
|
||||||
List<Shard> shards = constructShardListGraphA();
|
List<Shard> latestShards = constructShardListGraphA();
|
||||||
when(shardDetector.listShards()).thenReturn(shards);
|
when(shardDetector.listShards()).thenReturn(latestShards);
|
||||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
|
|
||||||
|
|
@ -134,7 +135,7 @@ public class ShutdownTaskTest {
|
||||||
}).when(hierarchicalShardSyncer)
|
}).when(hierarchicalShardSyncer)
|
||||||
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
|
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
|
||||||
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
|
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
|
||||||
NULL_METRICS_FACTORY.createMetrics(), shards);
|
NULL_METRICS_FACTORY.createMetrics(), latestShards);
|
||||||
|
|
||||||
final TaskResult result = task.call();
|
final TaskResult result = task.call();
|
||||||
assertNotNull(result.getException());
|
assertNotNull(result.getException());
|
||||||
|
|
@ -188,7 +189,7 @@ public class ShutdownTaskTest {
|
||||||
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
||||||
verify(shardDetector, times(1)).listShards();
|
verify(shardDetector, times(1)).listShards();
|
||||||
verify(leaseCoordinator).getAssignments();
|
verify(leaseCoordinator).getCurrentlyHeldLease(shardInfo.shardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue