This reverts commit 44437f1361.
Reverted due to Build Failures
This commit is contained in:
parent
44437f1361
commit
8ed6c81cea
2 changed files with 1 additions and 125 deletions
|
|
@ -406,7 +406,7 @@ class ShardConsumer {
|
||||||
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
|
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
|
||||||
markForShutdown(ShutdownReason.TERMINATE);
|
markForShutdown(ShutdownReason.TERMINATE);
|
||||||
}
|
}
|
||||||
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
|
if (isShutdownRequested()) {
|
||||||
currentState = currentState.shutdownTransition(shutdownReason);
|
currentState = currentState.shutdownTransition(shutdownReason);
|
||||||
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
|
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
|
||||||
if (currentState.getTaskType() == currentTask.getTaskType()) {
|
if (currentState.getTaskType() == currentTask.getTaskType()) {
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,6 @@ import java.util.List;
|
||||||
import java.util.ListIterator;
|
import java.util.ListIterator;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
@ -67,14 +66,12 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
||||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
import com.amazonaws.services.kinesis.model.Record;
|
import com.amazonaws.services.kinesis.model.Record;
|
||||||
import com.amazonaws.services.kinesis.model.Shard;
|
|
||||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -421,127 +418,6 @@ public class ShardConsumerTest {
|
||||||
file.delete();
|
file.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet {
|
|
||||||
private final CountDownLatch errorShutdownLatch = new CountDownLatch(1);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown(ShutdownInput input) {
|
|
||||||
ShutdownReason reason = input.getShutdownReason();
|
|
||||||
if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) {
|
|
||||||
errorShutdownLatch.countDown();
|
|
||||||
throw new RuntimeException("test");
|
|
||||||
} else {
|
|
||||||
super.shutdown(input);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record
|
|
||||||
* processor's shutdown method with reason terminate will be retried.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public final void testConsumeShardWithTransientTerminateError() throws Exception {
|
|
||||||
int numRecs = 10;
|
|
||||||
BigInteger startSeqNum = BigInteger.ONE;
|
|
||||||
String streamShardId = "kinesis-0-0";
|
|
||||||
String testConcurrencyToken = "testToken";
|
|
||||||
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum);
|
|
||||||
// Close the shard so that shutdown is called with reason terminate
|
|
||||||
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
|
|
||||||
KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString());
|
|
||||||
File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002");
|
|
||||||
|
|
||||||
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
|
|
||||||
|
|
||||||
final int maxRecords = 2;
|
|
||||||
final int idleTimeMS = 0; // keep unit tests fast
|
|
||||||
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
|
|
||||||
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
|
|
||||||
when(leaseManager.getLease(anyString())).thenReturn(null);
|
|
||||||
|
|
||||||
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
|
|
||||||
|
|
||||||
StreamConfig streamConfig =
|
|
||||||
new StreamConfig(fileBasedProxy,
|
|
||||||
maxRecords,
|
|
||||||
idleTimeMS,
|
|
||||||
callProcessRecordsForEmptyRecordList,
|
|
||||||
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
|
|
||||||
|
|
||||||
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
|
|
||||||
ShardConsumer consumer =
|
|
||||||
new ShardConsumer(shardInfo,
|
|
||||||
streamConfig,
|
|
||||||
checkpoint,
|
|
||||||
processor,
|
|
||||||
leaseManager,
|
|
||||||
parentShardPollIntervalMillis,
|
|
||||||
cleanupLeasesOfCompletedShards,
|
|
||||||
executorService,
|
|
||||||
metricsFactory,
|
|
||||||
taskBackoffTimeMillis,
|
|
||||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
|
|
||||||
|
|
||||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
|
||||||
consumer.consumeShard(); // check on parent shards
|
|
||||||
Thread.sleep(50L);
|
|
||||||
consumer.consumeShard(); // start initialization
|
|
||||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
|
|
||||||
consumer.consumeShard(); // initialize
|
|
||||||
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
// We expect to process all records in numRecs calls
|
|
||||||
for (int i = 0; i < numRecs;) {
|
|
||||||
boolean newTaskSubmitted = consumer.consumeShard();
|
|
||||||
if (newTaskSubmitted) {
|
|
||||||
LOG.debug("New processing task was submitted, call # " + i);
|
|
||||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
|
|
||||||
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
|
|
||||||
i += maxRecords;
|
|
||||||
}
|
|
||||||
Thread.sleep(50L);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Consume shards until shutdown terminate is called and it has thrown an exception
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
consumer.consumeShard();
|
|
||||||
if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertEquals(0, processor.errorShutdownLatch.getCount());
|
|
||||||
|
|
||||||
// Wait for a retry of shutdown terminate that should succeed
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
consumer.consumeShard();
|
|
||||||
if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertEquals(0, processor.getShutdownLatch().getCount());
|
|
||||||
|
|
||||||
// Wait for shutdown complete now that terminate shutdown is successful
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
consumer.consumeShard();
|
|
||||||
if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Thread.sleep(50L);
|
|
||||||
}
|
|
||||||
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
|
|
||||||
|
|
||||||
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
|
|
||||||
|
|
||||||
executorService.shutdown();
|
|
||||||
executorService.awaitTermination(60, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
|
|
||||||
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
|
|
||||||
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
|
|
||||||
file.delete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
|
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue