Merge branch 'master' into allow-ignore-inconsistent-child-shards-1
This commit is contained in:
commit
74aa7a5986
20 changed files with 442 additions and 67 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.8.7
|
||||
Bundle-Version: 1.8.8
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
|
|||
22
README.md
22
README.md
|
|
@ -15,7 +15,7 @@ The **Amazon Kinesis Client Library for Java** (Amazon KCL) enables Java develop
|
|||
|
||||
1. **Sign up for AWS** — Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see [AWS Account and Credentials][docs-signup] in the AWS SDK for Java Developer Guide.
|
||||
1. **Sign up for Amazon Kinesis** — Go to the Amazon Kinesis console to sign up for the service and create an Amazon Kinesis stream. For more information, see [Create an Amazon Kinesis Stream][kinesis-guide-create] in the Amazon Kinesis Developer Guide.
|
||||
1. **Minimum requirements** — To use the Amazon Kinesis Client Library, you'll need **Java 1.7+**. For more information about Amazon Kinesis Client Library requirements, see [Before You Begin][kinesis-guide-begin] in the Amazon Kinesis Developer Guide.
|
||||
1. **Minimum requirements** — To use the Amazon Kinesis Client Library, you'll need **Java 1.8+**. For more information about Amazon Kinesis Client Library requirements, see [Before You Begin][kinesis-guide-begin] in the Amazon Kinesis Developer Guide.
|
||||
1. **Using the Amazon Kinesis Client Library** — The best way to get familiar with the Amazon Kinesis Client Library is to read [Developing Record Consumer Applications][kinesis-guide-applications] in the Amazon Kinesis Developer Guide.
|
||||
|
||||
## Building from Source
|
||||
|
|
@ -29,6 +29,26 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
|
|||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.8.8
|
||||
* Fixed issues with leases losses due to `ExpiredIteratorException` in `PrefetchGetRecordsCache` and `AsynchronousFetchingStrategy`.
|
||||
PrefetchGetRecordsCache will request for a new iterator and start fetching data again.
|
||||
* [PR#263](https://github.com/awslabs/amazon-kinesis-client/pull/263)
|
||||
* Added warning message for long running tasks.
|
||||
Logging long running tasks can be enabled by setting the following configuration property:
|
||||
|
||||
| Name | Default | Description |
|
||||
| ---- | ------- | ----------- |
|
||||
| [`logWarningForTaskAfterMillis`](https://github.com/awslabs/amazon-kinesis-client/blob/3de901ea9327370ed732af86c4d4999c8d99541c/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1367) | Not set | Milliseconds after which the logger will log a warning message for the long running task |
|
||||
|
||||
* [PR#259](https://github.com/awslabs/amazon-kinesis-client/pull/259)
|
||||
* Handling spurious lease renewal failures gracefully.
|
||||
Added better handling of DynamoDB failures when updating leases. These failures would occur when a request to DynamoDB appeared to fail, but was actually successful.
|
||||
* [PR#247](https://github.com/awslabs/amazon-kinesis-client/pull/247)
|
||||
* ShutdownTask gets retried if the previous attempt on the ShutdownTask fails.
|
||||
* [PR#267](https://github.com/awslabs/amazon-kinesis-client/pull/267)
|
||||
* Fix for using maxRecords from `KinesisClientLibConfiguration` in `GetRecordsCache` for fetching records.
|
||||
* [PR#264](https://github.com/awslabs/amazon-kinesis-client/pull/264)
|
||||
|
||||
### Release 1.8.7
|
||||
* Don't add a delay for synchronous requests to Kinesis
|
||||
Removes a delay that had been added for synchronous `GetRecords` calls to Kinesis.
|
||||
|
|
|
|||
2
pom.xml
2
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.8.8-SNAPSHOT</version>
|
||||
<version>1.8.8</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
||||
from Amazon Kinesis.
|
||||
</description>
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
|
@ -31,6 +30,7 @@ import java.util.function.Supplier;
|
|||
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
|
|
@ -81,33 +81,39 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
|
|||
CompletionService<DataFetcherResult> completionService = completionServiceSupplier.get();
|
||||
Set<Future<DataFetcherResult>> futures = new HashSet<>();
|
||||
Callable<DataFetcherResult> retrieverCall = createRetrieverCallable(maxRecords);
|
||||
while (true) {
|
||||
try {
|
||||
futures.add(completionService.submit(retrieverCall));
|
||||
} catch (RejectedExecutionException e) {
|
||||
log.warn("Out of resources, unable to start additional requests.");
|
||||
}
|
||||
try {
|
||||
while (true) {
|
||||
try {
|
||||
futures.add(completionService.submit(retrieverCall));
|
||||
} catch (RejectedExecutionException e) {
|
||||
log.warn("Out of resources, unable to start additional requests.");
|
||||
}
|
||||
|
||||
try {
|
||||
Future<DataFetcherResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
|
||||
TimeUnit.SECONDS);
|
||||
if (resultFuture != null) {
|
||||
//
|
||||
// Fix to ensure that we only let the shard iterator advance when we intend to return the result
|
||||
// to the caller. This ensures that the shard iterator is consistently advance in step with
|
||||
// what the caller sees.
|
||||
//
|
||||
result = resultFuture.get().accept();
|
||||
try {
|
||||
Future<DataFetcherResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
|
||||
TimeUnit.SECONDS);
|
||||
if (resultFuture != null) {
|
||||
//
|
||||
// Fix to ensure that we only let the shard iterator advance when we intend to return the result
|
||||
// to the caller. This ensures that the shard iterator is consistently advance in step with
|
||||
// what the caller sees.
|
||||
//
|
||||
result = resultFuture.get().accept();
|
||||
break;
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof ExpiredIteratorException) {
|
||||
throw (ExpiredIteratorException) e.getCause();
|
||||
}
|
||||
log.error("ExecutionException thrown while trying to get records", e);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Thread was interrupted", e);
|
||||
break;
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
log.error("ExecutionException thrown while trying to get records", e);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Thread was interrupted", e);
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
futures.forEach(f -> f.cancel(true));
|
||||
}
|
||||
futures.forEach(f -> f.cancel(true));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -140,4 +146,9 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
|
|||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build(),
|
||||
new ThreadPoolExecutor.AbortPolicy());
|
||||
}
|
||||
|
||||
@Override
|
||||
public KinesisDataFetcher getDataFetcher() {
|
||||
return dataFetcher;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,4 +44,11 @@ public interface GetRecordsRetrievalStrategy {
|
|||
* @return true if the strategy has been shutdown, false otherwise.
|
||||
*/
|
||||
boolean isShutdown();
|
||||
|
||||
/**
|
||||
* Returns the KinesisDataFetcher used to getRecords from Kinesis.
|
||||
*
|
||||
* @return KinesisDataFetcher
|
||||
*/
|
||||
KinesisDataFetcher getDataFetcher();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -131,7 +131,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.7";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.8";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
@ -489,7 +489,7 @@ public class KinesisClientLibConfiguration {
|
|||
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
|
||||
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -27,6 +28,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
|
|||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
|
|
@ -42,6 +45,8 @@ class KinesisDataFetcher {
|
|||
private final String shardId;
|
||||
private boolean isShardEndReached;
|
||||
private boolean isInitialized;
|
||||
private String lastKnownSequenceNumber;
|
||||
private InitialPositionInStreamExtended initialPositionInStream;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -108,6 +113,9 @@ class KinesisDataFetcher {
|
|||
@Override
|
||||
public GetRecordsResult accept() {
|
||||
nextIterator = result.getNextShardIterator();
|
||||
if (!CollectionUtils.isNullOrEmpty(result.getRecords())) {
|
||||
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
|
||||
}
|
||||
if (nextIterator == null) {
|
||||
isShardEndReached = true;
|
||||
}
|
||||
|
|
@ -161,6 +169,8 @@ class KinesisDataFetcher {
|
|||
if (nextIterator == null) {
|
||||
isShardEndReached = true;
|
||||
}
|
||||
this.lastKnownSequenceNumber = sequenceNumber;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -217,6 +227,17 @@ class KinesisDataFetcher {
|
|||
return iterator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
|
||||
* getRecords call.
|
||||
*/
|
||||
public void restartIterator() {
|
||||
if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) {
|
||||
throw new IllegalStateException("Make sure to initialize the KinesisDataFetcher before restarting the iterator.");
|
||||
}
|
||||
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the shardEndReached
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -23,10 +23,13 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import org.apache.commons.lang.Validate;
|
||||
|
||||
import com.amazonaws.SdkClientException;
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
|
||||
import lombok.NonNull;
|
||||
|
|
@ -42,6 +45,7 @@ import lombok.extern.apachecommons.CommonsLog;
|
|||
*/
|
||||
@CommonsLog
|
||||
public class PrefetchGetRecordsCache implements GetRecordsCache {
|
||||
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
|
||||
LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;
|
||||
private int maxPendingProcessRecordsInput;
|
||||
private int maxByteSize;
|
||||
|
|
@ -56,6 +60,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
|||
private PrefetchCounters prefetchCounters;
|
||||
private boolean started = false;
|
||||
private final String operation;
|
||||
private final KinesisDataFetcher dataFetcher;
|
||||
private final String shardId;
|
||||
|
||||
/**
|
||||
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a
|
||||
|
|
@ -76,9 +82,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
|||
final int maxRecordsPerCall,
|
||||
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
||||
@NonNull final ExecutorService executorService,
|
||||
long idleMillisBetweenCalls,
|
||||
final long idleMillisBetweenCalls,
|
||||
@NonNull final IMetricsFactory metricsFactory,
|
||||
@NonNull String operation) {
|
||||
@NonNull final String operation,
|
||||
@NonNull final String shardId) {
|
||||
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||
this.maxRecordsPerCall = maxRecordsPerCall;
|
||||
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
|
||||
|
|
@ -92,6 +99,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
|||
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
|
||||
Validate.notEmpty(operation, "Operation cannot be empty");
|
||||
this.operation = operation;
|
||||
this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -162,6 +171,14 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
|||
prefetchCounters.added(processRecordsInput);
|
||||
} catch (InterruptedException e) {
|
||||
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
|
||||
} catch (ExpiredIteratorException e) {
|
||||
log.info(String.format("ShardId %s: getRecords threw ExpiredIteratorException - restarting"
|
||||
+ " after greatest seqNum passed to customer", shardId), e);
|
||||
|
||||
MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
|
||||
MetricsLevel.SUMMARY);
|
||||
|
||||
dataFetcher.restartIterator();
|
||||
} catch (SdkClientException e) {
|
||||
log.error("Exception thrown while fetching records from Kinesis", e);
|
||||
} catch (Throwable e) {
|
||||
|
|
|
|||
|
|
@ -26,10 +26,12 @@ public interface RecordsFetcherFactory {
|
|||
* @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache
|
||||
* @param shardId ShardId of the shard that the fetcher will retrieve records for
|
||||
* @param metricsFactory MetricsFactory used to create metricScope
|
||||
* @param maxRecords Max number of records to be returned in a single get call
|
||||
*
|
||||
* @return GetRecordsCache used to get records from Kinesis.
|
||||
*/
|
||||
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory);
|
||||
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId,
|
||||
IMetricsFactory metricsFactory, int maxRecords);
|
||||
|
||||
/**
|
||||
* Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are
|
||||
|
|
|
|||
|
|
@ -235,7 +235,7 @@ class ShardConsumer {
|
|||
this.dataFetcher = kinesisDataFetcher;
|
||||
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
|
||||
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
|
||||
this.getShardInfo().getShardId(), this.metricsFactory);
|
||||
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -412,7 +412,7 @@ class ShardConsumer {
|
|||
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
|
||||
markForShutdown(ShutdownReason.TERMINATE);
|
||||
}
|
||||
if (isShutdownRequested()) {
|
||||
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
|
||||
currentState = currentState.shutdownTransition(shutdownReason);
|
||||
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
|
||||
if (currentState.getTaskType() == currentTask.getTaskType()) {
|
||||
|
|
|
|||
|
|
@ -18,23 +18,20 @@ import java.util.concurrent.Executors;
|
|||
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import lombok.extern.apachecommons.CommonsLog;
|
||||
|
||||
@CommonsLog
|
||||
public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
|
||||
private final int maxRecords;
|
||||
private int maxPendingProcessRecordsInput = 3;
|
||||
private int maxByteSize = 8 * 1024 * 1024;
|
||||
private int maxRecordsCount = 30000;
|
||||
private long idleMillisBetweenCalls = 1500L;
|
||||
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
|
||||
|
||||
public SimpleRecordsFetcherFactory(int maxRecords) {
|
||||
this.maxRecords = maxRecords;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
|
||||
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId,
|
||||
IMetricsFactory metricsFactory, int maxRecords) {
|
||||
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
|
||||
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
|
||||
} else {
|
||||
|
|
@ -46,7 +43,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
|
|||
.build()),
|
||||
idleMillisBetweenCalls,
|
||||
metricsFactory,
|
||||
"ProcessTask");
|
||||
"ProcessTask",
|
||||
shardId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,4 +42,9 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
|
|||
public boolean isShutdown() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KinesisDataFetcher getDataFetcher() {
|
||||
return dataFetcher;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,17 +38,21 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
|
||||
|
|
@ -133,6 +137,24 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
|
|||
verify(mockFuture).get();
|
||||
assertNull(getRecordsResult);
|
||||
}
|
||||
|
||||
@Test (expected = ExpiredIteratorException.class)
|
||||
public void testExpiredIteratorExcpetion() throws InterruptedException {
|
||||
when(dataFetcher.getRecords(eq(numberOfRecords))).thenAnswer(new Answer<DataFetcherResult>() {
|
||||
@Override
|
||||
public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable {
|
||||
Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000);
|
||||
throw new ExpiredIteratorException("ExpiredIterator");
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
getRecordsRetrivalStrategy.getRecords(numberOfRecords);
|
||||
} finally {
|
||||
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords));
|
||||
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
|
||||
}
|
||||
}
|
||||
|
||||
private int getLeastNumberOfCalls() {
|
||||
int leastNumberOfCalls = 0;
|
||||
|
|
@ -163,6 +185,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
|
|||
} catch (InterruptedException e) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,20 +20,25 @@ import static org.mockito.Matchers.any;
|
|||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
|
@ -153,5 +158,27 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
|
|||
|
||||
assertThat(actualResult, equalTo(expectedResults));
|
||||
}
|
||||
|
||||
@Test (expected = ExpiredIteratorException.class)
|
||||
public void testExpiredIteratorExceptionCase() throws Exception {
|
||||
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
|
||||
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
|
||||
Future<DataFetcherResult> successfulFuture2 = mock(Future.class);
|
||||
|
||||
when(executorService.isShutdown()).thenReturn(false);
|
||||
when(completionService.submit(any())).thenReturn(successfulFuture, successfulFuture2);
|
||||
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture);
|
||||
when(successfulFuture.get()).thenThrow(new ExecutionException(new ExpiredIteratorException("ExpiredException")));
|
||||
|
||||
try {
|
||||
strategy.getRecords(10);
|
||||
} finally {
|
||||
verify(executorService).isShutdown();
|
||||
verify(completionService, times(2)).submit(any());
|
||||
verify(completionService, times(2)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
|
||||
verify(successfulFuture).cancel(eq(true));
|
||||
verify(successfulFuture2).cancel(eq(true));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
|
|
@ -273,6 +274,45 @@ public class KinesisDataFetcherTest {
|
|||
|
||||
verify(kinesisProxy, never()).get(anyString(), anyInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestartIterator() {
|
||||
GetRecordsResult getRecordsResult = mock(GetRecordsResult.class);
|
||||
GetRecordsResult restartGetRecordsResult = new GetRecordsResult();
|
||||
Record record = mock(Record.class);
|
||||
final String initialIterator = "InitialIterator";
|
||||
final String nextShardIterator = "NextShardIterator";
|
||||
final String restartShardIterator = "RestartIterator";
|
||||
final String sequenceNumber = "SequenceNumber";
|
||||
final String iteratorType = "AT_SEQUENCE_NUMBER";
|
||||
KinesisProxy kinesisProxy = mock(KinesisProxy.class);
|
||||
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
|
||||
|
||||
when(kinesisProxy.getIterator(eq(SHARD_ID), eq(InitialPositionInStream.LATEST.toString()))).thenReturn(initialIterator);
|
||||
when(kinesisProxy.get(eq(initialIterator), eq(10))).thenReturn(getRecordsResult);
|
||||
when(getRecordsResult.getRecords()).thenReturn(Collections.singletonList(record));
|
||||
when(getRecordsResult.getNextShardIterator()).thenReturn(nextShardIterator);
|
||||
when(record.getSequenceNumber()).thenReturn(sequenceNumber);
|
||||
|
||||
fetcher.initialize(InitialPositionInStream.LATEST.toString(), INITIAL_POSITION_LATEST);
|
||||
verify(kinesisProxy).getIterator(eq(SHARD_ID), eq(InitialPositionInStream.LATEST.toString()));
|
||||
Assert.assertEquals(getRecordsResult, fetcher.getRecords(10).accept());
|
||||
verify(kinesisProxy).get(eq(initialIterator), eq(10));
|
||||
|
||||
when(kinesisProxy.getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber))).thenReturn(restartShardIterator);
|
||||
when(kinesisProxy.get(eq(restartShardIterator), eq(10))).thenReturn(restartGetRecordsResult);
|
||||
|
||||
fetcher.restartIterator();
|
||||
Assert.assertEquals(restartGetRecordsResult, fetcher.getRecords(10).accept());
|
||||
verify(kinesisProxy).getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber));
|
||||
verify(kinesisProxy).get(eq(restartShardIterator), eq(10));
|
||||
}
|
||||
|
||||
@Test (expected = IllegalStateException.class)
|
||||
public void testRestartIteratorNotInitialized() {
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
|
||||
dataFetcher.restartIterator();
|
||||
}
|
||||
|
||||
private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult,
|
||||
String previousValue, String nextValue) {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,8 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
|
@ -31,19 +33,19 @@ import java.util.List;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
|
|
@ -70,14 +72,13 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
|
||||
@Mock
|
||||
private IKinesisProxy proxy;
|
||||
|
||||
@Mock
|
||||
private ShardInfo shardInfo;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
records = new ArrayList<>();
|
||||
dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo);
|
||||
dataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo));
|
||||
getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
|
||||
executorService = spy(Executors.newFixedThreadPool(1));
|
||||
|
||||
|
|
@ -89,7 +90,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
executorService,
|
||||
IDLE_MILLIS_BETWEEN_CALLS,
|
||||
new NullMetricsFactory(),
|
||||
operation);
|
||||
operation,
|
||||
"test-shard");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -135,7 +137,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
executorService2,
|
||||
IDLE_MILLIS_BETWEEN_CALLS,
|
||||
new NullMetricsFactory(),
|
||||
operation);
|
||||
operation,
|
||||
"test-shard-2");
|
||||
|
||||
getRecordsCache.start();
|
||||
sleep(IDLE_MILLIS_BETWEEN_CALLS);
|
||||
|
|
@ -167,6 +170,26 @@ public class PrefetchGetRecordsCacheIntegrationTest {
|
|||
verify(getRecordsRetrievalStrategy2).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredIteratorException() {
|
||||
when(dataFetcher.getRecords(eq(MAX_RECORDS_PER_CALL))).thenAnswer(new Answer<DataFetcherResult>() {
|
||||
@Override
|
||||
public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable {
|
||||
throw new ExpiredIteratorException("ExpiredIterator");
|
||||
}
|
||||
}).thenCallRealMethod();
|
||||
doNothing().when(dataFetcher).restartIterator();
|
||||
|
||||
getRecordsCache.start();
|
||||
sleep(IDLE_MILLIS_BETWEEN_CALLS);
|
||||
|
||||
ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
|
||||
|
||||
assertNotNull(processRecordsInput);
|
||||
assertTrue(processRecordsInput.getRecords().isEmpty());
|
||||
verify(dataFetcher).restartIterator();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdown() {
|
||||
getRecordsCache.shutdown();
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
|
@ -36,7 +37,6 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -45,6 +45,8 @@ import org.mockito.Mock;
|
|||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
|
|
@ -66,6 +68,8 @@ public class PrefetchGetRecordsCacheTest {
|
|||
private GetRecordsResult getRecordsResult;
|
||||
@Mock
|
||||
private Record record;
|
||||
@Mock
|
||||
private KinesisDataFetcher dataFetcher;
|
||||
|
||||
private List<Record> records;
|
||||
private ExecutorService executorService;
|
||||
|
|
@ -75,6 +79,8 @@ public class PrefetchGetRecordsCacheTest {
|
|||
|
||||
@Before
|
||||
public void setup() {
|
||||
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher);
|
||||
|
||||
executorService = spy(Executors.newFixedThreadPool(1));
|
||||
getRecordsCache = new PrefetchGetRecordsCache(
|
||||
MAX_SIZE,
|
||||
|
|
@ -85,7 +91,8 @@ public class PrefetchGetRecordsCacheTest {
|
|||
executorService,
|
||||
IDLE_MILLIS_BETWEEN_CALLS,
|
||||
new NullMetricsFactory(),
|
||||
operation);
|
||||
operation,
|
||||
"shardId");
|
||||
spyQueue = spy(getRecordsCache.getRecordsResultQueue);
|
||||
records = spy(new ArrayList<>());
|
||||
|
||||
|
|
@ -194,6 +201,20 @@ public class PrefetchGetRecordsCacheTest {
|
|||
when(executorService.isShutdown()).thenReturn(true);
|
||||
getRecordsCache.getNextResult();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredIteratorException() {
|
||||
getRecordsCache.start();
|
||||
|
||||
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult);
|
||||
doNothing().when(dataFetcher).restartIterator();
|
||||
|
||||
getRecordsCache.getNextResult();
|
||||
|
||||
sleep(1000);
|
||||
|
||||
verify(dataFetcher).restartIterator();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdown() {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,5 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
|
|
@ -10,26 +8,27 @@ import org.junit.Test;
|
|||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
|
||||
public class RecordsFetcherFactoryTest {
|
||||
private String shardId = "TestShard";
|
||||
private RecordsFetcherFactory recordsFetcherFactory;
|
||||
|
||||
@Mock
|
||||
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||
|
||||
@Mock
|
||||
private IMetricsFactory metricsFactory;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
recordsFetcherFactory = new SimpleRecordsFetcherFactory(1);
|
||||
recordsFetcherFactory = new SimpleRecordsFetcherFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createDefaultRecordsFetcherTest() {
|
||||
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId,
|
||||
metricsFactory);
|
||||
metricsFactory, 1);
|
||||
assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class));
|
||||
}
|
||||
|
||||
|
|
@ -37,7 +36,7 @@ public class RecordsFetcherFactoryTest {
|
|||
public void createPrefetchRecordsFetcherTest() {
|
||||
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
|
||||
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId,
|
||||
metricsFactory);
|
||||
metricsFactory, 1);
|
||||
assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
|
|
@ -41,6 +42,7 @@ import java.util.List;
|
|||
import java.util.ListIterator;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
@ -68,12 +70,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx
|
|||
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
|
||||
/**
|
||||
|
|
@ -97,11 +101,12 @@ public class ShardConsumerTest {
|
|||
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
|
||||
// ... a non-final public class, and so can be mocked and spied.
|
||||
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
|
||||
private final int maxRecords = 500;
|
||||
private RecordsFetcherFactory recordsFetcherFactory;
|
||||
|
||||
private GetRecordsCache getRecordsCache;
|
||||
|
||||
private KinesisDataFetcher dataFetcher;
|
||||
|
||||
@Mock
|
||||
private IRecordProcessor processor;
|
||||
@Mock
|
||||
|
|
@ -118,8 +123,9 @@ public class ShardConsumerTest {
|
|||
@Before
|
||||
public void setup() {
|
||||
getRecordsCache = null;
|
||||
dataFetcher = null;
|
||||
|
||||
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords));
|
||||
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty());
|
||||
}
|
||||
|
|
@ -339,11 +345,13 @@ public class ShardConsumerTest {
|
|||
)
|
||||
);
|
||||
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
|
||||
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
|
||||
any(IMetricsFactory.class), anyInt()))
|
||||
.thenReturn(getRecordsCache);
|
||||
|
||||
ShardConsumer consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
|
|
@ -420,6 +428,154 @@ public class ShardConsumerTest {
|
|||
file.delete();
|
||||
}
|
||||
|
||||
private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet {
|
||||
private final CountDownLatch errorShutdownLatch = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
public void shutdown(ShutdownInput input) {
|
||||
ShutdownReason reason = input.getShutdownReason();
|
||||
if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) {
|
||||
errorShutdownLatch.countDown();
|
||||
throw new RuntimeException("test");
|
||||
} else {
|
||||
super.shutdown(input);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record
|
||||
* processor's shutdown method with reason terminate will be retried.
|
||||
*/
|
||||
@Test
|
||||
public final void testConsumeShardWithTransientTerminateError() throws Exception {
|
||||
int numRecs = 10;
|
||||
BigInteger startSeqNum = BigInteger.ONE;
|
||||
String streamShardId = "kinesis-0-0";
|
||||
String testConcurrencyToken = "testToken";
|
||||
List<Shard> shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum);
|
||||
// Close the shard so that shutdown is called with reason terminate
|
||||
shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber(
|
||||
KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString());
|
||||
File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002");
|
||||
|
||||
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
|
||||
|
||||
final int maxRecords = 2;
|
||||
final int idleTimeMS = 0; // keep unit tests fast
|
||||
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
|
||||
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
|
||||
when(leaseManager.getLease(anyString())).thenReturn(null);
|
||||
|
||||
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
|
||||
|
||||
StreamConfig streamConfig =
|
||||
new StreamConfig(fileBasedProxy,
|
||||
maxRecords,
|
||||
idleTimeMS,
|
||||
callProcessRecordsForEmptyRecordList,
|
||||
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
|
||||
|
||||
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
|
||||
|
||||
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
|
||||
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
|
||||
any(IMetricsFactory.class), anyInt()))
|
||||
.thenReturn(getRecordsCache);
|
||||
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
|
||||
shardInfo,
|
||||
checkpoint,
|
||||
new SequenceNumberValidator(
|
||||
streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
|
||||
)
|
||||
);
|
||||
|
||||
ShardConsumer consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
streamConfig,
|
||||
checkpoint,
|
||||
processor,
|
||||
recordProcessorCheckpointer,
|
||||
leaseManager,
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesOfCompletedShards,
|
||||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis,
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||
dataFetcher,
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
config);
|
||||
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||
consumer.consumeShard(); // check on parent shards
|
||||
Thread.sleep(50L);
|
||||
consumer.consumeShard(); // start initialization
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
|
||||
consumer.consumeShard(); // initialize
|
||||
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
|
||||
verify(getRecordsCache).start();
|
||||
|
||||
// We expect to process all records in numRecs calls
|
||||
for (int i = 0; i < numRecs;) {
|
||||
boolean newTaskSubmitted = consumer.consumeShard();
|
||||
if (newTaskSubmitted) {
|
||||
LOG.debug("New processing task was submitted, call # " + i);
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
|
||||
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
|
||||
i += maxRecords;
|
||||
}
|
||||
Thread.sleep(50L);
|
||||
}
|
||||
|
||||
// Consume shards until shutdown terminate is called and it has thrown an exception
|
||||
for (int i = 0; i < 100; i++) {
|
||||
consumer.consumeShard();
|
||||
if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertEquals(0, processor.errorShutdownLatch.getCount());
|
||||
|
||||
// Wait for a retry of shutdown terminate that should succeed
|
||||
for (int i = 0; i < 100; i++) {
|
||||
consumer.consumeShard();
|
||||
if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertEquals(0, processor.getShutdownLatch().getCount());
|
||||
|
||||
// Wait for shutdown complete now that terminate shutdown is successful
|
||||
for (int i = 0; i < 100; i++) {
|
||||
consumer.consumeShard();
|
||||
if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(50L);
|
||||
}
|
||||
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
|
||||
|
||||
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
|
||||
|
||||
verify(getRecordsCache).shutdown();
|
||||
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(60, TimeUnit.SECONDS);
|
||||
|
||||
String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
|
||||
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
|
||||
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
|
||||
file.delete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
|
||||
*/
|
||||
|
|
@ -468,11 +624,13 @@ public class ShardConsumerTest {
|
|||
)
|
||||
);
|
||||
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
|
||||
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
|
||||
any(IMetricsFactory.class), anyInt()))
|
||||
.thenReturn(getRecordsCache);
|
||||
|
||||
ShardConsumer consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
|
|
@ -567,7 +725,7 @@ public class ShardConsumerTest {
|
|||
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
|
||||
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
|
||||
when(leaseManager.getLease(anyString())).thenReturn(null);
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory());
|
||||
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
|
||||
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
|
|
@ -172,7 +173,7 @@ public class WorkerTest {
|
|||
@Before
|
||||
public void setup() {
|
||||
config = spy(new KinesisClientLibConfiguration("app", null, null, null));
|
||||
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500));
|
||||
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
}
|
||||
|
||||
|
|
@ -505,7 +506,7 @@ public class WorkerTest {
|
|||
lease.setCheckpoint(new ExtendedSequenceNumber("2"));
|
||||
initialLeases.add(lease);
|
||||
boolean callProcessRecordsForEmptyRecordList = true;
|
||||
RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(500);
|
||||
RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
|
||||
recordsFetcherFactory.setIdleMillisBetweenCalls(0L);
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
|
||||
|
|
@ -621,7 +622,9 @@ public class WorkerTest {
|
|||
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
|
||||
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
|
||||
any(IMetricsFactory.class), anyInt()))
|
||||
.thenReturn(getRecordsCache);
|
||||
when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L));
|
||||
|
||||
WorkerThread workerThread = runWorker(shardList,
|
||||
|
|
|
|||
Loading…
Reference in a new issue