From 44437f13610aeff177650a3d6e15545f56395177 Mon Sep 17 00:00:00 2001 From: jmooreoliva Date: Wed, 18 Oct 2017 14:16:54 -0700 Subject: [PATCH 1/2] Shutdown that throws an exception will be retried. (#238) Without this change a transient error on shutdown with reason terminate prevents child shards from starting. --- .../lib/worker/ShardConsumer.java | 2 +- .../lib/worker/ShardConsumerTest.java | 124 ++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index d7a5545e..33187ad7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -406,7 +406,7 @@ class ShardConsumer { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); } - if (isShutdownRequested()) { + if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 0bd2f31a..1858e07c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,12 +67,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** @@ -418,6 +421,127 @@ public class ShardConsumerTest { file.delete(); } + private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet { + private final CountDownLatch errorShutdownLatch = new CountDownLatch(1); + + @Override + public void shutdown(ShutdownInput input) { + ShutdownReason reason = input.getShutdownReason(); + if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { + errorShutdownLatch.countDown(); + throw new RuntimeException("test"); + } else { + super.shutdown(input); + } + } + } + + /** + * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * processor's shutdown method with reason terminate will be retried. + */ + @Test + public final void testConsumeShardWithTransientTerminateError() throws Exception { + int numRecs = 10; + BigInteger startSeqNum = BigInteger.ONE; + String streamShardId = "kinesis-0-0"; + String testConcurrencyToken = "testToken"; + List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); + // Close the shard so that shutdown is called with reason terminate + shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( + KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); + File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); + + IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final int maxRecords = 2; + final int idleTimeMS = 0; // keep unit tests fast + ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); + checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); + when(leaseManager.getLease(anyString())).thenReturn(null); + + TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); + + StreamConfig streamConfig = + new StreamConfig(fileBasedProxy, + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // check on parent shards + Thread.sleep(50L); + consumer.consumeShard(); // start initialization + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); // initialize + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + + // We expect to process all records in numRecs calls + for (int i = 0; i < numRecs;) { + boolean newTaskSubmitted = consumer.consumeShard(); + if (newTaskSubmitted) { + LOG.debug("New processing task was submitted, call # " + i); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES + i += maxRecords; + } + Thread.sleep(50L); + } + + // Consume shards until shutdown terminate is called and it has thrown an exception + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.errorShutdownLatch.getCount()); + + // Wait for a retry of shutdown terminate that should succeed + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.getShutdownLatch().getCount()); + + // Wait for shutdown complete now that terminate shutdown is successful + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + break; + } + Thread.sleep(50L); + } + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); + List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + file.delete(); + } + /** * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ From 8ed6c81ceabb5a124a644c914a1e4b407374cfb3 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Thu, 19 Oct 2017 11:40:44 -0700 Subject: [PATCH 2/2] Revert "Shutdown that throws an exception will be retried. (#238)" (#251) This reverts commit 44437f13610aeff177650a3d6e15545f56395177. Reverted due to Build Failures --- .../lib/worker/ShardConsumer.java | 2 +- .../lib/worker/ShardConsumerTest.java | 124 ------------------ 2 files changed, 1 insertion(+), 125 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 33187ad7..d7a5545e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -406,7 +406,7 @@ class ShardConsumer { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); } - if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { + if (isShutdownRequested()) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 1858e07c..0bd2f31a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -67,14 +66,12 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** @@ -421,127 +418,6 @@ public class ShardConsumerTest { file.delete(); } - private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet { - private final CountDownLatch errorShutdownLatch = new CountDownLatch(1); - - @Override - public void shutdown(ShutdownInput input) { - ShutdownReason reason = input.getShutdownReason(); - if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { - errorShutdownLatch.countDown(); - throw new RuntimeException("test"); - } else { - super.shutdown(input); - } - } - } - - /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record - * processor's shutdown method with reason terminate will be retried. - */ - @Test - public final void testConsumeShardWithTransientTerminateError() throws Exception { - int numRecs = 10; - BigInteger startSeqNum = BigInteger.ONE; - String streamShardId = "kinesis-0-0"; - String testConcurrencyToken = "testToken"; - List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); - // Close the shard so that shutdown is called with reason terminate - shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( - KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); - File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); - - IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); - - final int maxRecords = 2; - final int idleTimeMS = 0; // keep unit tests fast - ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); - checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - when(leaseManager.getLease(anyString())).thenReturn(null); - - TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); - - StreamConfig streamConfig = - new StreamConfig(fileBasedProxy, - maxRecords, - idleTimeMS, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - - ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); - ShardConsumer consumer = - new ShardConsumer(shardInfo, - streamConfig, - checkpoint, - processor, - leaseManager, - parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, - executorService, - metricsFactory, - taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); - consumer.consumeShard(); // check on parent shards - Thread.sleep(50L); - consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - consumer.consumeShard(); // initialize - processor.getInitializeLatch().await(5, TimeUnit.SECONDS); - - // We expect to process all records in numRecs calls - for (int i = 0; i < numRecs;) { - boolean newTaskSubmitted = consumer.consumeShard(); - if (newTaskSubmitted) { - LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); - // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES - i += maxRecords; - } - Thread.sleep(50L); - } - - // Consume shards until shutdown terminate is called and it has thrown an exception - for (int i = 0; i < 100; i++) { - consumer.consumeShard(); - if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { - break; - } - } - assertEquals(0, processor.errorShutdownLatch.getCount()); - - // Wait for a retry of shutdown terminate that should succeed - for (int i = 0; i < 100; i++) { - consumer.consumeShard(); - if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { - break; - } - } - assertEquals(0, processor.getShutdownLatch().getCount()); - - // Wait for shutdown complete now that terminate shutdown is successful - for (int i = 0; i < 100; i++) { - consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { - break; - } - Thread.sleep(50L); - } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); - - assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); - - executorService.shutdown(); - executorService.awaitTermination(60, TimeUnit.SECONDS); - - String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); - List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); - verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); - file.delete(); - } - /** * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */