From 44437f13610aeff177650a3d6e15545f56395177 Mon Sep 17 00:00:00 2001 From: jmooreoliva Date: Wed, 18 Oct 2017 14:16:54 -0700 Subject: [PATCH] Shutdown that throws an exception will be retried. (#238) Without this change a transient error on shutdown with reason terminate prevents child shards from starting. --- .../lib/worker/ShardConsumer.java | 2 +- .../lib/worker/ShardConsumerTest.java | 124 ++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index d7a5545e..33187ad7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -406,7 +406,7 @@ class ShardConsumer { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); } - if (isShutdownRequested()) { + if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 0bd2f31a..1858e07c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,12 +67,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** @@ -418,6 +421,127 @@ public class ShardConsumerTest { file.delete(); } + private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet { + private final CountDownLatch errorShutdownLatch = new CountDownLatch(1); + + @Override + public void shutdown(ShutdownInput input) { + ShutdownReason reason = input.getShutdownReason(); + if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { + errorShutdownLatch.countDown(); + throw new RuntimeException("test"); + } else { + super.shutdown(input); + } + } + } + + /** + * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * processor's shutdown method with reason terminate will be retried. + */ + @Test + public final void testConsumeShardWithTransientTerminateError() throws Exception { + int numRecs = 10; + BigInteger startSeqNum = BigInteger.ONE; + String streamShardId = "kinesis-0-0"; + String testConcurrencyToken = "testToken"; + List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); + // Close the shard so that shutdown is called with reason terminate + shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( + KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); + File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); + + IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final int maxRecords = 2; + final int idleTimeMS = 0; // keep unit tests fast + ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); + checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); + when(leaseManager.getLease(anyString())).thenReturn(null); + + TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); + + StreamConfig streamConfig = + new StreamConfig(fileBasedProxy, + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // check on parent shards + Thread.sleep(50L); + consumer.consumeShard(); // start initialization + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); // initialize + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + + // We expect to process all records in numRecs calls + for (int i = 0; i < numRecs;) { + boolean newTaskSubmitted = consumer.consumeShard(); + if (newTaskSubmitted) { + LOG.debug("New processing task was submitted, call # " + i); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES + i += maxRecords; + } + Thread.sleep(50L); + } + + // Consume shards until shutdown terminate is called and it has thrown an exception + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.errorShutdownLatch.getCount()); + + // Wait for a retry of shutdown terminate that should succeed + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.getShutdownLatch().getCount()); + + // Wait for shutdown complete now that terminate shutdown is successful + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + break; + } + Thread.sleep(50L); + } + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); + List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + file.delete(); + } + /** * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */