Shutdown that throws an exception will be retried. (#238)

Without this change a transient error on shutdown with reason terminate prevents
child shards from starting.
This commit is contained in:
jmooreoliva 2017-10-18 14:16:54 -07:00 committed by BtXin
parent ee3a6c24e6
commit 44437f1361
2 changed files with 125 additions and 1 deletions

View file

@ -406,7 +406,7 @@ class ShardConsumer {
if (taskOutcome == TaskOutcome.END_OF_SHARD) { if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE); markForShutdown(ShutdownReason.TERMINATE);
} }
if (isShutdownRequested()) { if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason); currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) { } else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) { if (currentState.getTaskType() == currentTask.getTaskType()) {

View file

@ -40,6 +40,7 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -66,12 +67,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.ShardIteratorType;
/** /**
@ -418,6 +421,127 @@ public class ShardConsumerTest {
file.delete(); file.delete();
} }
private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet {
private final CountDownLatch errorShutdownLatch = new CountDownLatch(1);
@Override
public void shutdown(ShutdownInput input) {
ShutdownReason reason = input.getShutdownReason();
if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) {
errorShutdownLatch.countDown();
throw new RuntimeException("test");
} else {
super.shutdown(input);
}
}
}
/**
* Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record
* processor's shutdown method with reason terminate will be retried.
*/
@Test
public final void testConsumeShardWithTransientTerminateError() throws Exception {
int numRecs = 10;
BigInteger startSeqNum = BigInteger.ONE;
String streamShardId = "kinesis-0-0";
String testConcurrencyToken = "testToken";
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum);
// Close the shard so that shutdown is called with reason terminate
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString());
File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002");
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
final int maxRecords = 2;
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
StreamConfig streamConfig =
new StreamConfig(fileBasedProxy,
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
Thread.sleep(50L);
}
// Consume shards until shutdown terminate is called and it has thrown an exception
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) {
break;
}
}
assertEquals(0, processor.errorShutdownLatch.getCount());
// Wait for a retry of shutdown terminate that should succeed
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) {
break;
}
}
assertEquals(0, processor.getShutdownLatch().getCount());
// Wait for shutdown complete now that terminate shutdown is successful
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
break;
}
Thread.sleep(50L);
}
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
file.delete();
}
/** /**
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
*/ */