Merge branch 'master' into fix-javadoc

This commit is contained in:
Pfifer, Justin 2017-02-17 09:55:40 -08:00
commit 24784485b8
11 changed files with 636 additions and 90 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.7.3 Bundle-Version: 1.7.4
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.7.3</version> <version>1.7.4-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>
@ -25,7 +25,7 @@
</licenses> </licenses>
<properties> <properties>
<aws-java-sdk.version>1.11.76</aws-java-sdk.version> <aws-java-sdk.version>1.11.91</aws-java-sdk.version>
<sqlite4java.version>1.0.392</sqlite4java.version> <sqlite4java.version>1.0.392</sqlite4java.version>
<sqlite4java.native>libsqlite4java</sqlite4java.native> <sqlite4java.native>libsqlite4java</sqlite4java.native>
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>

View file

@ -355,7 +355,7 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification()); consumer.getShutdownNotification(), consumer.getShardInfo());
} }
@Override @Override

View file

@ -120,7 +120,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.3"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.4";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -34,6 +34,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
@ -49,6 +50,7 @@ class ProcessTask implements ITask {
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest"; private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
private static final int MAX_CONSECUTIVE_THROTTLES = 5;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
@ -58,23 +60,50 @@ class ProcessTask implements ITask {
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter;
/** /**
* @param shardInfo contains information about the shard * @param shardInfo
* @param streamConfig Stream configuration * contains information about the shard
* @param recordProcessor Record processor used to process the data records for the shard * @param streamConfig
* @param recordProcessorCheckpointer Passed to the RecordProcessor so it can checkpoint * Stream configuration
* progress * @param recordProcessor
* @param dataFetcher Kinesis data fetcher (used to fetch records from Kinesis) * Record processor used to process the data records for the shard
* @param backoffTimeMillis backoff time when catching exceptions * @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
*/ */
public ProcessTask(ShardInfo shardInfo, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
StreamConfig streamConfig, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
IRecordProcessor recordProcessor, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
RecordProcessorCheckpointer recordProcessorCheckpointer, this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
KinesisDataFetcher dataFetcher, skipShardSyncAtWorkerInitializationIfLeasesExist,
long backoffTimeMillis, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()));
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { }
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param throttlingReporter
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -82,6 +111,7 @@ class ProcessTask implements ITask {
this.dataFetcher = dataFetcher; this.dataFetcher = dataFetcher;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
@ -104,9 +134,6 @@ class ProcessTask implements ITask {
* *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity
@SuppressWarnings("unchecked")
@Override @Override
public TaskResult call() { public TaskResult call() {
long startTimeMillis = System.currentTimeMillis(); long startTimeMillis = System.currentTimeMillis();
@ -120,85 +147,138 @@ class ProcessTask implements ITask {
try { try {
if (dataFetcher.isShardEndReached()) { if (dataFetcher.isShardEndReached()) {
LOG.info("Reached end of shard " + shardInfo.getShardId()); LOG.info("Reached end of shard " + shardInfo.getShardId());
boolean shardEndReached = true; return new TaskResult(null, true);
return new TaskResult(null, shardEndReached);
} }
final GetRecordsResult getRecordsResult = getRecordsResult(); final GetRecordsResult getRecordsResult = getRecordsResult();
throttlingReporter.success();
List<Record> records = getRecordsResult.getRecords(); List<Record> records = getRecordsResult.getRecords();
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
} else { } else {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); handleNoRecords(startTimeMillis);
}
records = deaggregateRecords(records);
long sleepTimeMillis =
streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds());
try {
LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard "
+ shardInfo.getShardId());
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted");
}
}
}
// We deaggregate if and only if we got actual Kinesis records, i.e.
// not instances of some subclass thereof.
if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
if (this.shard != null) {
records = (List<Record>) (List<?>) UserRecord.deaggregate(records,
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
} else {
records = (List<Record>) (List<?>) UserRecord.deaggregate(records);
}
}
recordProcessorCheckpointer.setLargestPermittedCheckpointValue( recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
filterAndGetMaxExtendedSequenceNumber(scope, records, filterAndGetMaxExtendedSequenceNumber(scope, records,
recordProcessorCheckpointer.getLastCheckpointValue(), recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) { if (shouldCallProcessRecords(records)) {
LOG.debug("Calling application processRecords() with " + records.size() callProcessRecords(getRecordsResult, records);
+ " records from " + shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
LOG.error("ShardId " + shardInfo.getShardId()
+ ": Application processRecords() threw an exception when processing shard ", e);
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
+ records);
} finally {
MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
}
} }
} catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled();
exception = pte;
backoff();
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e); LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e);
exception = e; exception = e;
backoff();
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie);
}
} }
return new TaskResult(exception); return new TaskResult(exception);
} }
// CHECKSTYLE:ON CyclomaticComplexity
/**
* Sleeps for the configured backoff period. This is usually only called when an exception occurs.
*/
private void backoff() {
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie);
}
}
/**
* Dispatches a batch of records to the record processor, and handles any fallout from that.
*
* @param getRecordsResult
* the result of the last call to Kinesis
* @param records
* the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
*/
private void callProcessRecords(GetRecordsResult getRecordsResult, List<Record> records) {
LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
LOG.error("ShardId " + shardInfo.getShardId()
+ ": Application processRecords() threw an exception when processing shard ", e);
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " + records);
} finally {
MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
}
}
/**
* Whether we should call process records or not
*
* @param records
* the records returned from the call to Kinesis, and/or deaggregation
* @return true if the set of records should be dispatched to the record process, false if they should not.
*/
private boolean shouldCallProcessRecords(List<Record> records) {
return (!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList();
}
/**
* Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation
*
* @param records
* the records to deaggregate is deaggregation is required.
* @return returns either the deaggregated records, or the original records
*/
@SuppressWarnings("unchecked")
private List<Record> deaggregateRecords(List<Record> records) {
// We deaggregate if and only if we got actual Kinesis records, i.e.
// not instances of some subclass thereof.
if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
if (this.shard != null) {
return (List<Record>) (List<?>) UserRecord.deaggregate(records,
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
} else {
return (List<Record>) (List<?>) UserRecord.deaggregate(records);
}
}
return records;
}
/**
* Emits metrics, and sleeps if there are no records available
*
* @param startTimeMillis
* the time when the task started
*/
private void handleNoRecords(long startTimeMillis) {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
long sleepTimeMillis = streamConfig.getIdleTimeInMilliseconds()
- (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds());
try {
LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard "
+ shardInfo.getShardId());
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted");
}
}
}
@Override @Override
public TaskType getTaskType() { public TaskType getTaskType() {

View file

@ -12,11 +12,13 @@ class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer; private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification; private final ShutdownNotification shutdownNotification;
private final ShardInfo shardInfo;
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) { ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification; this.shutdownNotification = shutdownNotification;
this.shardInfo = shardInfo;
} }
@Override @Override

View file

@ -0,0 +1,38 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.commons.logging.Log;
@RequiredArgsConstructor
@CommonsLog
class ThrottlingReporter {
private final int maxConsecutiveWarnThrottles;
private final String shardId;
private int consecutiveThrottles = 0;
void throttled() {
consecutiveThrottles++;
String message = "Shard '" + shardId + "' has been throttled "
+ consecutiveThrottles + " consecutively";
if (consecutiveThrottles > maxConsecutiveWarnThrottles) {
getLog().error(message);
} else {
getLog().warn(message);
}
}
void success() {
consecutiveThrottles = 0;
}
protected Log getLog() {
return log;
}
}

View file

@ -538,15 +538,18 @@ public class Worker implements Runnable {
*/ */
public Future<Void> requestShutdown() { public Future<Void> requestShutdown() {
//
// Stop accepting new leases. Once we do this we can be sure that
// no more leases will be acquired.
//
leaseCoordinator.stopLeaseTaker(); leaseCoordinator.stopLeaseTaker();
//
// Stop accepting new leases
//
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments(); Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) { if (leases == null || leases.isEmpty()) {
// //
// If there are no leases shutdown is already completed. // If there are no leases notification is already completed, but we still need to shutdown the worker.
// //
this.shutdown();
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
@ -555,7 +558,18 @@ public class Worker implements Runnable {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch); notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.notifyShutdownRequested(shutdownNotification);
} else {
//
// There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
// notification latches to clear the shutdown.
//
notificationCompleteLatch.countDown();
shutdownCompleteLatch.countDown();
}
} }
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
@ -622,9 +636,11 @@ public class Worker implements Runnable {
/** /**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings. * method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
*
* @return Whether worker should shutdown immediately. * @return Whether worker should shutdown immediately.
*/ */
private boolean shouldShutdown() { @VisibleForTesting
boolean shouldShutdown() {
if (executorService.isShutdown()) { if (executorService.isShutdown()) {
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown."); LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
return true; return true;

View file

@ -19,8 +19,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -72,6 +74,8 @@ public class ProcessTaskTest {
private @Mock KinesisDataFetcher mockDataFetcher; private @Mock KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor; private @Mock IRecordProcessor mockRecordProcessor;
private @Mock RecordProcessorCheckpointer mockCheckpointer; private @Mock RecordProcessorCheckpointer mockCheckpointer;
@Mock
private ThrottlingReporter throttlingReporter;
private List<Record> processedRecords; private List<Record> processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue; private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
@ -90,7 +94,7 @@ public class ProcessTaskTest {
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask( processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter);
} }
@Test @Test
@ -101,6 +105,8 @@ public class ProcessTaskTest {
.getRecords(maxRecords); .getRecords(maxRecords);
TaskResult result = processTask.call(); TaskResult result = processTask.call();
verify(throttlingReporter).throttled();
verify(throttlingReporter, never()).success();
assertTrue("Result should contain ProvisionedThroughputExceededException", assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException); result.getException() instanceof ProvisionedThroughputExceededException);
} }
@ -299,6 +305,8 @@ public class ProcessTaskTest {
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call(); processTask.call();
verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled();
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture()); verify(mockRecordProcessor).processRecords(priCaptor.capture());

View file

@ -0,0 +1,66 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.commons.logging.Log;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ThrottlingReporterTest {
private static final String SHARD_ID = "Shard-001";
@Mock
private Log throttleLog;
@Test
public void testLessThanMaxThrottles() {
ThrottlingReporter reporter = new LogTestingThrottingReporter(5, SHARD_ID);
reporter.throttled();
verify(throttleLog).warn(any(Object.class));
verify(throttleLog, never()).error(any(Object.class));
}
@Test
public void testMoreThanMaxThrottles() {
ThrottlingReporter reporter = new LogTestingThrottingReporter(1, SHARD_ID);
reporter.throttled();
reporter.throttled();
verify(throttleLog).warn(any(Object.class));
verify(throttleLog).error(any(Object.class));
}
@Test
public void testSuccessResetsErrors() {
ThrottlingReporter reporter = new LogTestingThrottingReporter(1, SHARD_ID);
reporter.throttled();
reporter.throttled();
reporter.throttled();
reporter.throttled();
reporter.success();
reporter.throttled();
verify(throttleLog, times(2)).warn(any(Object.class));
verify(throttleLog, times(3)).error(any(Object.class));
}
private class LogTestingThrottingReporter extends ThrottlingReporter {
public LogTestingThrottingReporter(int maxConsecutiveWarnThrottles, String shardId) {
super(maxConsecutiveWarnThrottles, shardId);
}
@Override
protected Log getLog() {
return throttleLog;
}
}
}

View file

@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -27,6 +28,7 @@ import static org.mockito.Mockito.*;
import java.io.File; import java.io.File;
import java.lang.Thread.State; import java.lang.Thread.State;
import java.lang.reflect.Field;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -47,6 +49,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -96,6 +99,8 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
/** /**
* Unit tests of Worker. * Unit tests of Worker.
*/ */
@ -776,6 +781,276 @@ public class WorkerTest {
} }
@Test
public void testRequestShutdownNoLeases() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown();
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
assertThat(worker.shouldShutdown(), equalTo(true));
}
@Test
public void testRequestShutdownWithLostLease() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLease lease1 = makeLease(checkpoint, 1);
KinesisClientLease lease2 = makeLease(checkpoint, 2);
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
leases.add(lease1);
leases.add(lease2);
ShardInfo shardInfo1 = makeShardInfo(lease1);
currentAssignments.add(shardInfo1);
ShardInfo shardInfo2 = makeShardInfo(lease2);
currentAssignments.add(shardInfo2);
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
.withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
.withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.runProcessLoop();
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.getShardInfoShardConsumerMap().remove(shardInfo2);
worker.requestShutdown();
leases.remove(1);
currentAssignments.remove(1);
worker.runProcessLoop();
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
.withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
.withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.runProcessLoop();
verify(leaseCoordinator).dropLease(eq(lease1));
verify(leaseCoordinator, never()).dropLease(eq(lease2));
leases.clear();
currentAssignments.clear();
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
}
@Test
public void testRequestShutdownWithAllLeasesLost() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLease lease1 = makeLease(checkpoint, 1);
KinesisClientLease lease2 = makeLease(checkpoint, 2);
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
leases.add(lease1);
leases.add(lease2);
ShardInfo shardInfo1 = makeShardInfo(lease1);
currentAssignments.add(shardInfo1);
ShardInfo shardInfo2 = makeShardInfo(lease2);
currentAssignments.add(shardInfo2);
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
.withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)).and(ReflectionFieldMatcher
.withField(BlockOnParentShardTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.runProcessLoop();
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)).and(ReflectionFieldMatcher
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.getShardInfoShardConsumerMap().clear();
Future<Void> future = worker.requestShutdown();
leases.clear();
currentAssignments.clear();
try {
future.get(1, TimeUnit.HOURS);
} catch (TimeoutException te) {
fail("Future from requestShutdown should immediately return.");
}
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
.withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)).and(ReflectionFieldMatcher
.withField(ShutdownNotificationTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.runProcessLoop();
verify(leaseCoordinator, never()).dropLease(eq(lease1));
verify(leaseCoordinator, never()).dropLease(eq(lease2));
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
assertThat(worker.shouldShutdown(), equalTo(true));
}
@Test @Test
public void testLeaseCancelledAfterShutdownRequest() throws Exception { public void testLeaseCancelledAfterShutdownRequest() throws Exception {
@ -919,6 +1194,17 @@ public class WorkerTest {
} }
private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) {
return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID())
.withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L)
.withLeaseOwner("Self").withLeaseKey(String.format("shardId-%03d", shardId)).build();
}
private ShardInfo makeShardInfo(KinesisClientLease lease) {
return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), lease.getParentShardIds(),
lease.getCheckpoint());
}
private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher<MetricsCollectingTaskDecorator> { private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher<MetricsCollectingTaskDecorator> {
private final Matcher<ShutdownReason> matcher; private final Matcher<ShutdownReason> matcher;
@ -1012,9 +1298,9 @@ public class WorkerTest {
private static class InnerTaskMatcher<T extends ITask> extends TypeSafeMatcher<MetricsCollectingTaskDecorator> { private static class InnerTaskMatcher<T extends ITask> extends TypeSafeMatcher<MetricsCollectingTaskDecorator> {
final Matcher<T> matcher; final Matcher<?> matcher;
InnerTaskMatcher(Matcher<T> matcher) { InnerTaskMatcher(Matcher<?> matcher) {
this.matcher = matcher; this.matcher = matcher;
} }
@ -1028,10 +1314,60 @@ public class WorkerTest {
matcher.describeTo(description); matcher.describeTo(description);
} }
static <U extends ITask> InnerTaskMatcher<U> taskWith(Class<U> clazz, Matcher<U> matcher) { static <U extends ITask> InnerTaskMatcher<U> taskWith(Class<U> clazz, Matcher<?> matcher) {
return new InnerTaskMatcher<>(matcher); return new InnerTaskMatcher<>(matcher);
} }
} }
@RequiredArgsConstructor
private static class ReflectionFieldMatcher<T extends ITask>
extends TypeSafeDiagnosingMatcher<MetricsCollectingTaskDecorator> {
private final Class<T> itemClass;
private final String fieldName;
private final Matcher<?> fieldMatcher;
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) {
if (item.getOther() == null) {
mismatchDescription.appendText("inner task is null");
return false;
}
ITask inner = item.getOther();
if (!itemClass.equals(inner.getClass())) {
mismatchDescription.appendText("inner task isn't an instance of ").appendText(itemClass.getName());
return false;
}
try {
Field field = itemClass.getDeclaredField(fieldName);
field.setAccessible(true);
if (!fieldMatcher.matches(field.get(inner))) {
mismatchDescription.appendText("Field '").appendText(fieldName).appendText("' doesn't match: ")
.appendDescriptionOf(fieldMatcher);
return false;
}
return true;
} catch (NoSuchFieldException e) {
mismatchDescription.appendText(itemClass.getName()).appendText(" doesn't have a field named ")
.appendText(fieldName);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
return false;
}
@Override
public void describeTo(Description description) {
description.appendText("An item of ").appendText(itemClass.getName()).appendText(" with the field '")
.appendText(fieldName).appendText("' matching ").appendDescriptionOf(fieldMatcher);
}
static <T extends ITask> ReflectionFieldMatcher<T> withField(Class<T> itemClass, String fieldName,
Matcher<?> fieldMatcher) {
return new ReflectionFieldMatcher<>(itemClass, fieldName, fieldMatcher);
}
}
/** /**
* Returns executor service that will be owned by the worker. This is useful to test the scenario * Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow. * where worker shuts down the executor service also during shutdown flow.