Merge pull request #74 from ashwing/ltr_1_fix_for_premature_childshard_creation
Ltr 1 fix for premature childshard creation
This commit is contained in:
commit
4457ebc350
2 changed files with 192 additions and 44 deletions
|
|
@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
|
|
@ -24,11 +25,11 @@ import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
import software.amazon.awssdk.utils.Validate;
|
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
|
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
|
||||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||||
|
|
@ -52,7 +53,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
@ -99,6 +99,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
private final LeaseCleanupManager leaseCleanupManager;
|
private final LeaseCleanupManager leaseCleanupManager;
|
||||||
|
|
||||||
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
|
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
|
||||||
|
private int retryLeftForValidParentState = 10;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Invokes ShardRecordProcessor shutdown() API.
|
* Invokes ShardRecordProcessor shutdown() API.
|
||||||
|
|
@ -120,26 +121,21 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
|
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||||
final ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason, currentShardLease);
|
final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
|
||||||
|
|
||||||
if (localReason == ShutdownReason.SHARD_END) {
|
if (reason == ShutdownReason.SHARD_END) {
|
||||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
|
try {
|
||||||
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
takeShardEndAction(currentShardLease, scope, startTime);
|
||||||
boolean isSuccess = false;
|
} catch (InvalidStateException e) {
|
||||||
try {
|
// If InvalidStateException happens, it indicates we have a non recoverable error in short term.
|
||||||
isSuccess = attemptShardEndCheckpointing(scope, startTime);
|
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry shutting down.
|
||||||
} finally {
|
log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " +
|
||||||
// Check if either the shard end ddb persist is successful or
|
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", currentShardLease.leaseKey(), e);
|
||||||
// if childshards is empty. When child shards is empty then either it is due to
|
dropLease(currentShardLease);
|
||||||
// completed shard being reprocessed or we got RNF from service.
|
throwOnApplicationException(leaseLostAction, scope, startTime);
|
||||||
// For these cases enqueue the lease for deletion.
|
|
||||||
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
|
|
||||||
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
throwOnApplicationException(leaseLostAction, scope, startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo));
|
log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo));
|
||||||
|
|
@ -168,41 +164,47 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return new TaskResult(exception);
|
return new TaskResult(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason, Lease currentShardLease)
|
// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
|
||||||
throws DependencyException, ProvisionedThroughputException {
|
private void takeShardEndAction(Lease currentShardLease,
|
||||||
ShutdownReason localReason = originalReason;
|
MetricsScope scope, long startTime)
|
||||||
if (originalReason == ShutdownReason.SHARD_END) {
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
||||||
// Create new lease for the child shards if they don't exist.
|
CustomerApplicationException {
|
||||||
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
// Create new lease for the child shards if they don't exist.
|
||||||
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
||||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
||||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||||
Validate.validState(currentShardLease != null,
|
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||||
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
|
if (currentShardLease == null) {
|
||||||
leaseKeyProvider.apply(shardInfo));
|
throw new InvalidStateException(leaseKeyProvider.apply(shardInfo)
|
||||||
|
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
|
||||||
|
}
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
createLeasesForChildShardsIfNotExist();
|
||||||
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
|
}
|
||||||
|
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
|
||||||
|
shardInfo);
|
||||||
|
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
||||||
|
boolean isSuccess = false;
|
||||||
try {
|
try {
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
isSuccess = attemptShardEndCheckpointing(scope, startTime);
|
||||||
createLeasesForChildShardsIfNotExist();
|
} finally {
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
// Check if either the shard end ddb persist is successful or
|
||||||
|
// if childshards is empty. When child shards is empty then either it is due to
|
||||||
|
// completed shard being reprocessed or we got RNF from service.
|
||||||
|
// For these cases enqueue the lease for deletion.
|
||||||
|
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
||||||
}
|
}
|
||||||
} catch (InvalidStateException e) {
|
|
||||||
// If InvalidStateException happens, it indicates we are missing childShard related information.
|
|
||||||
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting
|
|
||||||
// childShard information in the processTask.
|
|
||||||
localReason = ShutdownReason.LEASE_LOST;
|
|
||||||
log.warn("Lease {}: Exception happened while shutting down shardConsumer with SHARD_END reason. " +
|
|
||||||
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. Exception: ", currentShardLease.leaseKey(), e);
|
|
||||||
dropLease(currentShardLease);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return localReason;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
|
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
||||||
CustomerApplicationException {
|
CustomerApplicationException {
|
||||||
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
|
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
|
||||||
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
|
.orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
|
||||||
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||||
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
|
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
|
||||||
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
||||||
|
|
@ -237,6 +239,33 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
|
|
||||||
private void createLeasesForChildShardsIfNotExist()
|
private void createLeasesForChildShardsIfNotExist()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
||||||
|
// not present in the lease table before creating the lease entry.
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
|
||||||
|
final ChildShard childShard = childShards.get(0);
|
||||||
|
final List<String> parentLeaseKeys = childShard.parentShards().stream()
|
||||||
|
.map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList());
|
||||||
|
if (parentLeaseKeys.size() != 2) {
|
||||||
|
throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
||||||
|
+ " does not contain other parent information.");
|
||||||
|
} else {
|
||||||
|
boolean isValidLeaseTableState =
|
||||||
|
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects
|
||||||
|
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1)));
|
||||||
|
if (!isValidLeaseTableState) {
|
||||||
|
if (--retryLeftForValidParentState >= 0) {
|
||||||
|
throw new BlockedOnParentShardException(
|
||||||
|
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
||||||
|
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
|
||||||
|
} else {
|
||||||
|
throw new InvalidStateException(
|
||||||
|
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
||||||
|
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Attempt create leases for child shards.
|
||||||
for(ChildShard childShard : childShards) {
|
for(ChildShard childShard : childShards) {
|
||||||
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
|
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
|
||||||
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
|
@ -30,6 +31,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
@ -42,6 +44,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
|
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
|
||||||
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
||||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
|
@ -194,6 +197,108 @@ public class ShutdownTaskTest {
|
||||||
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||||
|
ExtendedSequenceNumber.LATEST);
|
||||||
|
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||||
|
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||||
|
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager);
|
||||||
|
|
||||||
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
||||||
|
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1"))
|
||||||
|
.thenReturn(null, null, null, null, null, parentLease);
|
||||||
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
|
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||||
|
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
|
||||||
|
// Return null lease first time to simulate partial parent lease info
|
||||||
|
when(leaseRefresher.getLease("shardId-1"))
|
||||||
|
.thenReturn(null, null, null, null, null, parentLease);
|
||||||
|
|
||||||
|
// Make first 5 attempts with partial parent info in lease table
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNotNull(result.getException());
|
||||||
|
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
||||||
|
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
|
||||||
|
verify(recordsPublisher, never()).shutdown();
|
||||||
|
verify(shardRecordProcessor, never())
|
||||||
|
.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
|
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||||
|
verify(leaseCoordinator, never())
|
||||||
|
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||||
|
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
// make next attempt with complete parent info in lease table
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNull(result.getException());
|
||||||
|
verify(recordsPublisher).shutdown();
|
||||||
|
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
|
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||||
|
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
||||||
|
verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||||
|
ExtendedSequenceNumber.LATEST);
|
||||||
|
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||||
|
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||||
|
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager);
|
||||||
|
|
||||||
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
||||||
|
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1"))
|
||||||
|
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);
|
||||||
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
|
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||||
|
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
|
||||||
|
// Return null lease first time to simulate partial parent lease info
|
||||||
|
when(leaseRefresher.getLease("shardId-1"))
|
||||||
|
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);
|
||||||
|
|
||||||
|
// Make first 10 attempts with partial parent info in lease table
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNotNull(result.getException());
|
||||||
|
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
||||||
|
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
|
||||||
|
verify(recordsPublisher, never()).shutdown();
|
||||||
|
verify(shardRecordProcessor, never())
|
||||||
|
.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
|
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||||
|
verify(leaseCoordinator, never())
|
||||||
|
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||||
|
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
// make final attempt with incomplete parent info in lease table
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNull(result.getException());
|
||||||
|
verify(recordsPublisher).shutdown();
|
||||||
|
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
|
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
||||||
|
verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
||||||
|
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
|
||||||
|
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test method for {@link ShutdownTask#call()}.
|
* Test method for {@link ShutdownTask#call()}.
|
||||||
* This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
|
* This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
|
||||||
|
|
@ -269,4 +374,18 @@ public class ShutdownTaskTest {
|
||||||
childShards.add(rightChild);
|
childShards.add(rightChild);
|
||||||
return childShards;
|
return childShards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<ChildShard> constructChildShard() {
|
||||||
|
List<ChildShard> childShards = new ArrayList<>();
|
||||||
|
List<String> parentShards = new ArrayList<>();
|
||||||
|
parentShards.add(shardId);
|
||||||
|
parentShards.add("shardId-1");
|
||||||
|
ChildShard leftChild = ChildShard.builder()
|
||||||
|
.shardId("shardId-2")
|
||||||
|
.parentShards(parentShards)
|
||||||
|
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
|
||||||
|
.build();
|
||||||
|
childShards.add(leftChild);
|
||||||
|
return childShards;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue