Introducing configuration for ignoring ReadTimeouts before printing warnings. (#528)

* Added configuration to ignore a number of ReadTimeouts before printing warnings.

     Messaging now directs customer to configure the SDK with appropriate timeouts based on their processing model.
     Warning messages from ShardConsumer now specify that the KCL will reattempt to subscribe to the stream as needed.
     Added configuraiton to Lifecycle configuration to enable ignoring a number of ReadTimeouts before printing warning messages.

* Removed functional tests that are now replicated as unit tests

* Refactored after review of Pull Request

Marked original ShardConsumerSubscriber constructor as deprecated
Renamed tests to be more descriptive

* Updated Default Value injection to ShardConsumerSubscriber

* Refactored based on PR comments

* Removed Chained Constructor from test class

* Added comments to tests to make then easier to understand

* Seperating coding of each log suppression test
This commit is contained in:
Cory-Bradshaw 2019-04-05 15:13:10 -07:00 committed by Sahil Palvia
parent 1bfaa90322
commit 2851a8b6e0
7 changed files with 366 additions and 60 deletions

View file

@ -584,7 +584,7 @@ public class Scheduler implements Runnable {
hierarchicalShardSyncer,
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener());
argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}
/**

View file

@ -27,6 +27,7 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
@Data
@Accessors(fluent = true)
public class LifecycleConfig {
public static final int DEFAULT_READ_TIMEOUTS_TO_IGNORE = 0;
/**
* Logs warn message if as task is held in a task for more than the set time.
*
@ -52,4 +53,13 @@ public class LifecycleConfig {
* <p>Default value: {@link NoOpTaskExecutionListener}</p>
*/
private TaskExecutionListener taskExecutionListener = new NoOpTaskExecutionListener();
/**
* Number of consecutive ReadTimeouts to ignore before logging warning messages.
* If you find yourself seeing frequent ReadTimeout, you should also consider increasing your timeout according to
* your expected processing time.
*
* <p>Default value: 0</p>
*/
private int readTimeoutsToIgnoreBeforeWarning = DEFAULT_READ_TIMEOUTS_TO_IGNORE;
}

View file

@ -84,12 +84,33 @@ public class ShardConsumer {
private final ShardConsumerSubscriber subscriber;
@Deprecated
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
TaskExecutionListener taskExecutionListener) {
this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument,
ConsumerStates.INITIAL_STATE,
ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener);
ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener,
LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
}
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
TaskExecutionListener taskExecutionListener, int readTimeoutsToIgnoreBeforeWarning) {
this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument,
ConsumerStates.INITIAL_STATE,
ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning);
}
@Deprecated
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
ConsumerState initialState, Function<ConsumerTask, ConsumerTask> taskMetricsDecorator, int bufferSize,
TaskExecutionListener taskExecutionListener) {
this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument,
initialState, taskMetricsDecorator, bufferSize, taskExecutionListener,
LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
}
//
@ -97,8 +118,8 @@ public class ShardConsumer {
//
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
ConsumerState initialState, Function<ConsumerTask, ConsumerTask> taskMetricsDecorator,
int bufferSize, TaskExecutionListener taskExecutionListener) {
ConsumerState initialState, Function<ConsumerTask, ConsumerTask> taskMetricsDecorator, int bufferSize,
TaskExecutionListener taskExecutionListener, int readTimeoutsToIgnoreBeforeWarning) {
this.recordsPublisher = recordsPublisher;
this.executorService = executorService;
this.shardInfo = shardInfo;
@ -107,7 +128,8 @@ public class ShardConsumer {
this.taskExecutionListener = taskExecutionListener;
this.currentState = initialState;
this.taskMetricsDecorator = taskMetricsDecorator;
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, this);
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, this,
readTimeoutsToIgnoreBeforeWarning);
this.bufferSize = bufferSize;
if (this.shardInfo.isCompleted()) {
@ -115,7 +137,6 @@ public class ShardConsumer {
}
}
synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
if (isShutdownRequested()) {
subscription.cancel();
@ -178,7 +199,8 @@ public class ShardConsumer {
}
Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
if (dispatchFailure != null) {
log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped", dispatchFailure);
log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped",
dispatchFailure);
return dispatchFailure;
}
@ -263,8 +285,8 @@ public class ShardConsumer {
} else {
//
// ShardConsumer has been asked to shutdown before the first task even had a chance to run.
// In this case generate a successful task outcome, and allow the shutdown to continue. This should only
// happen if the lease was lost before the initial state had a chance to run.
// In this case generate a successful task outcome, and allow the shutdown to continue.
// This should only happen if the lease was lost before the initial state had a chance to run.
//
updateState(TaskOutcome.SUCCESSFUL);
}
@ -284,9 +306,7 @@ public class ShardConsumer {
private synchronized void executeTask(ProcessRecordsInput input) {
TaskExecutionListenerInput taskExecutionListenerInput = TaskExecutionListenerInput.builder()
.shardInfo(shardInfo)
.taskType(currentState.taskType())
.build();
.shardInfo(shardInfo).taskType(currentState.taskType()).build();
taskExecutionListener.beforeTaskExecution(taskExecutionListenerInput);
ConsumerTask task = currentState.createTask(shardConsumerArgument, ShardConsumer.this, input);
if (task != null) {

View file

@ -35,11 +35,12 @@ import java.util.concurrent.ExecutorService;
@Slf4j
@Accessors(fluent = true)
class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
private final RecordsPublisher recordsPublisher;
private final Scheduler scheduler;
private final int bufferSize;
private final ShardConsumer shardConsumer;
private final int readTimeoutsToIgnoreBeforeWarning;
private volatile int readTimeoutSinceLastRead = 0;
@VisibleForTesting
final Object lockObject = new Object();
@ -54,14 +55,22 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
@Getter(AccessLevel.PACKAGE)
private volatile Throwable retrievalFailure;
@Deprecated
ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
ShardConsumer shardConsumer) {
this(recordsPublisher,executorService,bufferSize,shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
}
ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
ShardConsumer shardConsumer, int readTimeoutsToIgnoreBeforeWarning) {
this.recordsPublisher = recordsPublisher;
this.scheduler = Schedulers.from(executorService);
this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer;
this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
}
void startSubscriptions() {
synchronized (lockObject) {
if (lastAccepted != null) {
@ -92,7 +101,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
Throwable oldFailure = null;
if (retrievalFailure != null) {
synchronized (lockObject) {
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardConsumer.shardInfo().shardId());
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
shardConsumer.shardInfo().shardId());
if (retrievalFailure instanceof RetryableRetrievalException) {
log.debug(logMessage, retrievalFailure.getCause());
} else {
@ -157,18 +167,43 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
lastRequestTime = Instant.now();
}
}
readTimeoutSinceLastRead = 0;
}
@Override
public void onError(Throwable t) {
synchronized (lockObject) {
log.warn("{}: onError(). Cancelling subscription, and marking self as failed.",
shardConsumer.shardInfo().shardId(), t);
if (t instanceof RetryableRetrievalException && t.getMessage().contains("ReadTimeout")) {
readTimeoutSinceLastRead++;
if (readTimeoutSinceLastRead > readTimeoutsToIgnoreBeforeWarning) {
logOnErrorReadTimeoutWarning(t);
}
} else {
logOnErrorWarning(t);
}
subscription.cancel();
retrievalFailure = t;
}
}
protected void logOnErrorWarning(Throwable t) {
log.warn(
"{}: onError(). Cancelling subscription, and marking self as failed. KCL will "
+ "recreate the subscription as neccessary to continue processing.",
shardConsumer.shardInfo().shardId(), t);
}
protected void logOnErrorReadTimeoutWarning(Throwable t) {
log.warn("{}: onError(). Cancelling subscription, and marking self as failed. KCL will"
+ " recreate the subscription as neccessary to continue processing. If you "
+ "are seeing this warning frequently consider increasing the SDK timeouts "
+ "by providing an OverrideConfiguration to the kinesis client. Alternatively you"
+ "can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress"
+ "intermittant ReadTimeout warnings.", shardConsumer.shardInfo().shardId(), t);
}
@Override
public void onComplete() {
log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally",

View file

@ -111,13 +111,12 @@ public class ConsumerStatesTest {
public void setup() {
argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(),
hierarchicalShardSyncer, metricsFactory);
consumer = spy(
new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener));
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
argument, taskExecutionListener, 0));
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
@ -257,7 +256,8 @@ public class ConsumerStatesTest {
consumer.gracefulShutdown(shutdownNotification);
ConsumerTask task = state.createTask(argument, consumer, null);
assertThat(task, shutdownReqTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
assertThat(task,
shutdownReqTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
assertThat(task, shutdownReqTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task,
@ -304,7 +304,8 @@ public class ConsumerStatesTest {
ConsumerTask task = state.createTask(argument, consumer, null);
assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, shutdownTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
assertThat(task,
shutdownTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
@ -318,8 +319,7 @@ public class ConsumerStatesTest {
assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
for (ShutdownReason reason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(reason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
assertThat(state.shutdownTransition(reason), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
}
assertThat(state.state(), equalTo(ShardConsumerState.SHUTTING_DOWN));

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
@ -46,6 +47,7 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@ -62,6 +64,7 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
@ -101,7 +104,7 @@ public class ShardConsumerSubscriberTest {
processRecordsInput = ProcessRecordsInput.builder().records(Collections.emptyList())
.cacheEntryTime(Instant.now()).build();
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer);
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
when(recordsRetrieved.processRecordsInput()).thenReturn(processRecordsInput);
}
@ -244,7 +247,7 @@ public class ShardConsumerSubscriberTest {
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer);
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
addUniqueItem(1);
addTerminalMarker(1);
@ -271,7 +274,8 @@ public class ShardConsumerSubscriberTest {
executorService.execute(() -> {
try {
//
// Notify the test as soon as we have started executing, then wait on the post add barrier.
// Notify the test as soon as we have started executing, then wait on the post add
// subscriptionBarrier.
//
synchronized (processedNotifier) {
processedNotifier.notifyAll();
@ -444,4 +448,241 @@ public class ShardConsumerSubscriberTest {
}
}
class TestShardConsumerSubscriber extends ShardConsumerSubscriber {
private int genericWarningLogged = 0;
private int readTimeoutWarningLogged = 0;
TestShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
ShardConsumer shardConsumer,
// Setup test expectations
int readTimeoutsToIgnoreBeforeWarning) {
super(recordsPublisher, executorService, bufferSize, shardConsumer, readTimeoutsToIgnoreBeforeWarning);
}
@Override
protected void logOnErrorWarning(Throwable t) {
genericWarningLogged++;
super.logOnErrorWarning(t);
}
@Override
protected void logOnErrorReadTimeoutWarning(Throwable t) {
readTimeoutWarningLogged++;
super.logOnErrorReadTimeoutWarning(t);
}
}
/**
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
*
* @throws Exception
*/
@Test
public void noLoggingSuppressionNeededOnHappyPathTest() {
// All requests are expected to succeed. No logs are expected.
// Setup test expectations
int readTimeoutsToIgnore = 0;
int expectedReadTimeoutLogs = 0;
int expectedGenericLogs = 0;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicSuccess(consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
/**
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
*
* @throws Exception
*/
@Test
public void loggingNotSuppressedAfterTimeoutTest() {
Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
// The first 2 requests succeed, followed by an exception, a success, and another exception.
// We are not ignoring any ReadTimeouts, so we expect 1 log on the first failure,
// and another log on the second failure.
// Setup test expectations
int readTimeoutsToIgnore = 0;
int expectedReadTimeoutLogs = 2;
int expectedGenericLogs = 0;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
/**
* Test to validate the warning message from ShardConsumer is successfully supressed if we only have intermittant
* readTimeouts.
*
* @throws Exception
*/
@Test
public void loggingSuppressedAfterIntermittentTimeoutTest() {
Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
// The first 2 requests succeed, followed by an exception, a success, and another exception.
// We are ignoring a single consecutive ReadTimeout, So we don't expect any logs to be made.
// Setup test expectations
int readTimeoutsToIgnore = 1;
int expectedReadTimeoutLogs = 0;
int expectedGenericLogs = 0;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
/**
* Test to validate the warning message from ShardConsumer is successfully logged if multiple sequential timeouts
* occur.
*
* @throws Exception
*/
@Test
public void loggingPartiallySuppressedAfterMultipleTimeoutTest() {
Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
// The first 2 requests are expected to throw an exception, followed by a success, and 2 more exceptions.
// We are ignoring a single consecutive ReadTimeout, so we expect a single log starting after the second
// consecutive ReadTimeout (Request 2) and another log after another second consecutive failure (Request 5)
// Setup test expectations
int readTimeoutsToIgnore = 1;
int expectedReadTimeoutLogs = 2;
int expectedGenericLogs = 0;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicException(exceptionToThrow, consumer);
mimicException(exceptionToThrow, consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
mimicException(exceptionToThrow, consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
/**
* Test to validate the warning message from ShardConsumer is successfully logged if sequential timeouts occur.
*
* @throws Exception
*/
@Test
public void loggingPartiallySuppressedAfterConsecutiveTimeoutTest() {
Exception exceptionToThrow = new software.amazon.kinesis.retrieval.RetryableRetrievalException("ReadTimeout");
// Every request of 5 requests are expected to fail.
// We are ignoring 2 consecutive ReadTimeout exceptions, so we expect a single log starting after the third
// consecutive ReadTimeout (Request 3) and another log after each other request since we are still breaching the
// number of consecutive ReadTimeouts to ignore.
// Setup test expectations
int readTimeoutsToIgnore = 2;
int expectedReadTimeoutLogs = 3;
int expectedGenericLogs = 0;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicException(exceptionToThrow, consumer);
mimicException(exceptionToThrow, consumer);
mimicException(exceptionToThrow, consumer);
mimicException(exceptionToThrow, consumer);
mimicException(exceptionToThrow, consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
/**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
* configuration of 0
*
* @throws Exception
*/
@Test
public void loggingNotSuppressedOnNonReadTimeoutExceptionNotIgnoringReadTimeoutsExceptionTest() {
// We're not throwing a ReadTimeout, so no suppression is expected.
// The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
// each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in.
Exception exceptionToThrow = new RuntimeException("Uh oh Not a ReadTimeout");
// Setup test expectations
int readTimeoutsToIgnore = 0;
int expectedReadTimeoutLogs = 0;
int expectedGenericLogs = 2;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
/**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
* ignore
*
* @throws Exception
*/
@Test
public void loggingNotSuppressedOnNonReadTimeoutExceptionIgnoringReadTimeoutsTest() {
// We're not throwing a ReadTimeout, so no suppression is expected.
// The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
// each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in,
// in this case if we had instead thrown ReadTimeouts, we would not have expected any logs with this
// configuration.
Exception exceptionToThrow = new RuntimeException("Uh oh Not a ReadTimeout");
// Setup test expectations
int readTimeoutsToIgnore = 2;
int expectedReadTimeoutLogs = 0;
int expectedGenericLogs = 2;
TestShardConsumerSubscriber consumer = new TestShardConsumerSubscriber(mock(RecordsPublisher.class),
Executors.newFixedThreadPool(1), 8, shardConsumer, readTimeoutsToIgnore);
consumer.startSubscriptions();
mimicSuccess(consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
mimicSuccess(consumer);
mimicException(exceptionToThrow, consumer);
assertEquals(expectedGenericLogs, consumer.genericWarningLogged);
assertEquals(expectedReadTimeoutLogs, consumer.readTimeoutWarningLogged);
}
private void mimicSuccess(TestShardConsumerSubscriber consumer) {
// Mimic a successful publishing request
consumer.onNext(recordsRetrieved);
}
private void mimicException(Exception exceptionToThrow, TestShardConsumerSubscriber consumer) {
// Mimic throwing an exception during publishing,
consumer.onError(exceptionToThrow);
// restart subscriptions to allow further requests to be mimiced
consumer.startSubscriptions();
}
}

View file

@ -139,16 +139,15 @@ public class ShardConsumerTest {
processRecordsInput = ProcessRecordsInput.builder().isAtShardEnd(false).cacheEntryTime(Instant.now())
.millisBehindLatest(1000L).records(Collections.emptyList()).build();
initialTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.INITIALIZE).build();
processTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.PROCESS).build();
initialTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo).taskType(TaskType.INITIALIZE)
.build();
processTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo).taskType(TaskType.PROCESS).build();
shutdownRequestedTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.SHUTDOWN_NOTIFICATION).build();
shutdownRequestedAwaitTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.SHUTDOWN_COMPLETE).build();
shutdownTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo)
.taskType(TaskType.SHUTDOWN).build();
shutdownTaskInput = TaskExecutionListenerInput.builder().shardInfo(shardInfo).taskType(TaskType.SHUTDOWN)
.build();
}
@After
@ -245,7 +244,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
boolean initComplete = false;
while (!initComplete) {
@ -299,7 +298,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
boolean initComplete = false;
while (!initComplete) {
@ -323,7 +322,7 @@ public class ShardConsumerTest {
log.debug("Release processing task interlock");
awaitAndResetBarrier(processingTaskInterlock);
while(!consumer.isShutdown()) {
while (!consumer.isShutdown()) {
consumer.executeLifecycle();
Thread.yield();
}
@ -358,7 +357,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
boolean initComplete = false;
while (!initComplete) {
@ -417,7 +416,8 @@ public class ShardConsumerTest {
@Ignore
public final void testInitializationStateUponFailure() throws Exception {
ShardConsumer consumer = new ShardConsumer(recordsPublisher, executorService, shardInfo,
logWarningForTaskAfterMillis, shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
logWarningForTaskAfterMillis, shardConsumerArgument, initialState, Function.identity(), 1,
taskExecutionListener, 0);
when(initialState.createTask(eq(shardConsumerArgument), eq(consumer), any())).thenReturn(initializeTask);
when(initializeTask.call()).thenReturn(new TaskResult(new Exception("Bad")));
@ -450,7 +450,7 @@ public class ShardConsumerTest {
ExecutorService failingService = mock(ExecutorService.class);
ShardConsumer consumer = new ShardConsumer(recordsPublisher, failingService, shardInfo,
logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener);
logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
doThrow(new RejectedExecutionException()).when(failingService).execute(any());
@ -464,7 +464,7 @@ public class ShardConsumerTest {
@Test
public void testErrorThrowableInInitialization() throws Exception {
ShardConsumer consumer = new ShardConsumer(recordsPublisher, executorService, shardInfo,
logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener);
logWarningForTaskAfterMillis, shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
when(initialState.createTask(any(), any(), any())).thenReturn(initializeTask);
when(initialState.taskType()).thenReturn(TaskType.INITIALIZE);
@ -488,7 +488,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener);
shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
mockSuccessfulInitialize(null);
@ -571,7 +571,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, Optional.of(1L),
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(null);
@ -618,7 +618,7 @@ public class ShardConsumerTest {
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, Optional.of(1L),
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener, 0);
CyclicBarrier taskArriveBarrier = new CyclicBarrier(2);
CyclicBarrier taskDepartBarrier = new CyclicBarrier(2);