Adding logging for long running tasks. (#259)

Allow enabling logging for long running tasks.
This commit is contained in:
Sahil Palvia 2017-10-25 15:38:45 -07:00 committed by Justin Pfifer
parent 7032ea67ec
commit 3de901ea93
3 changed files with 72 additions and 3 deletions

View file

@ -234,6 +234,9 @@ public class KinesisClientLibConfiguration {
@Getter
private RecordsFetcherFactory recordsFetcherFactory;
@Getter
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
/**
* Constructor.
@ -1355,4 +1358,15 @@ public class KinesisClientLibConfiguration {
this.recordsFetcherFactory.setIdleMillisBetweenCalls(idleMillisBetweenCalls);
return this;
}
/**
* @param logWarningForTaskAfterMillis Logs warn message if as task is held in a task for more than the set
* time.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withLogWarningForTaskAfterMillis(long logWarningForTaskAfterMillis) {
checkIsValuePositive("LogProcessTaskStatusAfterInMillis", logWarningForTaskAfterMillis);
this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis);
return this;
}
}

View file

@ -283,11 +283,17 @@ class ShardConsumer {
}
}
} else {
final long timeElapsed = System.currentTimeMillis() - currentTaskSubmitTime;
final String commonMessage = String.format("Previous %s task still pending for shard %s since %d ms ago. ",
currentTask.getTaskType(), shardInfo.getShardId(), timeElapsed);
if (LOG.isDebugEnabled()) {
LOG.debug("Previous " + currentTask.getTaskType() + " task still pending for shard "
+ shardInfo.getShardId() + " since " + (System.currentTimeMillis() - currentTaskSubmitTime)
+ " ms ago" + ". Not submitting new task.");
LOG.debug(commonMessage + "Not submitting new task.");
}
config.getLogWarningForTaskAfterMillis().ifPresent(value -> {
if (timeElapsed > value) {
LOG.warn(commonMessage);
}
});
}
return submittedNewTask;

View file

@ -27,6 +27,7 @@ import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -43,6 +44,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -119,6 +121,7 @@ public class ShardConsumerTest {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty());
}
/**
@ -645,6 +648,52 @@ public class ShardConsumerTest {
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class);
}
@SuppressWarnings("unchecked")
@Test
public void testLongRunningTasks() throws InterruptedException {
final long sleepTime = 1000L;
ExecutorService mockExecutorService = mock(ExecutorService.class);
Future<TaskResult> mockFuture = mock(Future.class);
when(mockExecutorService.submit(any(ITask.class))).thenReturn(mockFuture);
when(mockFuture.isDone()).thenReturn(false);
when(mockFuture.isCancelled()).thenReturn(false);
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.of(sleepTime));
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.LATEST);
StreamConfig streamConfig = new StreamConfig(
streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer = new ShardConsumer(
shardInfo,
streamConfig,
checkpoint,
processor,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
mockExecutorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
shardConsumer.consumeShard();
Thread.sleep(sleepTime);
shardConsumer.consumeShard();
verify(config).getLogWarningForTaskAfterMillis();
verify(mockFuture).isDone();
verify(mockFuture).isCancelled();
}
//@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List<Record> expectedRecords,