Merge remote-tracking branch 'upstream/master' into long-running-tasks
This commit is contained in:
commit
74f8dc0e7d
14 changed files with 143 additions and 96 deletions
|
|
@ -2,9 +2,9 @@ Manifest-Version: 1.0
|
||||||
Bundle-ManifestVersion: 2
|
Bundle-ManifestVersion: 2
|
||||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||||
Bundle-Version: 1.8.5
|
Bundle-Version: 1.8.7
|
||||||
Bundle-Vendor: Amazon Technologies, Inc
|
Bundle-Vendor: Amazon Technologies, Inc
|
||||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
|
||||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||||
org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport,
|
org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport,
|
||||||
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.5.3",
|
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.5.3",
|
||||||
|
|
|
||||||
21
README.md
21
README.md
|
|
@ -29,6 +29,27 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
|
||||||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||||
|
|
||||||
## Release Notes
|
## Release Notes
|
||||||
|
### Release 1.8.7
|
||||||
|
* Don't add a delay for synchronous requests to Kinesis
|
||||||
|
Removes a delay that had been added for synchronous `GetRecords` calls to Kinesis.
|
||||||
|
* [PR #256](https://github.com/awslabs/amazon-kinesis-client/pull/256)
|
||||||
|
|
||||||
|
### Release 1.8.6
|
||||||
|
* Add prefetching of records from Kinesis
|
||||||
|
Prefetching will retrieve and queue additional records from Kinesis while the application is processing existing records.
|
||||||
|
Prefetching can be enabled by setting [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) to `PREFETCH_CACHED`. Once enabled an additional fetching thread will be started to retrieve records from Kinesis. Retrieved records will be held in a queue until the application is ready to process them.
|
||||||
|
Pre-fetching supports the following configuration values:
|
||||||
|
|
||||||
|
| Name | Default | Description |
|
||||||
|
| ---- | ------- | ----------- |
|
||||||
|
| [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) | `DEFAULT` | Which data fetching strategy to use |
|
||||||
|
| [`maxPendingProcessRecordsInput`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1296) | 3 | The maximum number of process records input that can be queued |
|
||||||
|
| [`maxCacheByteSize`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1307) | 8 MiB | The maximum number of bytes that can be queued |
|
||||||
|
| [`maxRecordsCount`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1326) | 30,000 | The maximum number of records that can be queued |
|
||||||
|
| [`idleMillisBetweenCalls`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1353) | 1,500 ms | The amount of time to wait between calls to Kinesis |
|
||||||
|
|
||||||
|
* [PR #246](https://github.com/awslabs/amazon-kinesis-client/pull/246)
|
||||||
|
|
||||||
### Release 1.8.5 (September 26, 2017)
|
### Release 1.8.5 (September 26, 2017)
|
||||||
* Only advance the shard iterator for the accepted response.
|
* Only advance the shard iterator for the accepted response.
|
||||||
This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
|
This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
|
||||||
|
|
|
||||||
4
pom.xml
4
pom.xml
|
|
@ -6,7 +6,7 @@
|
||||||
<artifactId>amazon-kinesis-client</artifactId>
|
<artifactId>amazon-kinesis-client</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<name>Amazon Kinesis Client Library for Java</name>
|
<name>Amazon Kinesis Client Library for Java</name>
|
||||||
<version>1.8.6-SNAPSHOT</version>
|
<version>1.8.8-SNAPSHOT</version>
|
||||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
||||||
from Amazon Kinesis.
|
from Amazon Kinesis.
|
||||||
</description>
|
</description>
|
||||||
|
|
@ -25,7 +25,7 @@
|
||||||
</licenses>
|
</licenses>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<aws-java-sdk.version>1.11.198</aws-java-sdk.version>
|
<aws-java-sdk.version>1.11.218</aws-java-sdk.version>
|
||||||
<sqlite4java.version>1.0.392</sqlite4java.version>
|
<sqlite4java.version>1.0.392</sqlite4java.version>
|
||||||
<sqlite4java.native>libsqlite4java</sqlite4java.native>
|
<sqlite4java.native>libsqlite4java</sqlite4java.native>
|
||||||
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
|
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
|
||||||
|
|
|
||||||
|
|
@ -31,15 +31,11 @@ import lombok.extern.apachecommons.CommonsLog;
|
||||||
public class BlockingGetRecordsCache implements GetRecordsCache {
|
public class BlockingGetRecordsCache implements GetRecordsCache {
|
||||||
private final int maxRecordsPerCall;
|
private final int maxRecordsPerCall;
|
||||||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
private final long idleMillisBetweenCalls;
|
|
||||||
private Instant lastSuccessfulCall;
|
|
||||||
|
|
||||||
public BlockingGetRecordsCache(final int maxRecordsPerCall,
|
public BlockingGetRecordsCache(final int maxRecordsPerCall,
|
||||||
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||||
final long idleMillisBetweenCalls) {
|
|
||||||
this.maxRecordsPerCall = maxRecordsPerCall;
|
this.maxRecordsPerCall = maxRecordsPerCall;
|
||||||
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||||
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -51,31 +47,10 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProcessRecordsInput getNextResult() {
|
public ProcessRecordsInput getNextResult() {
|
||||||
sleepBeforeNextCall();
|
|
||||||
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
||||||
lastSuccessfulCall = Instant.now();
|
return new ProcessRecordsInput()
|
||||||
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
|
|
||||||
.withRecords(getRecordsResult.getRecords())
|
.withRecords(getRecordsResult.getRecords())
|
||||||
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
|
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
|
||||||
return processRecordsInput;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sleepBeforeNextCall() {
|
|
||||||
if (!Thread.interrupted()) {
|
|
||||||
if (lastSuccessfulCall == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
|
|
||||||
if (timeSinceLastCall < idleMillisBetweenCalls) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.info("Thread was interrupted, indicating that shutdown was called.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.info("Thread has been interrupted, indicating that it is in the shutdown phase.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
|
||||||
/**
|
/**
|
||||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||||
*/
|
*/
|
||||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.5";
|
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.7";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,8 @@ import java.time.Instant;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.Validate;
|
||||||
|
|
||||||
import com.amazonaws.SdkClientException;
|
import com.amazonaws.SdkClientException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||||
|
|
@ -29,7 +31,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
|
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.extern.apachecommons.CommonsLog;
|
import lombok.extern.apachecommons.CommonsLog;
|
||||||
import org.apache.commons.lang.Validate;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
|
* This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
|
||||||
|
|
@ -171,6 +172,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
||||||
} finally {
|
} finally {
|
||||||
MetricsHelper.endScope();
|
MetricsHelper.endScope();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
//
|
||||||
|
// Consumer isn't ready to receive new records will allow prefetch counters to pause
|
||||||
|
//
|
||||||
|
try {
|
||||||
|
prefetchCounters.waitForConsumer();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
log.info("Thread was interrupted while waiting for the consumer. " +
|
||||||
|
"Shutdown has probably been started");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
callShutdownOnStrategy();
|
callShutdownOnStrategy();
|
||||||
|
|
@ -205,6 +216,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
||||||
public synchronized void removed(final ProcessRecordsInput result) {
|
public synchronized void removed(final ProcessRecordsInput result) {
|
||||||
size -= getSize(result);
|
size -= getSize(result);
|
||||||
byteSize -= getByteSize(result);
|
byteSize -= getByteSize(result);
|
||||||
|
this.notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getSize(final ProcessRecordsInput result) {
|
private long getSize(final ProcessRecordsInput result) {
|
||||||
|
|
@ -215,9 +227,25 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
|
||||||
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
|
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized void waitForConsumer() throws InterruptedException {
|
||||||
|
if (!shouldGetNewRecords()) {
|
||||||
|
log.debug("Queue is full waiting for consumer for " + idleMillisBetweenCalls + " ms");
|
||||||
|
this.wait(idleMillisBetweenCalls);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized boolean shouldGetNewRecords() {
|
public synchronized boolean shouldGetNewRecords() {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Current Prefetch Counter States: " + this.toString());
|
||||||
|
}
|
||||||
return size < maxRecordsCount && byteSize < maxByteSize;
|
return size < maxRecordsCount && byteSize < maxByteSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size,
|
||||||
|
byteSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
|
||||||
private int maxRecordsCount = 30000;
|
private int maxRecordsCount = 30000;
|
||||||
private long idleMillisBetweenCalls = 1500L;
|
private long idleMillisBetweenCalls = 1500L;
|
||||||
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
|
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
|
||||||
private IMetricsFactory metricsFactory;
|
|
||||||
|
|
||||||
public SimpleRecordsFetcherFactory(int maxRecords) {
|
public SimpleRecordsFetcherFactory(int maxRecords) {
|
||||||
this.maxRecords = maxRecords;
|
this.maxRecords = maxRecords;
|
||||||
|
|
@ -37,7 +36,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
|
||||||
@Override
|
@Override
|
||||||
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
|
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
|
||||||
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
|
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
|
||||||
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
|
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
|
||||||
} else {
|
} else {
|
||||||
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
|
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
|
||||||
getRecordsRetrievalStrategy,
|
getRecordsRetrievalStrategy,
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
@ -386,7 +387,19 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
||||||
+ " because the lease counter was not " + lease.getLeaseCounter());
|
+ " because the lease counter was not " + lease.getLeaseCounter());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we had a spurious retry during the Dynamo update, then this conditional PUT failure
|
||||||
|
// might be incorrect. So, we get the item straight away and check if the lease owner + lease counter
|
||||||
|
// are what we expected.
|
||||||
|
String expectedOwner = lease.getLeaseOwner();
|
||||||
|
Long expectedCounter = lease.getLeaseCounter() + 1;
|
||||||
|
T updatedLease = getLease(lease.getLeaseKey());
|
||||||
|
if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) ||
|
||||||
|
!expectedCounter.equals(updatedLease.getLeaseCounter())) {
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Detected spurious renewal failure for lease with key " + lease.getLeaseKey()
|
||||||
|
+ ", but recovered");
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
|
throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,6 @@ import com.amazonaws.services.kinesis.model.Record;
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class BlockingGetRecordsCacheTest {
|
public class BlockingGetRecordsCacheTest {
|
||||||
private static final int MAX_RECORDS_PER_COUNT = 10_000;
|
private static final int MAX_RECORDS_PER_COUNT = 10_000;
|
||||||
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;
|
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
|
|
@ -53,7 +52,7 @@ public class BlockingGetRecordsCacheTest {
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
records = new ArrayList<>();
|
records = new ArrayList<>();
|
||||||
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS);
|
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);
|
||||||
|
|
||||||
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
|
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
|
||||||
when(getRecordsResult.getRecords()).thenReturn(records);
|
when(getRecordsResult.getRecords()).thenReturn(records);
|
||||||
|
|
|
||||||
|
|
@ -342,8 +342,7 @@ public class ShardConsumerTest {
|
||||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||||
|
|
||||||
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
||||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
|
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||||
0L));
|
|
||||||
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
||||||
|
|
||||||
ShardConsumer consumer =
|
ShardConsumer consumer =
|
||||||
|
|
@ -472,8 +471,7 @@ public class ShardConsumerTest {
|
||||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||||
|
|
||||||
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
||||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
|
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||||
0L));
|
|
||||||
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
|
||||||
|
|
||||||
ShardConsumer consumer =
|
ShardConsumer consumer =
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.leases.impl;
|
||||||
|
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
|
|
@ -108,7 +108,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
||||||
KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey());
|
KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey());
|
||||||
|
|
||||||
leaseManager.renewLease(lease);
|
// lose lease
|
||||||
|
leaseManager.takeLease(lease, "bar");
|
||||||
|
|
||||||
Assert.assertFalse(leaseManager.renewLease(leaseCopy));
|
Assert.assertFalse(leaseManager.renewLease(leaseCopy));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
|
|
@ -14,17 +14,16 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.leases.impl;
|
package com.amazonaws.services.kinesis.leases.impl;
|
||||||
|
|
||||||
import java.util.Collections;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||||
import java.util.Map;
|
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||||
import java.util.concurrent.Executors;
|
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
import java.util.Collections;
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
import java.util.Map;
|
||||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
||||||
|
|
@ -58,7 +57,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
builder.addLeasesToRenew(renewer, "1", "2");
|
builder.addLeasesToRenew(renewer, "1", "2");
|
||||||
KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
||||||
|
|
||||||
leaseManager.updateLease(renewedLease);
|
// lose lease 2
|
||||||
|
leaseManager.takeLease(renewedLease, "bar");
|
||||||
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
builder.renewMutateAssert(renewer, "1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,9 +97,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
public void testGetCurrentlyHeldLeases() throws LeasingException {
|
public void testGetCurrentlyHeldLeases() throws LeasingException {
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager);
|
||||||
|
|
||||||
KinesisClientLease lease2 = builder.withLease("1", "foo").withLease("2", "foo").build().get("2");
|
builder.withLease("1", "foo").withLease("2", "foo").build();
|
||||||
builder.addLeasesToRenew(renewer, "1", "2");
|
builder.addLeasesToRenew(renewer, "1", "2");
|
||||||
builder.renewMutateAssert(renewer, "1", "2");
|
KinesisClientLease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
||||||
|
|
||||||
// This should be a copy that doesn't get updated
|
// This should be a copy that doesn't get updated
|
||||||
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();
|
Map<String, KinesisClientLease> heldLeases = renewer.getCurrentlyHeldLeases();
|
||||||
|
|
@ -106,7 +107,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter());
|
Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter());
|
||||||
Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter());
|
Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter());
|
||||||
|
|
||||||
leaseManager.updateLease(lease2); // lose lease 2
|
// lose lease 2
|
||||||
|
leaseManager.takeLease(lease2, "bar");
|
||||||
|
|
||||||
// Do another renewal and make sure the copy doesn't change
|
// Do another renewal and make sure the copy doesn't change
|
||||||
builder.renewMutateAssert(renewer, "1");
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
|
@ -176,7 +179,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
||||||
leaseManager.renewLease(lease);
|
leaseManager.takeLease(lease, "bar");
|
||||||
builder.renewMutateAssert(renewer);
|
builder.renewMutateAssert(renewer);
|
||||||
|
|
||||||
lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint"));
|
lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint"));
|
||||||
|
|
@ -195,7 +198,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
KinesisClientLease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
||||||
leaseManager.renewLease(lease);
|
leaseManager.takeLease(lease, "bar");
|
||||||
builder.renewMutateAssert(renewer);
|
builder.renewMutateAssert(renewer);
|
||||||
|
|
||||||
// regain the lease
|
// regain the lease
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
*
|
*
|
||||||
* Licensed under the Amazon Software License (the "License").
|
* Licensed under the Amazon Software License (the "License").
|
||||||
* You may not use this file except in compliance with the License.
|
* You may not use this file except in compliance with the License.
|
||||||
|
|
@ -35,6 +35,7 @@ public class TestHarnessBuilder {
|
||||||
|
|
||||||
private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>();
|
private Map<String, KinesisClientLease> leases = new HashMap<String, KinesisClientLease>();
|
||||||
private KinesisClientLeaseManager leaseManager;
|
private KinesisClientLeaseManager leaseManager;
|
||||||
|
private Map<String, KinesisClientLease> originalLeases = new HashMap<>();
|
||||||
|
|
||||||
private Callable<Long> timeProvider = new Callable<Long>() {
|
private Callable<Long> timeProvider = new Callable<Long>() {
|
||||||
|
|
||||||
|
|
@ -54,6 +55,15 @@ public class TestHarnessBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestHarnessBuilder withLease(String shardId, String owner) {
|
public TestHarnessBuilder withLease(String shardId, String owner) {
|
||||||
|
KinesisClientLease lease = createLease(shardId, owner);
|
||||||
|
KinesisClientLease originalLease = createLease(shardId, owner);
|
||||||
|
|
||||||
|
leases.put(shardId, lease);
|
||||||
|
originalLeases.put(shardId, originalLease);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KinesisClientLease createLease(String shardId, String owner) {
|
||||||
KinesisClientLease lease = new KinesisClientLease();
|
KinesisClientLease lease = new KinesisClientLease();
|
||||||
lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint"));
|
lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint"));
|
||||||
lease.setOwnerSwitchesSinceCheckpoint(0L);
|
lease.setOwnerSwitchesSinceCheckpoint(0L);
|
||||||
|
|
@ -62,8 +72,7 @@ public class TestHarnessBuilder {
|
||||||
lease.setParentShardIds(Collections.singleton("parentShardId"));
|
lease.setParentShardIds(Collections.singleton("parentShardId"));
|
||||||
lease.setLeaseKey(shardId);
|
lease.setLeaseKey(shardId);
|
||||||
|
|
||||||
leases.put(shardId, lease);
|
return lease;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, KinesisClientLease> build() throws LeasingException {
|
public Map<String, KinesisClientLease> build() throws LeasingException {
|
||||||
|
|
@ -147,7 +156,7 @@ public class TestHarnessBuilder {
|
||||||
Assert.assertEquals(renewedShardIds.length, heldLeases.size());
|
Assert.assertEquals(renewedShardIds.length, heldLeases.size());
|
||||||
|
|
||||||
for (String shardId : renewedShardIds) {
|
for (String shardId : renewedShardIds) {
|
||||||
KinesisClientLease original = leases.get(shardId);
|
KinesisClientLease original = originalLeases.get(shardId);
|
||||||
Assert.assertNotNull(original);
|
Assert.assertNotNull(original);
|
||||||
|
|
||||||
KinesisClientLease actual = heldLeases.get(shardId);
|
KinesisClientLease actual = heldLeases.get(shardId);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue