From 6690812aac9a0b62a419741f0209529414fafb38 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 6 Nov 2019 15:26:32 -0800 Subject: [PATCH 1/6] Prepare for KCL release 2.2.6 --- CHANGELOG.md | 15 ++++++++++++++ README.md | 20 +++++++++++++++++-- amazon-kinesis-client-multilang/pom.xml | 2 +- amazon-kinesis-client/pom.xml | 2 +- .../kinesis/retrieval/RetrievalConfig.java | 2 +- pom.xml | 2 +- 6 files changed, 37 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70bc7998..57c875a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +### Release 2.2.6 (November 6, 2019) +[Milestone#43](https://github.com/awslabs/amazon-kinesis-client/milestone/43) +* Updating the SDK version to 2.9.25. + * [PR#638](https://github.com/awslabs/amazon-kinesis-client/pull/638) +* Clearing the local cache on a subscription termination, to avoid noisy logs on new subscriptions. + * [PR#642](https://github.com/awslabs/amazon-kinesis-client/pull/642) +* Updating the SDK version to 2.10.0 in order to fix the premature H2 stream close issue. + * [PR#649](https://github.com/awslabs/amazon-kinesis-client/pull/649) + * NOTE: SDK has a known connection teardown issue when multiple H2 streams are used within a connection. This might result in shard consumers sticking to a stale service host and not progressing. If your shard consumer gets stuck, use the following configuration as a workaround. This configuration might result in up to 5X increase in total connections. + ``` + KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder() + .region(region) + .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE).maxHttp2Streams(1)) + .build() + ``` ### Release 2.2.5 (October 23, 2019) diff --git a/README.md b/README.md index a1ac9f19..562f3280 100644 --- a/README.md +++ b/README.md @@ -60,14 +60,30 @@ The recommended way to use the KCL for Java is to consume it from Maven. ## Release Notes -### Latest Release (2.2.5 - October 23, 2019) +### Latest Release (2.2.6 - November 6, 2019) +[Milestone#43](https://github.com/awslabs/amazon-kinesis-client/milestone/43) +* Updating the SDK version to 2.9.25. + * [PR#638](https://github.com/awslabs/amazon-kinesis-client/pull/638) +* Clearing the local cache on a subscription termination, to avoid noisy logs on new subscriptions. + * [PR#642](https://github.com/awslabs/amazon-kinesis-client/pull/642) +* Updating the SDK version to 2.10.0 in order to fix the premature H2 stream close issue. + * [PR#649](https://github.com/awslabs/amazon-kinesis-client/pull/649) + * NOTE: SDK has a known connection teardown issue when multiple H2 streams are used within a connection. This might result in shard consumers sticking to a stale service host and not progressing. If your shard consumer gets stuck, use the following configuration as a workaround. This configuration might result in up to 5X increase in total connections. + ``` + KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder() + .region(region) + .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE).maxHttp2Streams(1)) + .build() + ``` + +### Related Prior Release (2.2.5 - October 23, 2019) [Milestone#40](https://github.com/awslabs/amazon-kinesis-client/milestone/40) * Updating Sonatype to dedicated AWS endpoint. * [PR#619](https://github.com/awslabs/amazon-kinesis-client/pull/619) * Introducing a validation step to verify if ShardEnd is reached, to prevent shard consumer stuck scenarios in the event of malformed response from service. * [PR#624](https://github.com/awslabs/amazon-kinesis-client/pull/624) -### Latest Release (2.2.4 - September 23, 2019) +### Related Prior Release (2.2.4 - September 23, 2019) [Milestone#39](https://github.com/awslabs/amazon-kinesis-client/milestone/39) * Making FanoutRecordsPublisher test cases resilient to delayed thread operations * [PR#612](https://github.com/awslabs/amazon-kinesis-client/pull/612) diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index 11a18c36..fb0e2780 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -21,7 +21,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.2.6-SNAPSHOT + 2.2.6 4.0.0 diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 9f6f377f..b6fd20da 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -22,7 +22,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.2.6-SNAPSHOT + 2.2.6 amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index c112804b..0090d69d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -34,7 +34,7 @@ public class RetrievalConfig { */ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java"; - public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.2.6-SNAPSHOT"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.2.6"; /** * Client used to make calls to Kinesis for records retrieval diff --git a/pom.xml b/pom.xml index 099d1493..54962049 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.2.6-SNAPSHOT + 2.2.6 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. From 653a25456cbbaa5053774b7f9bde1e418ddc2442 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 7 Nov 2019 13:37:45 -0800 Subject: [PATCH 2/6] Debug remote unit test failure --- .../amazon/kinesis/lifecycle/ShardConsumerSubscriber.java | 5 ++++- .../kinesis/lifecycle/ShardConsumerSubscriberTest.java | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 14e347a2..566ed385 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -80,6 +80,7 @@ class ShardConsumerSubscriber implements Subscriber { lastRequestTime = Instant.now(); if (lastAccepted != null) { recordsPublisher.restartFrom(lastAccepted); + log.warn("Last record was accepted! Record Publisher restarted from the last accepted record."); } Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); @@ -90,6 +91,7 @@ class ShardConsumerSubscriber implements Subscriber { Throwable result = restartIfFailed(); if (result == null) { restartIfRequestTimerExpired(maxTimeBetweenRequests); + log.warn("healthCheck result is null, called restartIfRequestTimerExpired"); } return result; } @@ -109,7 +111,7 @@ class ShardConsumerSubscriber implements Subscriber { String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardConsumer.shardInfo().shardId()); if (retrievalFailure instanceof RetryableRetrievalException) { - log.debug(logMessage, retrievalFailure.getCause()); + log.info(logMessage, retrievalFailure.getCause()); } else { log.warn(logMessage, retrievalFailure); } @@ -135,6 +137,7 @@ class ShardConsumerSubscriber implements Subscriber { // Start the subscription again which will update the lastRequestTime as well. startSubscriptions(); + log.info("No responses. Called previous subscription and called startSubscriptions."); } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index a4002ab9..d775107a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -325,6 +325,7 @@ public class ShardConsumerSubscriberTest { subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0); addUniqueItem(1); + log.info("recordsPublisher and subscriber is created successfully."); List received = new ArrayList<>(); doAnswer(a -> { ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class); @@ -346,6 +347,7 @@ public class ShardConsumerSubscriberTest { // subscription has not started correctly. verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); + log.info("Verified no records were sent back and subscription has not started correctly"); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); @@ -353,6 +355,7 @@ public class ShardConsumerSubscriberTest { // Doing the health check to allow the subscription to restart. assertThat(subscriber.healthCheck(1), nullValue()); + log.info("Health check was successful without exceptions"); // Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the // terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines. @@ -363,6 +366,7 @@ public class ShardConsumerSubscriberTest { // Verify that shardConsumer mock was called 100 times and all 100 input records are processed. verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); + log.info("Verified that handleInput was called 100 times"); // Verify that received records in the subscriber are equal to the ones sent by the record publisher. assertThat(received.size(), equalTo(recordsPublisher.responses.size())); From d940f8e2aa12bffdf5f116b6157fe108fe3740ec Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 7 Nov 2019 13:52:00 -0800 Subject: [PATCH 3/6] Potential Unit test fix --- .../amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index d775107a..f5ace20c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -329,6 +329,7 @@ public class ShardConsumerSubscriberTest { List received = new ArrayList<>(); doAnswer(a -> { ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class); + log.info("Handling input for: {}", input.toString()); received.add(input); if (input.records().stream().anyMatch(r -> StringUtils.startsWith(r.partitionKey(), TERMINAL_MARKER))) { synchronized (processedNotifier) { @@ -364,7 +365,7 @@ public class ShardConsumerSubscriberTest { } // Verify that shardConsumer mock was called 100 times and all 100 input records are processed. - verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), + verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); log.info("Verified that handleInput was called 100 times"); From 5221e1cd5451abfbcf9a13bb91f34b9aab5a22d7 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 7 Nov 2019 15:32:33 -0800 Subject: [PATCH 4/6] Fixing unit tests --- .../lifecycle/ShardConsumerSubscriber.java | 5 +-- .../ShardConsumerSubscriberTest.java | 35 ++++++------------- 2 files changed, 11 insertions(+), 29 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 566ed385..14e347a2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -80,7 +80,6 @@ class ShardConsumerSubscriber implements Subscriber { lastRequestTime = Instant.now(); if (lastAccepted != null) { recordsPublisher.restartFrom(lastAccepted); - log.warn("Last record was accepted! Record Publisher restarted from the last accepted record."); } Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); @@ -91,7 +90,6 @@ class ShardConsumerSubscriber implements Subscriber { Throwable result = restartIfFailed(); if (result == null) { restartIfRequestTimerExpired(maxTimeBetweenRequests); - log.warn("healthCheck result is null, called restartIfRequestTimerExpired"); } return result; } @@ -111,7 +109,7 @@ class ShardConsumerSubscriber implements Subscriber { String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardConsumer.shardInfo().shardId()); if (retrievalFailure instanceof RetryableRetrievalException) { - log.info(logMessage, retrievalFailure.getCause()); + log.debug(logMessage, retrievalFailure.getCause()); } else { log.warn(logMessage, retrievalFailure); } @@ -137,7 +135,6 @@ class ShardConsumerSubscriber implements Subscriber { // Start the subscription again which will update the lastRequestTime as well. startSubscriptions(); - log.info("No responses. Called previous subscription and called startSubscriptions."); } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index f5ace20c..95064f2c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -19,7 +19,6 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -126,7 +125,7 @@ public class ShardConsumerSubscriberTest { processedNotifier.wait(5000); } - verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); + verify(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } @Test @@ -140,8 +139,7 @@ public class ShardConsumerSubscriberTest { processedNotifier.wait(5000); } - verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } @Test @@ -176,8 +174,7 @@ public class ShardConsumerSubscriberTest { assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException)); assertThat(subscriber.getAndResetDispatchFailure(), nullValue()); - verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, times(20)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } @@ -202,8 +199,7 @@ public class ShardConsumerSubscriberTest { Thread.sleep(10); } - verify(shardConsumer, times(10)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, times(10)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); assertThat(subscriber.retrievalFailure(), equalTo(expected)); } @@ -239,8 +235,7 @@ public class ShardConsumerSubscriberTest { } assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord)); - verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, times(20)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } @Test @@ -304,8 +299,7 @@ public class ShardConsumerSubscriberTest { processedNotifier.wait(5000); } - verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); assertThat(received.size(), equalTo(recordsPublisher.responses.size())); Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i), @@ -325,11 +319,9 @@ public class ShardConsumerSubscriberTest { subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0); addUniqueItem(1); - log.info("recordsPublisher and subscriber is created successfully."); List received = new ArrayList<>(); doAnswer(a -> { ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class); - log.info("Handling input for: {}", input.toString()); received.add(input); if (input.records().stream().anyMatch(r -> StringUtils.startsWith(r.partitionKey(), TERMINAL_MARKER))) { synchronized (processedNotifier) { @@ -346,9 +338,7 @@ public class ShardConsumerSubscriberTest { // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. - verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); - log.info("Verified no records were sent back and subscription has not started correctly"); + verify(shardConsumer, never()).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); @@ -356,7 +346,6 @@ public class ShardConsumerSubscriberTest { // Doing the health check to allow the subscription to restart. assertThat(subscriber.healthCheck(1), nullValue()); - log.info("Health check was successful without exceptions"); // Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the // terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines. @@ -365,9 +354,7 @@ public class ShardConsumerSubscriberTest { } // Verify that shardConsumer mock was called 100 times and all 100 input records are processed. - verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), - any(Subscription.class)); - log.info("Verified that handleInput was called 100 times"); + verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // Verify that received records in the subscriber are equal to the ones sent by the record publisher. assertThat(received.size(), equalTo(recordsPublisher.responses.size())); @@ -411,8 +398,7 @@ public class ShardConsumerSubscriberTest { // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. - verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, never()).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); @@ -428,8 +414,7 @@ public class ShardConsumerSubscriberTest { } // Verify that shardConsumer mock was called 100 times and all 100 input records are processed. - verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), - any(Subscription.class)); + verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // Verify that received records in the subscriber are equal to the ones sent by the record publisher. assertThat(received.size(), equalTo(recordsPublisher.responses.size())); From b1ca4f860ee2f1b545b9c3edc3436e555e983108 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 7 Nov 2019 16:49:15 -0800 Subject: [PATCH 5/6] Fix unit tests --- .../ShardConsumerSubscriberTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 95064f2c..4d0f01ee 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -125,7 +126,7 @@ public class ShardConsumerSubscriberTest { processedNotifier.wait(5000); } - verify(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @Test @@ -139,7 +140,7 @@ public class ShardConsumerSubscriberTest { processedNotifier.wait(5000); } - verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @Test @@ -174,7 +175,7 @@ public class ShardConsumerSubscriberTest { assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException)); assertThat(subscriber.getAndResetDispatchFailure(), nullValue()); - verify(shardConsumer, times(20)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -199,7 +200,7 @@ public class ShardConsumerSubscriberTest { Thread.sleep(10); } - verify(shardConsumer, times(10)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, times(10)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); assertThat(subscriber.retrievalFailure(), equalTo(expected)); } @@ -235,7 +236,7 @@ public class ShardConsumerSubscriberTest { } assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord)); - verify(shardConsumer, times(20)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @Test @@ -299,7 +300,7 @@ public class ShardConsumerSubscriberTest { processedNotifier.wait(5000); } - verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); assertThat(received.size(), equalTo(recordsPublisher.responses.size())); Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i), @@ -338,7 +339,7 @@ public class ShardConsumerSubscriberTest { // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. - verify(shardConsumer, never()).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); @@ -398,7 +399,7 @@ public class ShardConsumerSubscriberTest { // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. - verify(shardConsumer, never()).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); @@ -414,7 +415,7 @@ public class ShardConsumerSubscriberTest { } // Verify that shardConsumer mock was called 100 times and all 100 input records are processed. - verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); + verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); // Verify that received records in the subscriber are equal to the ones sent by the record publisher. assertThat(received.size(), equalTo(recordsPublisher.responses.size())); From b7775e7cf1b9ed36fdefe93d7578a9550693ae2b Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 7 Nov 2019 17:15:03 -0800 Subject: [PATCH 6/6] Update Release Note --- CHANGELOG.md | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57c875a9..07bcb5ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -### Release 2.2.6 (November 6, 2019) +### Release 2.2.6 (November 7, 2019) [Milestone#43](https://github.com/awslabs/amazon-kinesis-client/milestone/43) * Updating the SDK version to 2.9.25. * [PR#638](https://github.com/awslabs/amazon-kinesis-client/pull/638) diff --git a/README.md b/README.md index 562f3280..3a9dd9e6 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ The recommended way to use the KCL for Java is to consume it from Maven. ## Release Notes -### Latest Release (2.2.6 - November 6, 2019) +### Latest Release (2.2.6 - November 7, 2019) [Milestone#43](https://github.com/awslabs/amazon-kinesis-client/milestone/43) * Updating the SDK version to 2.9.25. * [PR#638](https://github.com/awslabs/amazon-kinesis-client/pull/638)