diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index dc59193f..8bc08d80 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -584,7 +584,7 @@ public class Scheduler implements Runnable {
hierarchicalShardSyncer,
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
- argument, lifecycleConfig.taskExecutionListener());
+ argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java
index b04d75ce..ece9274c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java
@@ -27,6 +27,7 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
@Data
@Accessors(fluent = true)
public class LifecycleConfig {
+ public static final int DEFAULT_READ_TIMEOUTS_TO_IGNORE = 0;
/**
* Logs warn message if as task is held in a task for more than the set time.
*
@@ -52,4 +53,13 @@ public class LifecycleConfig {
*
Default value: {@link NoOpTaskExecutionListener}
*/
private TaskExecutionListener taskExecutionListener = new NoOpTaskExecutionListener();
+
+ /**
+ * Number of consecutive ReadTimeouts to ignore before logging warning messages.
+ * If you find yourself seeing frequent ReadTimeout, you should also consider increasing your timeout according to
+ * your expected processing time.
+ *
+ * Default value: 0
+ */
+ private int readTimeoutsToIgnoreBeforeWarning = DEFAULT_READ_TIMEOUTS_TO_IGNORE;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
index 60f109dd..27242b08 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
@@ -84,21 +84,42 @@ public class ShardConsumer {
private final ShardConsumerSubscriber subscriber;
+ @Deprecated
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
- Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
- TaskExecutionListener taskExecutionListener) {
+ Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
+ TaskExecutionListener taskExecutionListener) {
this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument,
ConsumerStates.INITIAL_STATE,
- ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener);
+ ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener,
+ LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
+ }
+
+ public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
+ Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
+ TaskExecutionListener taskExecutionListener, int readTimeoutsToIgnoreBeforeWarning) {
+ this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument,
+ ConsumerStates.INITIAL_STATE,
+ ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener,
+ readTimeoutsToIgnoreBeforeWarning);
+ }
+
+ @Deprecated
+ public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
+ Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
+ ConsumerState initialState, Function taskMetricsDecorator, int bufferSize,
+ TaskExecutionListener taskExecutionListener) {
+ this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument,
+ initialState, taskMetricsDecorator, bufferSize, taskExecutionListener,
+ LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
}
//
// TODO: Make bufferSize configurable
//
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
- Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
- ConsumerState initialState, Function taskMetricsDecorator,
- int bufferSize, TaskExecutionListener taskExecutionListener) {
+ Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
+ ConsumerState initialState, Function taskMetricsDecorator, int bufferSize,
+ TaskExecutionListener taskExecutionListener, int readTimeoutsToIgnoreBeforeWarning) {
this.recordsPublisher = recordsPublisher;
this.executorService = executorService;
this.shardInfo = shardInfo;
@@ -107,7 +128,8 @@ public class ShardConsumer {
this.taskExecutionListener = taskExecutionListener;
this.currentState = initialState;
this.taskMetricsDecorator = taskMetricsDecorator;
- subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, this);
+ subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, this,
+ readTimeoutsToIgnoreBeforeWarning);
this.bufferSize = bufferSize;
if (this.shardInfo.isCompleted()) {
@@ -115,8 +137,7 @@ public class ShardConsumer {
}
}
-
- synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
+ synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
if (isShutdownRequested()) {
subscription.cancel();
return;
@@ -178,7 +199,8 @@ public class ShardConsumer {
}
Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
if (dispatchFailure != null) {
- log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped", dispatchFailure);
+ log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped",
+ dispatchFailure);
return dispatchFailure;
}
@@ -263,8 +285,8 @@ public class ShardConsumer {
} else {
//
// ShardConsumer has been asked to shutdown before the first task even had a chance to run.
- // In this case generate a successful task outcome, and allow the shutdown to continue. This should only
- // happen if the lease was lost before the initial state had a chance to run.
+ // In this case generate a successful task outcome, and allow the shutdown to continue.
+ // This should only happen if the lease was lost before the initial state had a chance to run.
//
updateState(TaskOutcome.SUCCESSFUL);
}
@@ -284,9 +306,7 @@ public class ShardConsumer {
private synchronized void executeTask(ProcessRecordsInput input) {
TaskExecutionListenerInput taskExecutionListenerInput = TaskExecutionListenerInput.builder()
- .shardInfo(shardInfo)
- .taskType(currentState.taskType())
- .build();
+ .shardInfo(shardInfo).taskType(currentState.taskType()).build();
taskExecutionListener.beforeTaskExecution(taskExecutionListenerInput);
ConsumerTask task = currentState.createTask(shardConsumerArgument, ShardConsumer.this, input);
if (task != null) {
@@ -422,7 +442,7 @@ public class ShardConsumer {
/**
* Default task wrapping function for metrics
- *
+ *
* @param metricsFactory
* the factory used for reporting metrics
* @return a function that will wrap the task with a metrics reporter
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java
index bd89d256..00331c59 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java
@@ -1,16 +1,16 @@
/*
- * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
*
- * http://aws.amazon.com/asl/
+ * http://aws.amazon.com/asl/
*
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
@@ -35,11 +35,12 @@ import java.util.concurrent.ExecutorService;
@Slf4j
@Accessors(fluent = true)
class ShardConsumerSubscriber implements Subscriber {
-
private final RecordsPublisher recordsPublisher;
private final Scheduler scheduler;
private final int bufferSize;
private final ShardConsumer shardConsumer;
+ private final int readTimeoutsToIgnoreBeforeWarning;
+ private volatile int readTimeoutSinceLastRead = 0;
@VisibleForTesting
final Object lockObject = new Object();
@@ -54,14 +55,22 @@ class ShardConsumerSubscriber implements Subscriber {
@Getter(AccessLevel.PACKAGE)
private volatile Throwable retrievalFailure;
+ @Deprecated
ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
ShardConsumer shardConsumer) {
+ this(recordsPublisher,executorService,bufferSize,shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
+ }
+
+ ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
+ ShardConsumer shardConsumer, int readTimeoutsToIgnoreBeforeWarning) {
this.recordsPublisher = recordsPublisher;
this.scheduler = Schedulers.from(executorService);
this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer;
+ this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
}
+
void startSubscriptions() {
synchronized (lockObject) {
if (lastAccepted != null) {
@@ -92,7 +101,8 @@ class ShardConsumerSubscriber implements Subscriber {
Throwable oldFailure = null;
if (retrievalFailure != null) {
synchronized (lockObject) {
- String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardConsumer.shardInfo().shardId());
+ String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
+ shardConsumer.shardInfo().shardId());
if (retrievalFailure instanceof RetryableRetrievalException) {
log.debug(logMessage, retrievalFailure.getCause());
} else {
@@ -157,18 +167,43 @@ class ShardConsumerSubscriber implements Subscriber {
lastRequestTime = Instant.now();
}
}
+
+ readTimeoutSinceLastRead = 0;
}
@Override
public void onError(Throwable t) {
synchronized (lockObject) {
- log.warn("{}: onError(). Cancelling subscription, and marking self as failed.",
- shardConsumer.shardInfo().shardId(), t);
+ if (t instanceof RetryableRetrievalException && t.getMessage().contains("ReadTimeout")) {
+ readTimeoutSinceLastRead++;
+ if (readTimeoutSinceLastRead > readTimeoutsToIgnoreBeforeWarning) {
+ logOnErrorReadTimeoutWarning(t);
+ }
+ } else {
+ logOnErrorWarning(t);
+ }
+
subscription.cancel();
retrievalFailure = t;
}
}
+ protected void logOnErrorWarning(Throwable t) {
+ log.warn(
+ "{}: onError(). Cancelling subscription, and marking self as failed. KCL will "
+ + "recreate the subscription as neccessary to continue processing.",
+ shardConsumer.shardInfo().shardId(), t);
+ }
+
+ protected void logOnErrorReadTimeoutWarning(Throwable t) {
+ log.warn("{}: onError(). Cancelling subscription, and marking self as failed. KCL will"
+ + " recreate the subscription as neccessary to continue processing. If you "
+ + "are seeing this warning frequently consider increasing the SDK timeouts "
+ + "by providing an OverrideConfiguration to the kinesis client. Alternatively you"
+ + "can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress"
+ + "intermittant ReadTimeout warnings.", shardConsumer.shardInfo().shardId(), t);
+ }
+
@Override
public void onComplete() {
log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally",
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
index 9382b491..9e8f6888 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
@@ -111,13 +111,12 @@ public class ConsumerStatesTest {
public void setup() {
argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
- taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
- listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
- shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM,
- cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(),
- hierarchicalShardSyncer, metricsFactory);
- consumer = spy(
- new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener));
+ taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
+ maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
+ INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
+ new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
+ consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
+ argument, taskExecutionListener, 0));
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
@@ -257,7 +256,8 @@ public class ConsumerStatesTest {
consumer.gracefulShutdown(shutdownNotification);
ConsumerTask task = state.createTask(argument, consumer, null);
- assertThat(task, shutdownReqTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
+ assertThat(task,
+ shutdownReqTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
assertThat(task, shutdownReqTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task,
@@ -304,7 +304,8 @@ public class ConsumerStatesTest {
ConsumerTask task = state.createTask(argument, consumer, null);
assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
- assertThat(task, shutdownTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
+ assertThat(task,
+ shutdownTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
@@ -318,8 +319,7 @@ public class ConsumerStatesTest {
assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
for (ShutdownReason reason : ShutdownReason.values()) {
- assertThat(state.shutdownTransition(reason),
- equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
+ assertThat(state.shutdownTransition(reason), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
}
assertThat(state.state(), equalTo(ShardConsumerState.SHUTTING_DOWN));
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
index 21b00451..186aff5e 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
@@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
@@ -46,6 +47,7 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@@ -62,6 +64,7 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
+import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
@@ -101,7 +104,7 @@ public class ShardConsumerSubscriberTest {
processRecordsInput = ProcessRecordsInput.builder().records(Collections.emptyList())
.cacheEntryTime(Instant.now()).build();
- subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer);
+ subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
when(recordsRetrieved.processRecordsInput()).thenReturn(processRecordsInput);
}
@@ -244,7 +247,7 @@ public class ShardConsumerSubscriberTest {
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
- subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer);
+ subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
addUniqueItem(1);
addTerminalMarker(1);
@@ -271,7 +274,8 @@ public class ShardConsumerSubscriberTest {
executorService.execute(() -> {
try {
//
- // Notify the test as soon as we have started executing, then wait on the post add barrier.
+ // Notify the test as soon as we have started executing, then wait on the post add
+ // subscriptionBarrier.
//
synchronized (processedNotifier) {
processedNotifier.notifyAll();
@@ -444,4 +448,241 @@ public class ShardConsumerSubscriberTest {
}
}
+ class TestShardConsumerSubscriber extends ShardConsumerSubscriber {
+
+ private int genericWarningLogged = 0;
+ private int readTimeoutWarningLogged = 0;
+
+ TestShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
+ ShardConsumer shardConsumer,
+ // Setup test expectations
+ int readTimeoutsToIgnoreBeforeWarning) {
+ super(recordsPublisher, executorService, bufferSize, shardConsumer, readTimeoutsToIgnoreBeforeWarning);
+ }
+
+ @Override
+ protected void logOnErrorWarning(Throwable t) {
+ genericWarningLogged++;
+ super.logOnErrorWarning(t);
+ }
+
+ @Override
+ protected void logOnErrorReadTimeoutWarning(Throwable t) {
+ readTimeoutWarningLogged++;
+ super.logOnErrorReadTimeoutWarning(t);
+ }
+ }
+
+ /**
+ * Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
+ *
+ * @throws Exception
+ */
+ @Test
+ public void noLoggingSuppressionNeededOnHappyPathTest() {
+ // All requests are expected to succeed. No logs are expected.
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 0;
+ int expectedReadTimeoutLogs = 0;
+ int expectedGenericLogs = 0;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ /**
+ * Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
+ *
+ * @throws Exception
+ */
+ @Test
+ public void loggingNotSuppressedAfterTimeoutTest() {
+ Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
+ // The first 2 requests succeed, followed by an exception, a success, and another exception.
+ // We are not ignoring any ReadTimeouts, so we expect 1 log on the first failure,
+ // and another log on the second failure.
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 0;
+ int expectedReadTimeoutLogs = 2;
+ int expectedGenericLogs = 0;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ /**
+ * Test to validate the warning message from ShardConsumer is successfully supressed if we only have intermittant
+ * readTimeouts.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void loggingSuppressedAfterIntermittentTimeoutTest() {
+ Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
+ // The first 2 requests succeed, followed by an exception, a success, and another exception.
+ // We are ignoring a single consecutive ReadTimeout, So we don't expect any logs to be made.
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 1;
+ int expectedReadTimeoutLogs = 0;
+ int expectedGenericLogs = 0;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ /**
+ * Test to validate the warning message from ShardConsumer is successfully logged if multiple sequential timeouts
+ * occur.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void loggingPartiallySuppressedAfterMultipleTimeoutTest() {
+ Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
+ // The first 2 requests are expected to throw an exception, followed by a success, and 2 more exceptions.
+ // We are ignoring a single consecutive ReadTimeout, so we expect a single log starting after the second
+ // consecutive ReadTimeout (Request 2) and another log after another second consecutive failure (Request 5)
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 1;
+ int expectedReadTimeoutLogs = 2;
+ int expectedGenericLogs = 0;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicException(exceptionToThrow, consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicException(exceptionToThrow, consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ /**
+ * Test to validate the warning message from ShardConsumer is successfully logged if sequential timeouts occur.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void loggingPartiallySuppressedAfterConsecutiveTimeoutTest() {
+ Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
+ // Every request of 5 requests are expected to fail.
+ // We are ignoring 2 consecutive ReadTimeout exceptions, so we expect a single log starting after the third
+ // consecutive ReadTimeout (Request 3) and another log after each other request since we are still breaching the
+ // number of consecutive ReadTimeouts to ignore.
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 2;
+ int expectedReadTimeoutLogs = 3;
+ int expectedGenericLogs = 0;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicException(exceptionToThrow, consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicException(exceptionToThrow, consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ /**
+ * Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
+ * configuration of 0
+ *
+ * @throws Exception
+ */
+ @Test
+ public void loggingNotSuppressedOnNonReadTimeoutExceptionNotIgnoringReadTimeoutsExceptionTest() {
+ // We're not throwing a ReadTimeout, so no suppression is expected.
+ // The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
+ // each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in.
+ Exception exceptionToThrow = new RuntimeException("Uh oh Not a ReadTimeout");
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 0;
+ int expectedReadTimeoutLogs = 0;
+ int expectedGenericLogs = 2;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ /**
+ * Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
+ * ignore
+ *
+ * @throws Exception
+ */
+ @Test
+ public void loggingNotSuppressedOnNonReadTimeoutExceptionIgnoringReadTimeoutsTest() {
+
+ // We're not throwing a ReadTimeout, so no suppression is expected.
+ // The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
+ // each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in,
+ // in this case if we had instead thrown ReadTimeouts, we would not have expected any logs with this
+ // configuration.
+ Exception exceptionToThrow = new RuntimeException("Uh oh Not a ReadTimeout");
+
+ // Setup test expectations
+ int readTimeoutsToIgnore = 2;
+ int expectedReadTimeoutLogs = 0;
+ int expectedGenericLogs = 2;
+ TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
+ Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
+ consumer.startSubscriptions();
+ mimicSuccess(consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ mimicSuccess(consumer);
+ mimicException(exceptionToThrow, consumer);
+ assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
+ assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
+ }
+
+ private void mimicSuccess(TestShardConsumerSubscriber consumer) {
+ // Mimic a successful publishing request
+ consumer.onNext(recordsRetrieved);
+ }
+
+ private void mimicException(Exception exceptionToThrow, TestShardConsumerSubscriber consumer) {
+ // Mimic throwing an exception during publishing,
+ consumer.onError(exceptionToThrow);
+ // restart subscriptions to allow further requests to be mimiced
+ consumer.startSubscriptions();
+ }
+
}
\ No newline at end of file
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java
index 00678682..adf163d1 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java
@@ -139,16 +139,15 @@ public class ShardConsumerTest {
processRecordsInput = ProcessRecordsInput.builder().isAtShardEnd(false).cacheEntryTime(Instant.now())
.millisBehindLatest(1000L).records(Collections.emptyList()).build();
- initialTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
- .taskType(TaskType.INITIALIZE).build();
- processTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
- .taskType(TaskType.PROCESS).build();
+ initialTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo).taskType(TaskType.INITIALIZE)
+ .build();
+ processTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo).taskType(TaskType.PROCESS).build();
shutdownRequestedTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.SHUTDOWN_NOTIFICATION).build();
shutdownRequestedAwaitTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.SHUTDOWN_COMPLETE).build();
- shutdownTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
- .taskType(TaskType.SHUTDOWN).build();
+ shutdownTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo).taskType(TaskType.SHUTDOWN)
+ .build();
}
@After
@@ -245,7 +244,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
- shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
+ shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
boolean initComplete = false;
while (!initComplete) {
@@ -299,7 +298,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
- shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
+ shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
boolean initComplete = false;
while (!initComplete) {
@@ -323,7 +322,7 @@ public class ShardConsumerTest {
log.debug("Release processing task interlock");
awaitAndResetBarrier(processingTaskInterlock);
- while(!consumer.isShutdown()) {
+ while (!consumer.isShutdown()) {
consumer.executeLifecycle();
Thread.yield();
}
@@ -358,7 +357,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
- shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
+ shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
boolean initComplete = false;
while (!initComplete) {
@@ -417,7 +416,8 @@ public class ShardConsumerTest {
@Ignore
public final void testInitializationStateUponFailure() throws Exception {
ShardConsumer consumer = new ShardConsumer(recordsPublisher, executorService, shardInfo,
- logWarningForTaskAfterMillis, shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
+ logWarningForTaskAfterMillis, shardConsumerArgument, initialState, Function.identity(), 1,
+ taskExecutionListener, 0);
when(initialState.createTask(eq(shardConsumerArgument), eq(consumer), any())).thenReturn(initializeTask);
when(initializeTask.call()).thenReturn(new TaskResult(new Exception("Bad")));
@@ -450,7 +450,7 @@ public class ShardConsumerTest {
ExecutorService failingService = mock(ExecutorService.class);
ShardConsumer consumer = new ShardConsumer(recordsPublisher, failingService, shardInfo,
- logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener);
+ logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
doThrow(new RejectedExecutionException()).when(failingService).execute(any());
@@ -464,7 +464,7 @@ public class ShardConsumerTest {
@Test
public void testErrorThrowableInInitialization() throws Exception {
ShardConsumer consumer = new ShardConsumer(recordsPublisher, executorService, shardInfo,
- logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener);
+ logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
when(initialState.createTask(any(), any(), any())).thenReturn(initializeTask);
when(initialState.taskType()).thenReturn(TaskType.INITIALIZE);
@@ -488,7 +488,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
- shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener);
+ shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
mockSuccessfulInitialize(null);
@@ -571,7 +571,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, Optional.of(1L),
- shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
+ shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(null);
@@ -618,7 +618,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, Optional.of(1L),
- shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
+ shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
CyclicBarrier taskArriveBarrier = new CyclicBarrier(2);
CyclicBarrier taskDepartBarrier = new CyclicBarrier(2);