diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 5cf55dbf..b5cde2bc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -532,7 +532,8 @@ class ConsumerStates { consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy(), consumer.getChildShards(), - consumer.getLeaseCleanupManager()); + consumer.getLeaseCleanupManager(), + consumer.getMetricsFactory()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 71f8a6bc..10600def 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -54,6 +54,7 @@ class ShardConsumer { private final ExecutorService executorService; private final ShardInfo shardInfo; private final KinesisDataFetcher dataFetcher; + @Getter private final IMetricsFactory metricsFactory; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private ICheckpoint checkpoint; @@ -221,7 +222,6 @@ class ShardConsumer { * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool * @param config Kinesis library configuration * @param shardSyncer shardSyncer instance used to check and create new leases - * @param leaseCleanupManager used to clean up leases in lease table. */ @Deprecated ShardConsumer(ShardInfo shardInfo, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index d30ec765..a9ec8d76 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -16,11 +16,13 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.UpdateField; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -37,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; @@ -48,6 +51,7 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); + private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; @VisibleForTesting static final int RETRY_RANDOM_MAX_RANGE = 10; @@ -68,6 +72,7 @@ class ShutdownTask implements ITask { private final ShardSyncStrategy shardSyncStrategy; private final List childShards; private final LeaseCleanupManager leaseCleanupManager; + private final IMetricsFactory metricsFactory; /** * Constructor. @@ -85,7 +90,7 @@ class ShutdownTask implements ITask { long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager) { + LeaseCleanupManager leaseCleanupManager, IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -101,6 +106,7 @@ class ShutdownTask implements ITask { this.shardSyncStrategy = shardSyncStrategy; this.childShards = childShards; this.leaseCleanupManager = leaseCleanupManager; + this.metricsFactory = metricsFactory; } /* @@ -111,110 +117,143 @@ class ShutdownTask implements ITask { */ @Override public TaskResult call() { + MetricsHelper.startScope(metricsFactory, SHUTDOWN_TASK_OPERATION); Exception exception; - boolean applicationException = false; try { LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " - + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); - ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason); - - final ShutdownInput shutdownInput = new ShutdownInput() - .withShutdownReason(localReason) - .withCheckpointer(recordProcessorCheckpointer); - final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { - recordProcessor.shutdown(shutdownInput); - ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + final long startTime = System.currentTimeMillis(); + final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + final Runnable leaseLostAction = () -> takeLeaseLostAction(); - final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); - - if (localReason == ShutdownReason.TERMINATE) { - if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { - throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + - "See IRecordProcessor.shutdown javadocs for more information."); - } - - // Check if either the shard end ddb persist is successful or - // if childshards is empty. When child shards is empty then either it is due to - // completed shard being reprocessed or we got RNF from service. - // For these cases enqueue the lease for deletion. - if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) { - final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo); - - if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - - //TODO: Add shard end checkpointing here. + if (reason == ShutdownReason.TERMINATE) { + try { + takeShardEndAction(currentShardLease, startTime); + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we have a non recoverable error in short term. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. + LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); + dropLease(currentShardLease); + throwOnApplicationException(leaseLostAction, startTime); } + } else { + throwOnApplicationException(leaseLostAction, startTime); } + LOG.debug("Shutting down retrieval strategy."); getRecordsCache.shutdown(); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); + return new TaskResult(null); } catch (Exception e) { - applicationException = true; - throw e; - } finally { - MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, - MetricsLevel.SUMMARY); - } + if (e instanceof CustomerApplicationException) { + LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); + } else { + LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); + } - return new TaskResult(null); - } catch (Exception e) { - if (applicationException) { - LOG.error("Application exception. ", e); - } else { - LOG.error("Caught exception: ", e); - } - exception = e; - // backoff if we encounter an exception. - try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug("Interrupted sleep", ie); + exception = e; + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug("Interrupted sleep", ie); + } } + } finally { + MetricsHelper.endScope(); } return new TaskResult(exception); } - private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason) - throws DependencyException, ProvisionedThroughputException { - ShutdownReason shutdownReason = originalReason; - if(originalReason == ShutdownReason.TERMINATE) { - // For TERMINATE shutdown reason, try to create and persist childShard leases before setting checkpoint. - try { - final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - if (currentLease == null) { - throw new InvalidStateException(shardInfo.getShardId() - + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); - } - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateCurrentLeaseWithChildShards(currentLease); - recordProcessorCheckpointer.setSequenceNumberAtShardEnd( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - } else { - LOG.warn("Shard " + shardInfo.getShardId() - + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); - } - } catch (InvalidStateException e) { - // If invalidStateException happens, it indicates we are missing childShard related information. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting - // childShard information in the processTask. - shutdownReason = ShutdownReason.ZOMBIE; - dropLease(); - LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); - } - + // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. + private void takeShardEndAction(KinesisClientLease currentShardLease, long startTime) + throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { + // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher catches ResourceNotFound exception. + // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. + if (currentShardLease == null) { + throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + } + if (!CollectionUtils.isNullOrEmpty(childShards)) { + // If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information. + createLeasesForChildShardsIfNotExist(); + updateCurrentLeaseWithChildShards(currentShardLease); + } else { + LOG.warn("Shard " + shardInfo.getShardId() + + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); + } + // Checkpoint with SHARD_END sequence number. + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo); + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + boolean isSuccess = false; + try { + isSuccess = attemptShardEndCheckpointing(startTime); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } + } + + private void takeLeaseLostAction() { + final ShutdownInput leaseLostShutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.ZOMBIE) + .withCheckpointer(recordProcessorCheckpointer); + recordProcessor.shutdown(leaseLostShutdownInput); + } + + private boolean attemptShardEndCheckpointing(long startTime) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { + final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) + .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); + if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + // Call the recordProcessor to checkpoint with SHARD_END sequence number. + // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), startTime); + } + return true; + } + + private void applicationCheckpointAndVerification() { + recordProcessorCheckpointer.setSequenceNumberAtShardEnd( + recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + final ShutdownInput shardEndShutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.TERMINATE) + .withCheckpointer(recordProcessorCheckpointer); + recordProcessor.shutdown(shardEndShutdownInput); + + final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + + final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); + + if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { + throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + + "See IRecordProcessor.shutdown javadocs for more information."); + } + } + + private void throwOnApplicationException(Runnable action, long startTime) throws CustomerApplicationException { + try { + action.run(); + } catch (Exception e) { + throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e); + } finally { + MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } - return shutdownReason; } private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { @@ -285,13 +324,12 @@ class ShutdownTask implements ITask { return reason; } - private void dropLease() { - KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - if (lease == null) { - LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly."); + private void dropLease(KinesisClientLease currentShardLease) { + if (currentShardLease == null) { + LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly."); return; } - leaseCoordinator.dropLease(lease); - LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey()); + leaseCoordinator.dropLease(currentShardLease); + LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey()); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java b/src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java new file mode 100644 index 00000000..1ef906af --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazonaws.services.kinesis.leases.exceptions; + +public class CustomerApplicationException extends Exception { + public CustomerApplicationException(Throwable t) {super(t);} + + public CustomerApplicationException(String message, Throwable t) {super(message, t);} + + public CustomerApplicationException(String message) {super(message);} +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 67bf3697..cb89b619 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -583,7 +583,12 @@ public class ShardConsumerTest { .thenReturn(getRecordsCache); when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); + List parentShardIds = new ArrayList<>(); + parentShardIds.add("parentShardId"); + KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds); + currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer")); + when(leaseManager.getLease(streamShardId)).thenReturn(currentLease); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, @@ -705,7 +710,11 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - when(leaseManager.getLease(anyString())).thenReturn(null); + List parentShardIds = new ArrayList<>(); + parentShardIds.add("parentShardId"); + KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds); + currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer")); + when(leaseManager.getLease(streamShardId)).thenReturn(currentLease); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); @@ -758,11 +767,7 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - List parentShardIds = new ArrayList<>(); - parentShardIds.add(shardInfo.getShardId()); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(), - "leaseOwner", - parentShardIds)); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 219fd28c..2942dcb7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -38,9 +38,12 @@ import java.util.UUID; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -81,6 +84,7 @@ public class ShutdownTaskTest { defaultParentShardIds, ExtendedSequenceNumber.LATEST); ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); + IMetricsFactory metricsFactory = new NullMetricsFactory(); @Mock @@ -119,8 +123,11 @@ public class ShutdownTaskTest { public void setUp() throws Exception { doNothing().when(getRecordsCache).shutdown(); final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + parentLease.setCheckpoint(new ExtendedSequenceNumber("3298")); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); + when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease); + } /** @@ -154,12 +161,12 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); assertNotNull(result.getException()); - Assert.assertTrue(result.getException() instanceof IllegalArgumentException); - final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + - "Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information."; + Assert.assertTrue(result.getException() instanceof CustomerApplicationException); + final String expectedExceptionMessage = "Customer application throws exception for shard shardId-0"; Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage()); } @@ -190,7 +197,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); @@ -206,6 +214,7 @@ public class ShutdownTaskTest { boolean ignoreUnexpectedChildShards = false; KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + currentLease.setCheckpoint(new ExtendedSequenceNumber("3298")); KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList()); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); @@ -227,7 +236,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -253,7 +263,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); @@ -291,7 +302,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -316,7 +328,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); @@ -347,7 +360,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -381,7 +395,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -411,7 +426,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -428,7 +444,8 @@ public class ShutdownTaskTest { ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, - getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); + getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), + leaseCleanupManager, metricsFactory); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); }