fix for premature childShard lease creation
This commit is contained in:
parent
4fd63989d3
commit
5bf24bda43
2 changed files with 188 additions and 57 deletions
|
|
@ -14,9 +14,11 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
|
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
|
||||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||||
import com.amazonaws.util.CollectionUtils;
|
import com.amazonaws.util.CollectionUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
|
@ -32,8 +34,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -44,6 +46,7 @@ class ShutdownTask implements ITask {
|
||||||
private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
|
private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
|
||||||
|
|
||||||
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
|
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
|
||||||
|
private int retryLeftForValidParentState = 10;
|
||||||
|
|
||||||
private final ShardInfo shardInfo;
|
private final ShardInfo shardInfo;
|
||||||
private final IRecordProcessor recordProcessor;
|
private final IRecordProcessor recordProcessor;
|
||||||
|
|
@ -107,43 +110,8 @@ class ShutdownTask implements ITask {
|
||||||
try {
|
try {
|
||||||
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
|
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
|
||||||
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
|
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
|
||||||
ShutdownReason localReason = reason;
|
|
||||||
/*
|
|
||||||
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
|
|
||||||
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active
|
|
||||||
* workers to contend for the lease of this shard.
|
|
||||||
*/
|
|
||||||
if(localReason == ShutdownReason.TERMINATE) {
|
|
||||||
// Create new lease for the child shards if they don't exist.
|
|
||||||
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
|
||||||
// This would happen when KinesisDataFetcher catches ResourceNotFound exception.
|
|
||||||
// In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
|
||||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
|
||||||
try {
|
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
|
||||||
createLeasesForChildShardsIfNotExist();
|
|
||||||
updateCurrentLeaseWithChildShards();
|
|
||||||
} else {
|
|
||||||
LOG.warn("Shard " + shardInfo.getShardId()
|
|
||||||
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
|
|
||||||
}
|
|
||||||
} catch (InvalidStateException e) {
|
|
||||||
// If invalidStateException happens, it indicates we are missing childShard related information.
|
|
||||||
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting
|
|
||||||
// childShard information in the processTask.
|
|
||||||
localReason = ShutdownReason.ZOMBIE;
|
|
||||||
dropLease();
|
|
||||||
LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " +
|
|
||||||
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we reached end of the shard, set sequence number to SHARD_END.
|
ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason);
|
||||||
if (localReason == ShutdownReason.TERMINATE) {
|
|
||||||
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
|
|
||||||
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
|
|
||||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
|
||||||
}
|
|
||||||
|
|
||||||
final ShutdownInput shutdownInput = new ShutdownInput()
|
final ShutdownInput shutdownInput = new ShutdownInput()
|
||||||
.withShutdownReason(localReason)
|
.withShutdownReason(localReason)
|
||||||
|
|
@ -191,7 +159,67 @@ class ShutdownTask implements ITask {
|
||||||
return new TaskResult(exception);
|
return new TaskResult(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason)
|
||||||
|
throws DependencyException, ProvisionedThroughputException {
|
||||||
|
ShutdownReason shutdownReason = originalReason;
|
||||||
|
if(originalReason == ShutdownReason.TERMINATE) {
|
||||||
|
// For TERMINATE shutdown reason, try to create and persist childShard leases before setting checkpoint.
|
||||||
|
try {
|
||||||
|
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
|
||||||
|
if (currentLease == null) {
|
||||||
|
throw new InvalidStateException(shardInfo.getShardId()
|
||||||
|
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
|
||||||
|
}
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
createLeasesForChildShardsIfNotExist();
|
||||||
|
updateCurrentLeaseWithChildShards(currentLease);
|
||||||
|
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
|
||||||
|
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
|
||||||
|
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Shard " + shardInfo.getShardId()
|
||||||
|
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
|
||||||
|
}
|
||||||
|
} catch (InvalidStateException e) {
|
||||||
|
// If invalidStateException happens, it indicates we are missing childShard related information.
|
||||||
|
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting
|
||||||
|
// childShard information in the processTask.
|
||||||
|
shutdownReason = ShutdownReason.ZOMBIE;
|
||||||
|
dropLease();
|
||||||
|
LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " +
|
||||||
|
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return shutdownReason;
|
||||||
|
}
|
||||||
|
|
||||||
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
|
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
|
||||||
|
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
||||||
|
// not present in the lease table before creating the lease entry.
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
|
||||||
|
final ChildShard childShard = childShards.get(0);
|
||||||
|
final List<String> parentLeaseKeys = childShard.getParentShards();
|
||||||
|
|
||||||
|
if (parentLeaseKeys.size() != 2) {
|
||||||
|
throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard
|
||||||
|
+ " does not contain other parent information.");
|
||||||
|
} else {
|
||||||
|
boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) ==
|
||||||
|
Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1)));
|
||||||
|
if (!isValidLeaseTableState) {
|
||||||
|
if(--retryLeftForValidParentState >= 0) {
|
||||||
|
throw new BlockedOnParentShardException(
|
||||||
|
"Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
|
||||||
|
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
|
||||||
|
} else {
|
||||||
|
throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
|
||||||
|
+ " has partial parent information in lease table.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Attempt create leases for child shards.
|
||||||
for (ChildShard childShard : childShards) {
|
for (ChildShard childShard : childShards) {
|
||||||
final String leaseKey = childShard.getShardId();
|
final String leaseKey = childShard.getShardId();
|
||||||
if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) {
|
if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) {
|
||||||
|
|
@ -202,18 +230,10 @@ class ShutdownTask implements ITask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
|
|
||||||
if (currentLease == null) {
|
|
||||||
throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId());
|
|
||||||
}
|
|
||||||
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
|
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
|
||||||
|
|
||||||
currentLease.setChildShardIds(childShardIds);
|
currentLease.setChildShardIds(childShardIds);
|
||||||
final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken()));
|
leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS);
|
||||||
if (!updateResult) {
|
|
||||||
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId());
|
|
||||||
}
|
|
||||||
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
|
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,9 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
|
@ -31,9 +34,11 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||||
|
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
|
||||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||||
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
||||||
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
||||||
|
|
@ -72,7 +77,6 @@ public class ShutdownTaskTest {
|
||||||
defaultConcurrencyToken,
|
defaultConcurrencyToken,
|
||||||
defaultParentShardIds,
|
defaultParentShardIds,
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST);
|
||||||
IRecordProcessor defaultRecordProcessor = new TestStreamlet();
|
|
||||||
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
|
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -86,6 +90,8 @@ public class ShutdownTaskTest {
|
||||||
private ILeaseManager<KinesisClientLease> leaseManager;
|
private ILeaseManager<KinesisClientLease> leaseManager;
|
||||||
@Mock
|
@Mock
|
||||||
private KinesisClientLibLeaseCoordinator leaseCoordinator;
|
private KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||||
|
@Mock
|
||||||
|
private IRecordProcessor defaultRecordProcessor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws java.lang.Exception
|
* @throws java.lang.Exception
|
||||||
|
|
@ -110,7 +116,6 @@ public class ShutdownTaskTest {
|
||||||
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
|
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
|
||||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
|
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
|
||||||
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -143,9 +148,9 @@ public class ShutdownTaskTest {
|
||||||
getRecordsCache,
|
getRecordsCache,
|
||||||
shardSyncer,
|
shardSyncer,
|
||||||
shardSyncStrategy,
|
shardSyncStrategy,
|
||||||
constructChildShards());
|
constructSplitChildShards());
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
Assert.assertNotNull(result.getException());
|
assertNotNull(result.getException());
|
||||||
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
|
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
|
||||||
final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " +
|
final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " +
|
||||||
"Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.";
|
"Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.";
|
||||||
|
|
@ -178,13 +183,104 @@ public class ShutdownTaskTest {
|
||||||
getRecordsCache,
|
getRecordsCache,
|
||||||
shardSyncer,
|
shardSyncer,
|
||||||
shardSyncStrategy,
|
shardSyncStrategy,
|
||||||
constructChildShards());
|
constructSplitChildShards());
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
verify(getRecordsCache).shutdown();
|
verify(getRecordsCache).shutdown();
|
||||||
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
|
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
|
||||||
Assert.assertNull(result.getException());
|
Assert.assertNull(result.getException());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testCallWhenParentInfoNotPresentInLease() throws Exception {
|
||||||
|
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||||
|
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
boolean cleanupLeasesOfCompletedShards = false;
|
||||||
|
boolean ignoreUnexpectedChildShards = false;
|
||||||
|
|
||||||
|
KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
|
||||||
|
KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList());
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
|
||||||
|
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
|
||||||
|
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease);
|
||||||
|
|
||||||
|
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||||
|
defaultRecordProcessor,
|
||||||
|
checkpointer,
|
||||||
|
ShutdownReason.TERMINATE,
|
||||||
|
kinesisProxy,
|
||||||
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
|
leaseCoordinator,
|
||||||
|
TASK_BACKOFF_TIME_MILLIS,
|
||||||
|
getRecordsCache,
|
||||||
|
shardSyncer,
|
||||||
|
shardSyncStrategy,
|
||||||
|
constructMergeChildShards());
|
||||||
|
|
||||||
|
// Make first 5 attempts with partial parent info in lease table
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNotNull(result.getException());
|
||||||
|
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
||||||
|
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
|
||||||
|
verify(getRecordsCache, never()).shutdown();
|
||||||
|
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make next attempt with complete parent info in lease table
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNull(result.getException());
|
||||||
|
verify(getRecordsCache).shutdown();
|
||||||
|
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
|
||||||
|
verify(leaseCoordinator, never()).dropLease(currentLease);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws Exception {
|
||||||
|
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||||
|
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
boolean cleanupLeasesOfCompletedShards = false;
|
||||||
|
boolean ignoreUnexpectedChildShards = false;
|
||||||
|
|
||||||
|
KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
|
||||||
|
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
|
||||||
|
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null);
|
||||||
|
|
||||||
|
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||||
|
defaultRecordProcessor,
|
||||||
|
checkpointer,
|
||||||
|
ShutdownReason.TERMINATE,
|
||||||
|
kinesisProxy,
|
||||||
|
INITIAL_POSITION_TRIM_HORIZON,
|
||||||
|
cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards,
|
||||||
|
leaseCoordinator,
|
||||||
|
TASK_BACKOFF_TIME_MILLIS,
|
||||||
|
getRecordsCache,
|
||||||
|
shardSyncer,
|
||||||
|
shardSyncStrategy,
|
||||||
|
constructMergeChildShards());
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNotNull(result.getException());
|
||||||
|
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
||||||
|
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
|
||||||
|
verify(getRecordsCache, never()).shutdown();
|
||||||
|
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
TaskResult result = task.call();
|
||||||
|
assertNull(result.getException());
|
||||||
|
verify(getRecordsCache).shutdown();
|
||||||
|
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
|
||||||
|
verify(leaseCoordinator).dropLease(currentLease);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public final void testCallWhenShardEnd() throws Exception {
|
public final void testCallWhenShardEnd() throws Exception {
|
||||||
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
|
||||||
|
|
@ -205,10 +301,10 @@ public class ShutdownTaskTest {
|
||||||
getRecordsCache,
|
getRecordsCache,
|
||||||
shardSyncer,
|
shardSyncer,
|
||||||
shardSyncStrategy,
|
shardSyncStrategy,
|
||||||
constructChildShards());
|
constructSplitChildShards());
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
|
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||||
verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class));
|
verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
|
||||||
Assert.assertNull(result.getException());
|
Assert.assertNull(result.getException());
|
||||||
verify(getRecordsCache).shutdown();
|
verify(getRecordsCache).shutdown();
|
||||||
}
|
}
|
||||||
|
|
@ -241,7 +337,7 @@ public class ShutdownTaskTest {
|
||||||
Collections.emptyList());
|
Collections.emptyList());
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||||
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
|
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
|
||||||
Assert.assertNull(result.getException());
|
Assert.assertNull(result.getException());
|
||||||
verify(getRecordsCache).shutdown();
|
verify(getRecordsCache).shutdown();
|
||||||
}
|
}
|
||||||
|
|
@ -270,7 +366,7 @@ public class ShutdownTaskTest {
|
||||||
Collections.emptyList());
|
Collections.emptyList());
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||||
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
|
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
|
||||||
Assert.assertNull(result.getException());
|
Assert.assertNull(result.getException());
|
||||||
verify(getRecordsCache).shutdown();
|
verify(getRecordsCache).shutdown();
|
||||||
}
|
}
|
||||||
|
|
@ -288,7 +384,7 @@ public class ShutdownTaskTest {
|
||||||
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ChildShard> constructChildShards() {
|
private List<ChildShard> constructSplitChildShards() {
|
||||||
List<ChildShard> childShards = new ArrayList<>();
|
List<ChildShard> childShards = new ArrayList<>();
|
||||||
List<String> parentShards = new ArrayList<>();
|
List<String> parentShards = new ArrayList<>();
|
||||||
parentShards.add(defaultShardId);
|
parentShards.add(defaultShardId);
|
||||||
|
|
@ -307,6 +403,21 @@ public class ShutdownTaskTest {
|
||||||
return childShards;
|
return childShards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<ChildShard> constructMergeChildShards() {
|
||||||
|
List<ChildShard> childShards = new ArrayList<>();
|
||||||
|
List<String> parentShards = new ArrayList<>();
|
||||||
|
parentShards.add(defaultShardId);
|
||||||
|
parentShards.add("ShardId-1");
|
||||||
|
|
||||||
|
ChildShard childShard = new ChildShard();
|
||||||
|
childShard.setShardId("ShardId-2");
|
||||||
|
childShard.setParentShards(parentShards);
|
||||||
|
childShard.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99"));
|
||||||
|
childShards.add(childShard);
|
||||||
|
|
||||||
|
return childShards;
|
||||||
|
}
|
||||||
|
|
||||||
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
||||||
KinesisClientLease lease = new KinesisClientLease();
|
KinesisClientLease lease = new KinesisClientLease();
|
||||||
lease.setLeaseKey(leaseKey);
|
lease.setLeaseKey(leaseKey);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue