diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 33187ad7..d7a5545e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -406,7 +406,7 @@ class ShardConsumer { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); } - if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { + if (isShutdownRequested()) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 1858e07c..0bd2f31a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -67,14 +66,12 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** @@ -421,127 +418,6 @@ public class ShardConsumerTest { file.delete(); } - private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet { - private final CountDownLatch errorShutdownLatch = new CountDownLatch(1); - - @Override - public void shutdown(ShutdownInput input) { - ShutdownReason reason = input.getShutdownReason(); - if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { - errorShutdownLatch.countDown(); - throw new RuntimeException("test"); - } else { - super.shutdown(input); - } - } - } - - /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record - * processor's shutdown method with reason terminate will be retried. - */ - @Test - public final void testConsumeShardWithTransientTerminateError() throws Exception { - int numRecs = 10; - BigInteger startSeqNum = BigInteger.ONE; - String streamShardId = "kinesis-0-0"; - String testConcurrencyToken = "testToken"; - List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); - // Close the shard so that shutdown is called with reason terminate - shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( - KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); - File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); - - IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); - - final int maxRecords = 2; - final int idleTimeMS = 0; // keep unit tests fast - ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); - checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - when(leaseManager.getLease(anyString())).thenReturn(null); - - TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); - - StreamConfig streamConfig = - new StreamConfig(fileBasedProxy, - maxRecords, - idleTimeMS, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - - ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); - ShardConsumer consumer = - new ShardConsumer(shardInfo, - streamConfig, - checkpoint, - processor, - leaseManager, - parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, - executorService, - metricsFactory, - taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); - consumer.consumeShard(); // check on parent shards - Thread.sleep(50L); - consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - consumer.consumeShard(); // initialize - processor.getInitializeLatch().await(5, TimeUnit.SECONDS); - - // We expect to process all records in numRecs calls - for (int i = 0; i < numRecs;) { - boolean newTaskSubmitted = consumer.consumeShard(); - if (newTaskSubmitted) { - LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); - // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES - i += maxRecords; - } - Thread.sleep(50L); - } - - // Consume shards until shutdown terminate is called and it has thrown an exception - for (int i = 0; i < 100; i++) { - consumer.consumeShard(); - if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { - break; - } - } - assertEquals(0, processor.errorShutdownLatch.getCount()); - - // Wait for a retry of shutdown terminate that should succeed - for (int i = 0; i < 100; i++) { - consumer.consumeShard(); - if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { - break; - } - } - assertEquals(0, processor.getShutdownLatch().getCount()); - - // Wait for shutdown complete now that terminate shutdown is successful - for (int i = 0; i < 100; i++) { - consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { - break; - } - Thread.sleep(50L); - } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); - - assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); - - executorService.shutdown(); - executorService.awaitTermination(60, TimeUnit.SECONDS); - - String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); - List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); - verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); - file.delete(); - } - /** * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */