diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index d8aa88d1..9fb8e8e9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -412,7 +412,7 @@ class ShardConsumer { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); } - if (isShutdownRequested()) { + if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 516788c7..8a91c6e6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -69,12 +70,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** @@ -102,6 +105,8 @@ public class ShardConsumerTest { private GetRecordsCache getRecordsCache; + private KinesisDataFetcher dataFetcher; + @Mock private IRecordProcessor processor; @Mock @@ -118,6 +123,7 @@ public class ShardConsumerTest { @Before public void setup() { getRecordsCache = null; + dataFetcher = null; recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); @@ -339,7 +345,7 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); @@ -422,6 +428,154 @@ public class ShardConsumerTest { file.delete(); } + private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet { + private final CountDownLatch errorShutdownLatch = new CountDownLatch(1); + + @Override + public void shutdown(ShutdownInput input) { + ShutdownReason reason = input.getShutdownReason(); + if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { + errorShutdownLatch.countDown(); + throw new RuntimeException("test"); + } else { + super.shutdown(input); + } + } + } + + /** + * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * processor's shutdown method with reason terminate will be retried. + */ + @Test + public final void testConsumeShardWithTransientTerminateError() throws Exception { + int numRecs = 10; + BigInteger startSeqNum = BigInteger.ONE; + String streamShardId = "kinesis-0-0"; + String testConcurrencyToken = "testToken"; + List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); + // Close the shard so that shutdown is called with reason terminate + shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( + KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); + File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); + + IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final int maxRecords = 2; + final int idleTimeMS = 0; // keep unit tests fast + ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); + checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); + when(leaseManager.getLease(anyString())).thenReturn(null); + + TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); + + StreamConfig streamConfig = + new StreamConfig(fileBasedProxy, + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), + any(IMetricsFactory.class), anyInt())) + .thenReturn(getRecordsCache); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + recordProcessorCheckpointer, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); + + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // check on parent shards + Thread.sleep(50L); + consumer.consumeShard(); // start initialization + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); // initialize + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + verify(getRecordsCache).start(); + + // We expect to process all records in numRecs calls + for (int i = 0; i < numRecs;) { + boolean newTaskSubmitted = consumer.consumeShard(); + if (newTaskSubmitted) { + LOG.debug("New processing task was submitted, call # " + i); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES + i += maxRecords; + } + Thread.sleep(50L); + } + + // Consume shards until shutdown terminate is called and it has thrown an exception + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.errorShutdownLatch.getCount()); + + // Wait for a retry of shutdown terminate that should succeed + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.getShutdownLatch().getCount()); + + // Wait for shutdown complete now that terminate shutdown is successful + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + break; + } + Thread.sleep(50L); + } + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); + + verify(getRecordsCache).shutdown(); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); + List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + file.delete(); + } + /** * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @@ -470,7 +624,7 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));