refactoring shutdownTask

This commit is contained in:
Chunxue Yang 2020-07-27 18:18:32 -07:00
parent 5f7d4b3bc6
commit c3b41c3b55
6 changed files with 197 additions and 112 deletions

View file

@ -532,7 +532,8 @@ class ConsumerStates {
consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getGetRecordsCache(), consumer.getShardSyncer(),
consumer.getShardSyncStrategy(), consumer.getChildShards(), consumer.getShardSyncStrategy(), consumer.getChildShards(),
consumer.getLeaseCleanupManager()); consumer.getLeaseCleanupManager(),
consumer.getMetricsFactory());
} }
@Override @Override

View file

@ -54,6 +54,7 @@ class ShardConsumer {
private final ExecutorService executorService; private final ExecutorService executorService;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final KinesisDataFetcher dataFetcher; private final KinesisDataFetcher dataFetcher;
@Getter
private final IMetricsFactory metricsFactory; private final IMetricsFactory metricsFactory;
private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private ICheckpoint checkpoint; private ICheckpoint checkpoint;
@ -221,7 +222,6 @@ class ShardConsumer {
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration * @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases * @param shardSyncer shardSyncer instance used to check and create new leases
* @param leaseCleanupManager used to clean up leases in lease table.
*/ */
@Deprecated @Deprecated
ShardConsumer(ShardInfo shardInfo, ShardConsumer(ShardInfo shardInfo,

View file

@ -16,11 +16,13 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils; import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -37,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -48,6 +51,7 @@ class ShutdownTask implements ITask {
private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
@VisibleForTesting @VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 10; static final int RETRY_RANDOM_MAX_RANGE = 10;
@ -68,6 +72,7 @@ class ShutdownTask implements ITask {
private final ShardSyncStrategy shardSyncStrategy; private final ShardSyncStrategy shardSyncStrategy;
private final List<ChildShard> childShards; private final List<ChildShard> childShards;
private final LeaseCleanupManager leaseCleanupManager; private final LeaseCleanupManager leaseCleanupManager;
private final IMetricsFactory metricsFactory;
/** /**
* Constructor. * Constructor.
@ -85,7 +90,7 @@ class ShutdownTask implements ITask {
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards, ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards,
LeaseCleanupManager leaseCleanupManager) { LeaseCleanupManager leaseCleanupManager, IMetricsFactory metricsFactory) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -101,6 +106,7 @@ class ShutdownTask implements ITask {
this.shardSyncStrategy = shardSyncStrategy; this.shardSyncStrategy = shardSyncStrategy;
this.childShards = childShards; this.childShards = childShards;
this.leaseCleanupManager = leaseCleanupManager; this.leaseCleanupManager = leaseCleanupManager;
this.metricsFactory = metricsFactory;
} }
/* /*
@ -111,110 +117,143 @@ class ShutdownTask implements ITask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
MetricsHelper.startScope(metricsFactory, SHUTDOWN_TASK_OPERATION);
Exception exception; Exception exception;
boolean applicationException = false;
try { try {
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason);
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(localReason)
.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try { try {
recordProcessor.shutdown(shutdownInput); final long startTime = System.currentTimeMillis();
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
final Runnable leaseLostAction = () -> takeLeaseLostAction();
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); if (reason == ShutdownReason.TERMINATE) {
try {
if (localReason == ShutdownReason.TERMINATE) { takeShardEndAction(currentShardLease, startTime);
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { } catch (InvalidStateException e) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard " // If InvalidStateException happens, it indicates we have a non recoverable error in short term.
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down.
"See IRecordProcessor.shutdown javadocs for more information."); LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " +
} "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e);
dropLease(currentShardLease);
// Check if either the shard end ddb persist is successful or throwOnApplicationException(leaseLostAction, startTime);
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) {
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
//TODO: Add shard end checkpointing here.
} }
} else {
throwOnApplicationException(leaseLostAction, startTime);
} }
LOG.debug("Shutting down retrieval strategy."); LOG.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown(); getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
return new TaskResult(null);
} catch (Exception e) { } catch (Exception e) {
applicationException = true; if (e instanceof CustomerApplicationException) {
throw e; LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e);
} finally { } else {
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e);
MetricsLevel.SUMMARY); }
}
return new TaskResult(null); exception = e;
} catch (Exception e) { // backoff if we encounter an exception.
if (applicationException) { try {
LOG.error("Application exception. ", e); Thread.sleep(this.backoffTimeMillis);
} else { } catch (InterruptedException ie) {
LOG.error("Caught exception: ", e); LOG.debug("Interrupted sleep", ie);
} }
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug("Interrupted sleep", ie);
} }
} finally {
MetricsHelper.endScope();
} }
return new TaskResult(exception); return new TaskResult(exception);
} }
private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason) // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
throws DependencyException, ProvisionedThroughputException { private void takeShardEndAction(KinesisClientLease currentShardLease, long startTime)
ShutdownReason shutdownReason = originalReason; throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException {
if(originalReason == ShutdownReason.TERMINATE) { // Create new lease for the child shards if they don't exist.
// For TERMINATE shutdown reason, try to create and persist childShard leases before setting checkpoint. // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
try { // This would happen when KinesisDataFetcher catches ResourceNotFound exception.
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
if (currentLease == null) { // This scenario could happen when customer deletes the stream while leaving the KCL application running.
throw new InvalidStateException(shardInfo.getShardId() if (currentShardLease == null) {
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
} }
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); // If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information.
updateCurrentLeaseWithChildShards(currentLease); createLeasesForChildShardsIfNotExist();
recordProcessorCheckpointer.setSequenceNumberAtShardEnd( updateCurrentLeaseWithChildShards(currentShardLease);
recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); } else {
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); LOG.warn("Shard " + shardInfo.getShardId()
} else { + ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
LOG.warn("Shard " + shardInfo.getShardId() }
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); // Checkpoint with SHARD_END sequence number.
} final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo);
} catch (InvalidStateException e) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
// If invalidStateException happens, it indicates we are missing childShard related information. boolean isSuccess = false;
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting try {
// childShard information in the processTask. isSuccess = attemptShardEndCheckpointing(startTime);
shutdownReason = ShutdownReason.ZOMBIE; } finally {
dropLease(); // Check if either the shard end ddb persist is successful or
LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + // if childshards is empty. When child shards is empty then either it is due to
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); // completed shard being reprocessed or we got RNF from service.
} // For these cases enqueue the lease for deletion.
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
}
}
private void takeLeaseLostAction() {
final ShutdownInput leaseLostShutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.ZOMBIE)
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(leaseLostShutdownInput);
}
private boolean attemptShardEndCheckpointing(long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist."));
if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the recordProcessor to checkpoint with SHARD_END sequence number.
// The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), startTime);
}
return true;
}
private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
final ShutdownInput shardEndShutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.TERMINATE)
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(shardEndShutdownInput);
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");
}
}
private void throwOnApplicationException(Runnable action, long startTime) throws CustomerApplicationException {
try {
action.run();
} catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e);
} finally {
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
} }
return shutdownReason;
} }
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
@ -285,13 +324,12 @@ class ShutdownTask implements ITask {
return reason; return reason;
} }
private void dropLease() { private void dropLease(KinesisClientLease currentShardLease) {
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); if (currentShardLease == null) {
if (lease == null) { LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly.");
LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly.");
return; return;
} }
leaseCoordinator.dropLease(lease); leaseCoordinator.dropLease(currentShardLease);
LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey()); LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey());
} }
} }

View file

@ -0,0 +1,24 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.exceptions;
public class CustomerApplicationException extends Exception {
public CustomerApplicationException(Throwable t) {super(t);}
public CustomerApplicationException(String message, Throwable t) {super(message, t);}
public CustomerApplicationException(String message) {super(message);}
}

View file

@ -583,7 +583,12 @@ public class ShardConsumerTest {
.thenReturn(getRecordsCache); .thenReturn(getRecordsCache);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); List<String> parentShardIds = new ArrayList<>();
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo, shardInfo,
@ -705,7 +710,11 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null); List<String> parentShardIds = new ArrayList<>();
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
@ -758,11 +767,7 @@ public class ShardConsumerTest {
shardSyncer, shardSyncer,
shardSyncStrategy); shardSyncStrategy);
List<String> parentShardIds = new ArrayList<>(); when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);
parentShardIds.add(shardInfo.getShardId());
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(),
"leaseOwner",
parentShardIds));
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));

View file

@ -38,9 +38,12 @@ import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@ -81,6 +84,7 @@ public class ShutdownTaskTest {
defaultParentShardIds, defaultParentShardIds,
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
IMetricsFactory metricsFactory = new NullMetricsFactory();
@Mock @Mock
@ -119,8 +123,11 @@ public class ShutdownTaskTest {
public void setUp() throws Exception { public void setUp() throws Exception {
doNothing().when(getRecordsCache).shutdown(); doNothing().when(getRecordsCache).shutdown();
final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
parentLease.setCheckpoint(new ExtendedSequenceNumber("3298"));
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease);
when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease);
} }
/** /**
@ -154,12 +161,12 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructSplitChildShards(), constructSplitChildShards(),
leaseCleanupManager); leaseCleanupManager,
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException); Assert.assertTrue(result.getException() instanceof CustomerApplicationException);
final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + final String expectedExceptionMessage = "Customer application throws exception for shard shardId-0";
"Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.";
Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage()); Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage());
} }
@ -190,7 +197,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructSplitChildShards(), constructSplitChildShards(),
leaseCleanupManager); leaseCleanupManager,
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
@ -206,6 +214,7 @@ public class ShutdownTaskTest {
boolean ignoreUnexpectedChildShards = false; boolean ignoreUnexpectedChildShards = false;
KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease.setCheckpoint(new ExtendedSequenceNumber("3298"));
KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList()); KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
@ -227,7 +236,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager)); leaseCleanupManager,
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
@ -253,7 +263,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager)); leaseCleanupManager,
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
@ -291,7 +302,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager)); leaseCleanupManager,
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
@ -316,7 +328,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards(), constructMergeChildShards(),
leaseCleanupManager)); leaseCleanupManager,
metricsFactory));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
TaskResult result = task.call(); TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
@ -347,7 +360,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructSplitChildShards(), constructSplitChildShards(),
leaseCleanupManager); leaseCleanupManager,
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
@ -381,7 +395,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
Collections.emptyList(), Collections.emptyList(),
leaseCleanupManager); leaseCleanupManager,
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
@ -411,7 +426,8 @@ public class ShutdownTaskTest {
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
Collections.emptyList(), Collections.emptyList(),
leaseCleanupManager); leaseCleanupManager,
metricsFactory);
TaskResult result = task.call(); TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
@ -428,7 +444,8 @@ public class ShutdownTaskTest {
ShutdownTask task = new ShutdownTask(null, null, null, null, ShutdownTask task = new ShutdownTask(null, null, null, null,
null, null, false, null, null, false,
false, leaseCoordinator, 0, false, leaseCoordinator, 0,
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(),
leaseCleanupManager, metricsFactory);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }