diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
index 17ac0954..dea3446e 100644
--- a/META-INF/MANIFEST.MF
+++ b/META-INF/MANIFEST.MF
@@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
-Bundle-Version: 1.7.3
+Bundle-Version: 1.7.4
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
diff --git a/pom.xml b/pom.xml
index 368ed1e7..750f5a92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.7.3
+ 1.7.4-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
@@ -25,7 +25,7 @@
- 1.11.76
+ 1.11.91
1.0.392
libsqlite4java
${project.build.directory}/test-lib
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
index c8678974..2d92d7d7 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
@@ -355,7 +355,7 @@ class ConsumerStates {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
- consumer.getShutdownNotification());
+ consumer.getShutdownNotification(), consumer.getShardInfo());
}
@Override
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index 7f1593df..b8218968 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -120,7 +120,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
- public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.3";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.4";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java
index 6ee34880..c419c693 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java
@@ -34,6 +34,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
@@ -49,6 +50,7 @@ class ProcessTask implements ITask {
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
+ private static final int MAX_CONSECUTIVE_THROTTLES = 5;
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
@@ -58,23 +60,50 @@ class ProcessTask implements ITask {
private final StreamConfig streamConfig;
private final long backoffTimeMillis;
private final Shard shard;
+ private final ThrottlingReporter throttlingReporter;
/**
- * @param shardInfo contains information about the shard
- * @param streamConfig Stream configuration
- * @param recordProcessor Record processor used to process the data records for the shard
- * @param recordProcessorCheckpointer Passed to the RecordProcessor so it can checkpoint
- * progress
- * @param dataFetcher Kinesis data fetcher (used to fetch records from Kinesis)
- * @param backoffTimeMillis backoff time when catching exceptions
+ * @param shardInfo
+ * contains information about the shard
+ * @param streamConfig
+ * Stream configuration
+ * @param recordProcessor
+ * Record processor used to process the data records for the shard
+ * @param recordProcessorCheckpointer
+ * Passed to the RecordProcessor so it can checkpoint progress
+ * @param dataFetcher
+ * Kinesis data fetcher (used to fetch records from Kinesis)
+ * @param backoffTimeMillis
+ * backoff time when catching exceptions
*/
- public ProcessTask(ShardInfo shardInfo,
- StreamConfig streamConfig,
- IRecordProcessor recordProcessor,
- RecordProcessorCheckpointer recordProcessorCheckpointer,
- KinesisDataFetcher dataFetcher,
- long backoffTimeMillis,
- boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
+ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
+ RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
+ long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
+ this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
+ skipShardSyncAtWorkerInitializationIfLeasesExist,
+ new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()));
+ }
+
+ /**
+ * @param shardInfo
+ * contains information about the shard
+ * @param streamConfig
+ * Stream configuration
+ * @param recordProcessor
+ * Record processor used to process the data records for the shard
+ * @param recordProcessorCheckpointer
+ * Passed to the RecordProcessor so it can checkpoint progress
+ * @param dataFetcher
+ * Kinesis data fetcher (used to fetch records from Kinesis)
+ * @param backoffTimeMillis
+ * backoff time when catching exceptions
+ * @param throttlingReporter
+ * determines how throttling events should be reported in the log.
+ */
+ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
+ RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
+ long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
+ ThrottlingReporter throttlingReporter) {
super();
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
@@ -82,6 +111,7 @@ class ProcessTask implements ITask {
this.dataFetcher = dataFetcher;
this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis;
+ this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
@@ -104,9 +134,6 @@ class ProcessTask implements ITask {
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
-
- // CHECKSTYLE:OFF CyclomaticComplexity
- @SuppressWarnings("unchecked")
@Override
public TaskResult call() {
long startTimeMillis = System.currentTimeMillis();
@@ -120,85 +147,138 @@ class ProcessTask implements ITask {
try {
if (dataFetcher.isShardEndReached()) {
LOG.info("Reached end of shard " + shardInfo.getShardId());
- boolean shardEndReached = true;
- return new TaskResult(null, shardEndReached);
+ return new TaskResult(null, true);
}
final GetRecordsResult getRecordsResult = getRecordsResult();
+ throttlingReporter.success();
List records = getRecordsResult.getRecords();
if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
} else {
- LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
+ handleNoRecords(startTimeMillis);
+ }
+ records = deaggregateRecords(records);
- long sleepTimeMillis =
- streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - startTimeMillis);
- if (sleepTimeMillis > 0) {
- sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds());
- try {
- LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard "
- + shardInfo.getShardId());
- Thread.sleep(sleepTimeMillis);
- } catch (InterruptedException e) {
- LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted");
- }
- }
- }
-
- // We deaggregate if and only if we got actual Kinesis records, i.e.
- // not instances of some subclass thereof.
- if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
- if (this.shard != null) {
- records = (List) (List>) UserRecord.deaggregate(records,
- new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
- new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
- } else {
- records = (List) (List>) UserRecord.deaggregate(records);
- }
- }
-
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
filterAndGetMaxExtendedSequenceNumber(scope, records,
recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
- if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
- LOG.debug("Calling application processRecords() with " + records.size()
- + " records from " + shardInfo.getShardId());
- final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
- .withRecords(records)
- .withCheckpointer(recordProcessorCheckpointer)
- .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
-
- final long recordProcessorStartTimeMillis = System.currentTimeMillis();
- try {
- recordProcessor.processRecords(processRecordsInput);
- } catch (Exception e) {
- LOG.error("ShardId " + shardInfo.getShardId()
- + ": Application processRecords() threw an exception when processing shard ", e);
- LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
- + records);
- } finally {
- MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
- recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
- }
+ if (shouldCallProcessRecords(records)) {
+ callProcessRecords(getRecordsResult, records);
}
+ } catch (ProvisionedThroughputExceededException pte) {
+ throttlingReporter.throttled();
+ exception = pte;
+ backoff();
+
} catch (RuntimeException e) {
LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e);
exception = e;
-
- // backoff if we encounter an exception.
- try {
- Thread.sleep(this.backoffTimeMillis);
- } catch (InterruptedException ie) {
- LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie);
- }
+ backoff();
}
return new TaskResult(exception);
}
- // CHECKSTYLE:ON CyclomaticComplexity
+
+ /**
+ * Sleeps for the configured backoff period. This is usually only called when an exception occurs.
+ */
+ private void backoff() {
+ // backoff if we encounter an exception.
+ try {
+ Thread.sleep(this.backoffTimeMillis);
+ } catch (InterruptedException ie) {
+ LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie);
+ }
+ }
+
+ /**
+ * Dispatches a batch of records to the record processor, and handles any fallout from that.
+ *
+ * @param getRecordsResult
+ * the result of the last call to Kinesis
+ * @param records
+ * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
+ */
+ private void callProcessRecords(GetRecordsResult getRecordsResult, List records) {
+ LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ + shardInfo.getShardId());
+ final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
+ .withCheckpointer(recordProcessorCheckpointer)
+ .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
+
+ final long recordProcessorStartTimeMillis = System.currentTimeMillis();
+ try {
+ recordProcessor.processRecords(processRecordsInput);
+ } catch (Exception e) {
+ LOG.error("ShardId " + shardInfo.getShardId()
+ + ": Application processRecords() threw an exception when processing shard ", e);
+ LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " + records);
+ } finally {
+ MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
+ recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
+ }
+ }
+
+ /**
+ * Whether we should call process records or not
+ *
+ * @param records
+ * the records returned from the call to Kinesis, and/or deaggregation
+ * @return true if the set of records should be dispatched to the record process, false if they should not.
+ */
+ private boolean shouldCallProcessRecords(List records) {
+ return (!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList();
+ }
+
+ /**
+ * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation
+ *
+ * @param records
+ * the records to deaggregate is deaggregation is required.
+ * @return returns either the deaggregated records, or the original records
+ */
+ @SuppressWarnings("unchecked")
+ private List deaggregateRecords(List records) {
+ // We deaggregate if and only if we got actual Kinesis records, i.e.
+ // not instances of some subclass thereof.
+ if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
+ if (this.shard != null) {
+ return (List) (List>) UserRecord.deaggregate(records,
+ new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
+ new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
+ } else {
+ return (List) (List>) UserRecord.deaggregate(records);
+ }
+ }
+ return records;
+ }
+
+ /**
+ * Emits metrics, and sleeps if there are no records available
+ *
+ * @param startTimeMillis
+ * the time when the task started
+ */
+ private void handleNoRecords(long startTimeMillis) {
+ LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
+
+ long sleepTimeMillis = streamConfig.getIdleTimeInMilliseconds()
+ - (System.currentTimeMillis() - startTimeMillis);
+ if (sleepTimeMillis > 0) {
+ sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds());
+ try {
+ LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard "
+ + shardInfo.getShardId());
+ Thread.sleep(sleepTimeMillis);
+ } catch (InterruptedException e) {
+ LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted");
+ }
+ }
+ }
@Override
public TaskType getTaskType() {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java
index 13711f23..a689ee43 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java
@@ -12,11 +12,13 @@ class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
+ private final ShardInfo shardInfo;
- ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) {
+ ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification;
+ this.shardInfo = shardInfo;
}
@Override
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java
new file mode 100644
index 00000000..f88f131f
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java
@@ -0,0 +1,38 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.apachecommons.CommonsLog;
+import org.apache.commons.logging.Log;
+
+@RequiredArgsConstructor
+@CommonsLog
+class ThrottlingReporter {
+
+ private final int maxConsecutiveWarnThrottles;
+ private final String shardId;
+
+ private int consecutiveThrottles = 0;
+
+ void throttled() {
+ consecutiveThrottles++;
+ String message = "Shard '" + shardId + "' has been throttled "
+ + consecutiveThrottles + " consecutively";
+
+ if (consecutiveThrottles > maxConsecutiveWarnThrottles) {
+ getLog().error(message);
+ } else {
+ getLog().warn(message);
+ }
+
+ }
+
+ void success() {
+ consecutiveThrottles = 0;
+ }
+
+ protected Log getLog() {
+ return log;
+ }
+
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 2a1e5484..7ebfc3f3 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -538,15 +538,18 @@ public class Worker implements Runnable {
*/
public Future requestShutdown() {
+ //
+ // Stop accepting new leases. Once we do this we can be sure that
+ // no more leases will be acquired.
+ //
leaseCoordinator.stopLeaseTaker();
- //
- // Stop accepting new leases
- //
+
Collection leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
- // If there are no leases shutdown is already completed.
+ // If there are no leases notification is already completed, but we still need to shutdown the worker.
//
+ this.shutdown();
return Futures.immediateFuture(null);
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
@@ -555,7 +558,18 @@ public class Worker implements Runnable {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
- shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
+ ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
+ if (consumer != null) {
+ consumer.notifyShutdownRequested(shutdownNotification);
+ } else {
+ //
+ // There is a race condition between retrieving the current assignments, and creating the
+ // notification. If the a lease is lost in between these two points, we explicitly decrement the
+ // notification latches to clear the shutdown.
+ //
+ notificationCompleteLatch.countDown();
+ shutdownCompleteLatch.countDown();
+ }
}
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
@@ -622,9 +636,11 @@ public class Worker implements Runnable {
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
+ *
* @return Whether worker should shutdown immediately.
*/
- private boolean shouldShutdown() {
+ @VisibleForTesting
+ boolean shouldShutdown() {
if (executorService.isShutdown()) {
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
return true;
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java
index 4d32566e..e95aef50 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java
@@ -19,8 +19,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -72,6 +74,8 @@ public class ProcessTaskTest {
private @Mock KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor;
private @Mock RecordProcessorCheckpointer mockCheckpointer;
+ @Mock
+ private ThrottlingReporter throttlingReporter;
private List processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
@@ -90,7 +94,7 @@ public class ProcessTaskTest {
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
- KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
+ KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter);
}
@Test
@@ -101,6 +105,8 @@ public class ProcessTaskTest {
.getRecords(maxRecords);
TaskResult result = processTask.call();
+ verify(throttlingReporter).throttled();
+ verify(throttlingReporter, never()).success();
assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException);
}
@@ -299,6 +305,8 @@ public class ProcessTaskTest {
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call();
+ verify(throttlingReporter).success();
+ verify(throttlingReporter, never()).throttled();
ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture());
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java
new file mode 100644
index 00000000..d0645229
--- /dev/null
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java
@@ -0,0 +1,66 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.commons.logging.Log;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ThrottlingReporterTest {
+
+ private static final String SHARD_ID = "Shard-001";
+
+ @Mock
+ private Log throttleLog;
+
+ @Test
+ public void testLessThanMaxThrottles() {
+ ThrottlingReporter reporter = new LogTestingThrottingReporter(5, SHARD_ID);
+ reporter.throttled();
+ verify(throttleLog).warn(any(Object.class));
+ verify(throttleLog, never()).error(any(Object.class));
+
+ }
+
+ @Test
+ public void testMoreThanMaxThrottles() {
+ ThrottlingReporter reporter = new LogTestingThrottingReporter(1, SHARD_ID);
+ reporter.throttled();
+ reporter.throttled();
+ verify(throttleLog).warn(any(Object.class));
+ verify(throttleLog).error(any(Object.class));
+ }
+
+ @Test
+ public void testSuccessResetsErrors() {
+ ThrottlingReporter reporter = new LogTestingThrottingReporter(1, SHARD_ID);
+ reporter.throttled();
+ reporter.throttled();
+ reporter.throttled();
+ reporter.throttled();
+ reporter.success();
+ reporter.throttled();
+ verify(throttleLog, times(2)).warn(any(Object.class));
+ verify(throttleLog, times(3)).error(any(Object.class));
+
+ }
+
+ private class LogTestingThrottingReporter extends ThrottlingReporter {
+
+ public LogTestingThrottingReporter(int maxConsecutiveWarnThrottles, String shardId) {
+ super(maxConsecutiveWarnThrottles, shardId);
+ }
+
+ @Override
+ protected Log getLog() {
+ return throttleLog;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
index baafa447..daf58165 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
@@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@@ -27,6 +28,7 @@ import static org.mockito.Mockito.*;
import java.io.File;
import java.lang.Thread.State;
+import java.lang.reflect.Field;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
@@ -47,6 +49,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -96,6 +99,8 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.RequiredArgsConstructor;
+
/**
* Unit tests of Worker.
*/
@@ -776,6 +781,276 @@ public class WorkerTest {
}
+ @Test
+ public void testRequestShutdownNoLeases() throws Exception {
+
+
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
+
+
+ final List leases = new ArrayList<>();
+ final List currentAssignments = new ArrayList<>();
+
+ when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() {
+ @Override
+ public List answer(InvocationOnMock invocation) throws Throwable {
+ return leases;
+ }
+ });
+ when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() {
+ @Override
+ public List answer(InvocationOnMock invocation) throws Throwable {
+ return currentAssignments;
+ }
+ });
+
+ IRecordProcessor processor = mock(IRecordProcessor.class);
+ when(recordProcessorFactory.createProcessor()).thenReturn(processor);
+
+
+ Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
+ INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
+ cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
+ taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+
+ when(executorService.submit(Matchers.> any()))
+ .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
+ when(taskFuture.isDone()).thenReturn(true);
+ when(taskFuture.get()).thenReturn(taskResult);
+
+ worker.runProcessLoop();
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
+
+ worker.runProcessLoop();
+
+ verify(executorService, never()).submit(argThat(
+ both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
+
+ worker.requestShutdown();
+ worker.runProcessLoop();
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
+
+ worker.runProcessLoop();
+ verify(executorService, never()).submit(argThat(
+ both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
+
+ assertThat(worker.shouldShutdown(), equalTo(true));
+
+ }
+
+ @Test
+ public void testRequestShutdownWithLostLease() throws Exception {
+
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
+
+ ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
+ KinesisClientLease lease1 = makeLease(checkpoint, 1);
+ KinesisClientLease lease2 = makeLease(checkpoint, 2);
+ final List leases = new ArrayList<>();
+ final List currentAssignments = new ArrayList<>();
+ leases.add(lease1);
+ leases.add(lease2);
+
+ ShardInfo shardInfo1 = makeShardInfo(lease1);
+ currentAssignments.add(shardInfo1);
+ ShardInfo shardInfo2 = makeShardInfo(lease2);
+ currentAssignments.add(shardInfo2);
+
+ when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() {
+ @Override
+ public List answer(InvocationOnMock invocation) throws Throwable {
+ return leases;
+ }
+ });
+ when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() {
+ @Override
+ public List answer(InvocationOnMock invocation) throws Throwable {
+ return currentAssignments;
+ }
+ });
+
+ IRecordProcessor processor = mock(IRecordProcessor.class);
+ when(recordProcessorFactory.createProcessor()).thenReturn(processor);
+
+ Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
+ INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
+ cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
+ taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+
+ when(executorService.submit(Matchers.> any()))
+ .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
+ when(taskFuture.isDone()).thenReturn(true);
+ when(taskFuture.get()).thenReturn(taskResult);
+
+ worker.runProcessLoop();
+
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
+ .withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo1)))));
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
+ .withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ worker.runProcessLoop();
+
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
+ .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo1)))));
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
+ .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ worker.getShardInfoShardConsumerMap().remove(shardInfo2);
+ worker.requestShutdown();
+ leases.remove(1);
+ currentAssignments.remove(1);
+ worker.runProcessLoop();
+
+
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
+ .withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo1)))));
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
+ .withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ worker.runProcessLoop();
+
+ verify(leaseCoordinator).dropLease(eq(lease1));
+ verify(leaseCoordinator, never()).dropLease(eq(lease2));
+ leases.clear();
+ currentAssignments.clear();
+
+ worker.runProcessLoop();
+
+ verify(executorService, atLeastOnce()).submit(argThat(
+ both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
+
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
+ .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
+ .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ }
+
+ @Test
+ public void testRequestShutdownWithAllLeasesLost() throws Exception {
+
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
+
+ ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
+ KinesisClientLease lease1 = makeLease(checkpoint, 1);
+ KinesisClientLease lease2 = makeLease(checkpoint, 2);
+ final List leases = new ArrayList<>();
+ final List currentAssignments = new ArrayList<>();
+ leases.add(lease1);
+ leases.add(lease2);
+
+ ShardInfo shardInfo1 = makeShardInfo(lease1);
+ currentAssignments.add(shardInfo1);
+ ShardInfo shardInfo2 = makeShardInfo(lease2);
+ currentAssignments.add(shardInfo2);
+
+ when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() {
+ @Override
+ public List answer(InvocationOnMock invocation) throws Throwable {
+ return leases;
+ }
+ });
+ when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() {
+ @Override
+ public List answer(InvocationOnMock invocation) throws Throwable {
+ return currentAssignments;
+ }
+ });
+
+ IRecordProcessor processor = mock(IRecordProcessor.class);
+ when(recordProcessorFactory.createProcessor()).thenReturn(processor);
+
+ Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
+ INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
+ cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
+ taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
+
+ when(executorService.submit(Matchers.> any()))
+ .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
+ when(taskFuture.isDone()).thenReturn(true);
+ when(taskFuture.get()).thenReturn(taskResult);
+
+ worker.runProcessLoop();
+
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
+ .withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo1)))));
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
+ .withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ worker.runProcessLoop();
+
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
+ .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo1)))));
+ verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
+ .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ worker.getShardInfoShardConsumerMap().clear();
+ Future future = worker.requestShutdown();
+
+ leases.clear();
+ currentAssignments.clear();
+
+ try {
+ future.get(1, TimeUnit.HOURS);
+ } catch (TimeoutException te) {
+ fail("Future from requestShutdown should immediately return.");
+ }
+
+ worker.runProcessLoop();
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
+ .withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo1)))));
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
+ .withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+ worker.runProcessLoop();
+
+ verify(leaseCoordinator, never()).dropLease(eq(lease1));
+ verify(leaseCoordinator, never()).dropLease(eq(lease2));
+
+ worker.runProcessLoop();
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
+ .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
+
+ verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
+ .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
+ .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
+
+
+
+ assertThat(worker.shouldShutdown(), equalTo(true));
+
+ }
+
@Test
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
@@ -919,6 +1194,17 @@ public class WorkerTest {
}
+ private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) {
+ return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID())
+ .withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L)
+ .withLeaseOwner("Self").withLeaseKey(String.format("shardId-%03d", shardId)).build();
+ }
+
+ private ShardInfo makeShardInfo(KinesisClientLease lease) {
+ return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), lease.getParentShardIds(),
+ lease.getCheckpoint());
+ }
+
private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher {
private final Matcher matcher;
@@ -1012,9 +1298,9 @@ public class WorkerTest {
private static class InnerTaskMatcher extends TypeSafeMatcher {
- final Matcher matcher;
+ final Matcher> matcher;
- InnerTaskMatcher(Matcher matcher) {
+ InnerTaskMatcher(Matcher> matcher) {
this.matcher = matcher;
}
@@ -1028,10 +1314,60 @@ public class WorkerTest {
matcher.describeTo(description);
}
- static InnerTaskMatcher taskWith(Class clazz, Matcher matcher) {
+ static InnerTaskMatcher taskWith(Class clazz, Matcher> matcher) {
return new InnerTaskMatcher<>(matcher);
}
}
+
+ @RequiredArgsConstructor
+ private static class ReflectionFieldMatcher
+ extends TypeSafeDiagnosingMatcher {
+
+ private final Class itemClass;
+ private final String fieldName;
+ private final Matcher> fieldMatcher;
+
+ @Override
+ protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) {
+ if (item.getOther() == null) {
+ mismatchDescription.appendText("inner task is null");
+ return false;
+ }
+ ITask inner = item.getOther();
+ if (!itemClass.equals(inner.getClass())) {
+ mismatchDescription.appendText("inner task isn't an instance of ").appendText(itemClass.getName());
+ return false;
+ }
+ try {
+ Field field = itemClass.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ if (!fieldMatcher.matches(field.get(inner))) {
+ mismatchDescription.appendText("Field '").appendText(fieldName).appendText("' doesn't match: ")
+ .appendDescriptionOf(fieldMatcher);
+ return false;
+ }
+ return true;
+ } catch (NoSuchFieldException e) {
+ mismatchDescription.appendText(itemClass.getName()).appendText(" doesn't have a field named ")
+ .appendText(fieldName);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+
+ return false;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("An item of ").appendText(itemClass.getName()).appendText(" with the field '")
+ .appendText(fieldName).appendText("' matching ").appendDescriptionOf(fieldMatcher);
+ }
+
+ static ReflectionFieldMatcher withField(Class itemClass, String fieldName,
+ Matcher> fieldMatcher) {
+ return new ReflectionFieldMatcher<>(itemClass, fieldName, fieldMatcher);
+ }
+ }
/**
* Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow.