From 046e160e24036fc237d732b1122383c912d1b813 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 23 Oct 2017 10:16:03 -0700 Subject: [PATCH 1/7] Block Fetch Thread When Retrieval Should be Paused (#252) Block the fetching thread when the queue is considered to be full. This ensures that the thread won't spin the CPU when it can't retrieve more records or bytes. --- .../lib/worker/PrefetchGetRecordsCache.java | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 5369c0f4..06e77c8c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,8 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.lang.Validate; + import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; @@ -29,7 +31,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; -import org.apache.commons.lang.Validate; /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the @@ -171,6 +172,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } finally { MetricsHelper.endScope(); } + } else { + // + // Consumer isn't ready to receive new records will allow prefetch counters to pause + // + try { + prefetchCounters.waitForConsumer(); + } catch (InterruptedException ie) { + log.info("Thread was interrupted while waiting for the consumer. " + + "Shutdown has probably been started"); + } } } callShutdownOnStrategy(); @@ -205,6 +216,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { public synchronized void removed(final ProcessRecordsInput result) { size -= getSize(result); byteSize -= getByteSize(result); + this.notifyAll(); } private long getSize(final ProcessRecordsInput result) { @@ -214,10 +226,26 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private long getByteSize(final ProcessRecordsInput result) { return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); } + + public synchronized void waitForConsumer() throws InterruptedException { + if (!shouldGetNewRecords()) { + log.debug("Queue is full waiting for consumer for " + idleMillisBetweenCalls + " ms"); + this.wait(idleMillisBetweenCalls); + } + } public synchronized boolean shouldGetNewRecords() { + if (log.isDebugEnabled()) { + log.debug("Current Prefetch Counter States: " + this.toString()); + } return size < maxRecordsCount && byteSize < maxByteSize; } + + @Override + public String toString() { + return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size, + byteSize); + } } } From 90d4cb78c5b8bf3e3b3cf634b94d8dba0ae9bec9 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 23 Oct 2017 10:16:25 -0700 Subject: [PATCH 2/7] Update the AWS SDK Version to 1.11.218 (#253) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0e40b5ab..3e9b2c68 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ - 1.11.198 + 1.11.218 1.0.392 libsqlite4java ${project.build.directory}/test-lib From cc7e329e2fe1781073abc6d8892204be47480263 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 23 Oct 2017 12:30:08 -0700 Subject: [PATCH 3/7] Release Note for Release 1.8.6 of the Amazon Kinesis Client for Java (#254) --- META-INF/MANIFEST.MF | 4 ++-- README.md | 16 ++++++++++++++++ pom.xml | 2 +- .../worker/KinesisClientLibConfiguration.java | 2 +- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 146a18fe..c48417b0 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,9 +2,9 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.8.5 +Bundle-Version: 1.8.6 Bundle-Vendor: Amazon Technologies, Inc -Bundle-RequiredExecutionEnvironment: JavaSE-1.7 +Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport, com.fasterxml.jackson.core.jackson-databind;bundle-version="2.5.3", diff --git a/README.md b/README.md index 8191254a..ee1b7f78 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,22 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.8.6 +* Add prefetching of records from Kinesis + Prefetching will retrieve and queue additional records from Kinesis while the application is processing existing records. + Prefetching can be enabled by setting [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) to `PREFETCH_CACHED`. Once enabled an additional fetching thread will be started to retrieve records from Kinesis. Retrieved records will be held in a queue until the application is ready to process them. + Pre-fetching supports the following configuration values: + + | Name | Default | Description | + | ---- | ------- | ----------- | + | [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) | `DEFAULT` | Which data fetching strategy to use | + | [`maxPendingProcessRecordsInput`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1296) | 3 | The maximum number of process records input that can be queued | + | [`maxCacheByteSize`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1307) | 8 MiB | The maximum number of bytes that can be queued | + | [`maxRecordsCount`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1326) | 30,000 | The maximum number of records that can be queued | + | [`idleMillisBetweenCalls`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1353) | 1,500 ms | The amount of time to wait between calls to Kinesis | + + * [PR #246](https://github.com/awslabs/amazon-kinesis-client/pull/246) + ### Release 1.8.5 (September 26, 2017) * Only advance the shard iterator for the accepted response. This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`. diff --git a/pom.xml b/pom.xml index 3e9b2c68..46bba4ca 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.6-SNAPSHOT + 1.8.6 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index b0ac6bfd..da2610be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.5"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.6"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls From 73426bd73347ecae0e19626d79fa014d3dc225e7 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 24 Oct 2017 09:13:19 -0700 Subject: [PATCH 4/7] Don't Sleep for During Retrieval for the BlockingGetRecordsCache The BlockingGetRecordsCache shouldn't sleep when retrieving records as backoff is provided in other parts of the ShardConumer. --- .../lib/worker/BlockingGetRecordsCache.java | 29 ++----------------- .../worker/SimpleRecordsFetcherFactory.java | 3 +- .../worker/BlockingGetRecordsCacheTest.java | 3 +- .../lib/worker/ShardConsumerTest.java | 6 ++-- 4 files changed, 6 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index d9fc011e..021d886b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -31,15 +31,11 @@ import lombok.extern.apachecommons.CommonsLog; public class BlockingGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - private final long idleMillisBetweenCalls; - private Instant lastSuccessfulCall; public BlockingGetRecordsCache(final int maxRecordsPerCall, - final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, - final long idleMillisBetweenCalls) { + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { this.maxRecordsPerCall = maxRecordsPerCall; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; - this.idleMillisBetweenCalls = idleMillisBetweenCalls; } @Override @@ -51,33 +47,12 @@ public class BlockingGetRecordsCache implements GetRecordsCache { @Override public ProcessRecordsInput getNextResult() { - sleepBeforeNextCall(); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); - lastSuccessfulCall = Instant.now(); - ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + return new ProcessRecordsInput() .withRecords(getRecordsResult.getRecords()) .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); - return processRecordsInput; } - private void sleepBeforeNextCall() { - if (!Thread.interrupted()) { - if (lastSuccessfulCall == null) { - return; - } - long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); - if (timeSinceLastCall < idleMillisBetweenCalls) { - try { - Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); - } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating that shutdown was called."); - } - } - } else { - log.info("Thread has been interrupted, indicating that it is in the shutdown phase."); - } - } - @Override public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { return getRecordsRetrievalStrategy; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index e6c9f3b0..44c93e7b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -28,7 +28,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; - private IMetricsFactory metricsFactory; public SimpleRecordsFetcherFactory(int maxRecords) { this.maxRecords = maxRecords; @@ -37,7 +36,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { @Override public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { - return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); } else { return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, getRecordsRetrievalStrategy, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java index 731c4653..0636baea 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -40,7 +40,6 @@ import com.amazonaws.services.kinesis.model.Record; @RunWith(MockitoJUnitRunner.class) public class BlockingGetRecordsCacheTest { private static final int MAX_RECORDS_PER_COUNT = 10_000; - private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -53,7 +52,7 @@ public class BlockingGetRecordsCacheTest { @Before public void setup() { records = new ArrayList<>(); - blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS); + blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); when(getRecordsResult.getRecords()).thenReturn(records); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 0bd2f31a..efb9d43c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -339,8 +339,7 @@ public class ShardConsumerTest { KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, - new SynchronousGetRecordsRetrievalStrategy(dataFetcher), - 0L)); + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = @@ -469,8 +468,7 @@ public class ShardConsumerTest { KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, - new SynchronousGetRecordsRetrievalStrategy(dataFetcher), - 0L)); + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = From 821b0cbd0fc315d9eca192454ab0b42af5d89ae6 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Tue, 24 Oct 2017 09:38:49 -0700 Subject: [PATCH 5/7] Release Notes for 1.8.7 of the Amazon Kinesis Client Library for Java (#257) Don't add a delay for synchronous requests to Kinesis Removes a delay that had been added for synchronous GetRecords calls to Kinesis --- META-INF/MANIFEST.MF | 2 +- README.md | 5 +++++ pom.xml | 2 +- .../lib/worker/KinesisClientLibConfiguration.java | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index c48417b0..3c6411d8 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.8.6 +Bundle-Version: 1.8.7 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/README.md b/README.md index ee1b7f78..c612845f 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,11 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.8.7 +* Don't add a delay for synchronous requests to Kinesis + Removes a delay that had been added for synchronous `GetRecords` calls to Kinesis. + * [PR #256](https://github.com/awslabs/amazon-kinesis-client/pull/256) + ### Release 1.8.6 * Add prefetching of records from Kinesis Prefetching will retrieve and queue additional records from Kinesis while the application is processing existing records. diff --git a/pom.xml b/pom.xml index 46bba4ca..88a1a79a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.6 + 1.8.7 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index da2610be..a37daa42 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.6"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.7"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls From cbcc898d71e257ca02296e3e2521ac63c4847fd7 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 25 Oct 2017 08:09:44 -0700 Subject: [PATCH 6/7] Updating version to 1.8.8-SNAPSHOT. (#258) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 88a1a79a..b07d9ac0 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.7 + 1.8.8-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. From 7032ea67eced2745bba8090405da9fc50859dad7 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 25 Oct 2017 08:11:20 -0700 Subject: [PATCH 7/7] Spurious update fix (#247) * Handle spurious lease renewal failures gracefully. If the request to conditionally update a lease counter in DynamoDB fails, it's considered a failure to renew the lease. This is a good thing, except if the request failure was just because of connectivity problems. In this case the counter *did* update in DynamoDB, but the Dynamo client retries the request which then fails the update condition (since the lease counter no longer matches expected value). To handle this gracefully we opt to get the lease record from Dynamo and examine the lease owner and counter. If it matches what we were expecting, then we consider renewal a success. --- .../kinesis/leases/impl/LeaseManager.java | 15 +++++- .../leases/impl/LeaseIntegrationTest.java | 19 +++---- .../impl/LeaseManagerIntegrationTest.java | 21 ++++---- .../impl/LeaseRenewerIntegrationTest.java | 49 ++++++++++--------- .../leases/impl/TestHarnessBuilder.java | 33 ++++++++----- 5 files changed, 82 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index a2bf33a2..9dc2a4a3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -386,7 +387,19 @@ public class LeaseManager implements ILeaseManager { + " because the lease counter was not " + lease.getLeaseCounter()); } - return false; + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter + // are what we expected. + String expectedOwner = lease.getLeaseOwner(); + Long expectedCounter = lease.getLeaseCounter() + 1; + T updatedLease = getLease(lease.getLeaseKey()); + if (updatedLease == null || !expectedOwner.equals(updatedLease.getLeaseOwner()) || + !expectedCounter.equals(updatedLease.getLeaseCounter())) { + return false; + } + + LOG.info("Detected spurious renewal failure for lease with key " + lease.getLeaseKey() + + ", but recovered"); } catch (AmazonClientException e) { throw convertAndRethrowExceptions("renew", lease.getLeaseKey(), e); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java index 57a9c99b..e7ff0ebe 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java @@ -1,21 +1,22 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; import java.util.logging.Logger; +import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Ignore; diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java index 23cc9fc1..dcaedc38 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; @@ -108,7 +108,8 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease leaseCopy = leaseManager.getLease(lease.getLeaseKey()); - leaseManager.renewLease(lease); + // lose lease + leaseManager.takeLease(lease, "bar"); Assert.assertFalse(leaseManager.renewLease(leaseCopy)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java index 9792d006..8ad19d34 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewerIntegrationTest.java @@ -1,30 +1,29 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Executors; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executors; public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { @@ -58,7 +57,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { builder.addLeasesToRenew(renewer, "1", "2"); KinesisClientLease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2"); - leaseManager.updateLease(renewedLease); + // lose lease 2 + leaseManager.takeLease(renewedLease, "bar"); + builder.renewMutateAssert(renewer, "1"); } @@ -96,9 +97,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { public void testGetCurrentlyHeldLeases() throws LeasingException { TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); - KinesisClientLease lease2 = builder.withLease("1", "foo").withLease("2", "foo").build().get("2"); + builder.withLease("1", "foo").withLease("2", "foo").build(); builder.addLeasesToRenew(renewer, "1", "2"); - builder.renewMutateAssert(renewer, "1", "2"); + KinesisClientLease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2"); // This should be a copy that doesn't get updated Map heldLeases = renewer.getCurrentlyHeldLeases(); @@ -106,7 +107,9 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { Assert.assertEquals((Long) 1L, heldLeases.get("1").getLeaseCounter()); Assert.assertEquals((Long) 1L, heldLeases.get("2").getLeaseCounter()); - leaseManager.updateLease(lease2); // lose lease 2 + // lose lease 2 + leaseManager.takeLease(lease2, "bar"); + // Do another renewal and make sure the copy doesn't change builder.renewMutateAssert(renewer, "1"); @@ -176,7 +179,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease lease = renewer.getCurrentlyHeldLease("1"); // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseManager.renewLease(lease); + leaseManager.takeLease(lease, "bar"); builder.renewMutateAssert(renewer); lease.setCheckpoint(new ExtendedSequenceNumber("new checkpoint")); @@ -195,7 +198,7 @@ public class LeaseRenewerIntegrationTest extends LeaseIntegrationTest { KinesisClientLease lease = renewer.getCurrentlyHeldLease("1"); // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseManager.renewLease(lease); + leaseManager.takeLease(lease, "bar"); builder.renewMutateAssert(renewer); // regain the lease diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java index 6b6d673c..0dfbb568 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/TestHarnessBuilder.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.leases.impl; @@ -35,6 +35,7 @@ public class TestHarnessBuilder { private Map leases = new HashMap(); private KinesisClientLeaseManager leaseManager; + private Map originalLeases = new HashMap<>(); private Callable timeProvider = new Callable() { @@ -54,6 +55,15 @@ public class TestHarnessBuilder { } public TestHarnessBuilder withLease(String shardId, String owner) { + KinesisClientLease lease = createLease(shardId, owner); + KinesisClientLease originalLease = createLease(shardId, owner); + + leases.put(shardId, lease); + originalLeases.put(shardId, originalLease); + return this; + } + + private KinesisClientLease createLease(String shardId, String owner) { KinesisClientLease lease = new KinesisClientLease(); lease.setCheckpoint(new ExtendedSequenceNumber("checkpoint")); lease.setOwnerSwitchesSinceCheckpoint(0L); @@ -62,8 +72,7 @@ public class TestHarnessBuilder { lease.setParentShardIds(Collections.singleton("parentShardId")); lease.setLeaseKey(shardId); - leases.put(shardId, lease); - return this; + return lease; } public Map build() throws LeasingException { @@ -147,7 +156,7 @@ public class TestHarnessBuilder { Assert.assertEquals(renewedShardIds.length, heldLeases.size()); for (String shardId : renewedShardIds) { - KinesisClientLease original = leases.get(shardId); + KinesisClientLease original = originalLeases.get(shardId); Assert.assertNotNull(original); KinesisClientLease actual = heldLeases.get(shardId);