diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index a37daa42..fae780f5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -234,6 +234,9 @@ public class KinesisClientLibConfiguration { @Getter private RecordsFetcherFactory recordsFetcherFactory; + + @Getter + private Optional logWarningForTaskAfterMillis = Optional.empty(); /** * Constructor. @@ -1355,4 +1358,15 @@ public class KinesisClientLibConfiguration { this.recordsFetcherFactory.setIdleMillisBetweenCalls(idleMillisBetweenCalls); return this; } + + /** + * @param logWarningForTaskAfterMillis Logs warn message if as task is held in a task for more than the set + * time. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withLogWarningForTaskAfterMillis(long logWarningForTaskAfterMillis) { + checkIsValuePositive("LogProcessTaskStatusAfterInMillis", logWarningForTaskAfterMillis); + this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis); + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index d7a5545e..14a1d08c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -283,11 +283,17 @@ class ShardConsumer { } } } else { + final long timeElapsed = System.currentTimeMillis() - currentTaskSubmitTime; + final String commonMessage = String.format("Previous %s task still pending for shard %s since %d ms ago. ", + currentTask.getTaskType(), shardInfo.getShardId(), timeElapsed); if (LOG.isDebugEnabled()) { - LOG.debug("Previous " + currentTask.getTaskType() + " task still pending for shard " - + shardInfo.getShardId() + " since " + (System.currentTimeMillis() - currentTaskSubmitTime) - + " ms ago" + ". Not submitting new task."); + LOG.debug(commonMessage + "Not submitting new task."); } + config.getLogWarningForTaskAfterMillis().ifPresent(value -> { + if (timeElapsed > value) { + LOG.warn(commonMessage); + } + }); } return submittedNewTask; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index efb9d43c..9a7f2234 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -27,6 +27,7 @@ import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -43,6 +44,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -119,6 +121,7 @@ public class ShardConsumerTest { recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty()); } /** @@ -645,6 +648,52 @@ public class ShardConsumerTest { assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); } + + @SuppressWarnings("unchecked") + @Test + public void testLongRunningTasks() throws InterruptedException { + final long sleepTime = 1000L; + ExecutorService mockExecutorService = mock(ExecutorService.class); + Future mockFuture = mock(Future.class); + + when(mockExecutorService.submit(any(ITask.class))).thenReturn(mockFuture); + when(mockFuture.isDone()).thenReturn(false); + when(mockFuture.isCancelled()).thenReturn(false); + when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.of(sleepTime)); + + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.LATEST); + StreamConfig streamConfig = new StreamConfig( + streamProxy, + 1, + 10, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, + INITIAL_POSITION_LATEST); + + ShardConsumer shardConsumer = new ShardConsumer( + shardInfo, + streamConfig, + checkpoint, + processor, + null, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + mockExecutorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + + shardConsumer.consumeShard(); + + Thread.sleep(sleepTime); + + shardConsumer.consumeShard(); + + verify(config).getLogWarningForTaskAfterMillis(); + verify(mockFuture).isDone(); + verify(mockFuture).isCancelled(); + } //@formatter:off (gets the formatting wrong) private void verifyConsumedRecords(List expectedRecords,