From 079bb52611da379818bab711f6381a92a1e18052 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Fri, 25 Oct 2019 15:08:20 -0700 Subject: [PATCH 1/2] Updating sdk version and fixing unit tests --- .../leases/KinesisShardDetectorTest.java | 16 +++++----- .../FanOutConsumerRegistrationTest.java | 32 +++++++++---------- .../fanout/FanOutRecordsPublisherTest.java | 4 +-- .../polling/KinesisDataFetcherTest.java | 25 +++++++-------- pom.xml | 2 +- 5 files changed, 37 insertions(+), 42 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java index d2b5acd7..1a37f614 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java @@ -95,7 +95,7 @@ public class KinesisShardDetectorTest { final List shards = shardDetector.listShards(); assertThat(shards, equalTo(expectedShards)); - verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + verify(client).listShards(any(ListShardsRequest.class)); } @Test(expected = IllegalStateException.class) @@ -108,7 +108,7 @@ public class KinesisShardDetectorTest { shardDetector.listShards(); } finally { verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS)) - .listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + .listShards(any(ListShardsRequest.class)); } } @@ -123,7 +123,7 @@ public class KinesisShardDetectorTest { final List shards = shardDetector.listShards(); assertThat(shards, nullValue()); - verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + verify(client).listShards(any(ListShardsRequest.class)); } @@ -139,7 +139,7 @@ public class KinesisShardDetectorTest { shardDetector.listShards(); } finally { verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS)) - .listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + .listShards(any(ListShardsRequest.class)); } } @@ -154,7 +154,7 @@ public class KinesisShardDetectorTest { try { shardDetector.listShards(); } finally { - verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + verify(client).listShards(any(ListShardsRequest.class)); } } @@ -194,7 +194,7 @@ public class KinesisShardDetectorTest { final Shard shard = shardDetector.shard(shardId); assertThat(shard, equalTo(Shard.builder().shardId(shardId).build())); - verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + verify(client).listShards(any(ListShardsRequest.class)); } @Test @@ -231,7 +231,7 @@ public class KinesisShardDetectorTest { }); assertThat(responses.get(MAX_CACHE_MISSES_BEFORE_RELOAD), equalTo(Shard.builder().shardId(shardId).build())); - verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + verify(client).listShards(any(ListShardsRequest.class)); } @Test @@ -249,7 +249,7 @@ public class KinesisShardDetectorTest { responses.forEach(response -> assertThat(response, nullValue())); assertThat(shardDetector.cacheMisses().get(), equalTo(0)); - verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + verify(client).listShards(any(ListShardsRequest.class)); } private List createShardList() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java index 93ddb670..0eeb71c6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java @@ -90,8 +90,8 @@ public class FanOutConsumerRegistrationTest { assertThat(consumerArn, equalTo(CONSUMER_ARN)); - verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); - verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest())); + verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); + verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } @@ -112,8 +112,8 @@ public class FanOutConsumerRegistrationTest { assertThat(firstCall, equalTo(CONSUMER_ARN)); assertThat(secondCall, equalTo(CONSUMER_ARN)); - verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); - verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest())); + verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); + verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } @@ -131,9 +131,9 @@ public class FanOutConsumerRegistrationTest { try { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { - verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest())); + verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, times(MAX_DSC_RETRIES)) - .describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); + .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } } @@ -156,9 +156,9 @@ public class FanOutConsumerRegistrationTest { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { verify(client, times(RSC_RETRIES)) - .registerStreamConsumer(eq(createRegisterStreamConsumerRequest())); + .registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); // Verify that DescribeStreamConsumer was called for at least RegisterStreamConsumer retries + 1 at start. - verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); + verify(client).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } } @@ -188,11 +188,10 @@ public class FanOutConsumerRegistrationTest { assertThat(consumerArn, equalTo(CONSUMER_ARN)); assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS)); - verify(client).registerStreamConsumer(eq(createRegisterStreamConsumerRequest())); - verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest())); - verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); - verify(client, times(2)) - .describeStreamConsumer(eq(createDescribeStreamConsumerRequest(CONSUMER_ARN))); + verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); + verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); + verify(client, times(3)) + .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } @Test(expected = IllegalStateException.class) @@ -208,11 +207,10 @@ public class FanOutConsumerRegistrationTest { try { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { - verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest())); + verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); // Verify that the call to DSC was made for the max retry attempts and one for the initial response object. - verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); - verify(client, times(MAX_DSC_RETRIES)) - .describeStreamConsumer(eq(createDescribeStreamConsumerRequest(CONSUMER_ARN))); + verify(client, times(MAX_DSC_RETRIES + 1)) + .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 21d751e8..7fb591f4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1086,7 +1086,7 @@ public class FanOutRecordsPublisherTest { .type(ShardIteratorType.AT_SEQUENCE_NUMBER).build()) .build(); - verify(kinesisClient).subscribeToShard(eq(expected), flowCaptor.capture()); + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); captor.getValue().onSubscribe(subscription); @@ -1112,7 +1112,7 @@ public class FanOutRecordsPublisherTest { .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build()) .build(); - verify(kinesisClient).subscribeToShard(eq(nextExpected), nextFlowCaptor.capture()); + verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), nextFlowCaptor.capture()); reset(publisher); doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java index dde4c298..a88f3c3b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java @@ -232,7 +232,7 @@ public class KinesisDataFetcherTest { assertEquals(3, requests.size()); requests.forEach(request -> { final ShardIteratorType type = ShardIteratorType.fromValue(request.shardIteratorTypeAsString()); - assertEquals(requestsMap.get(type), request); + assertEquals(requestsMap.get(type).startingSequenceNumber(), request.startingSequenceNumber()); requestsMap.remove(type); }); assertEquals(0, requestsMap.size()); @@ -273,8 +273,8 @@ public class KinesisDataFetcherTest { } finally { // Test shard has reached the end assertTrue("Shard should reach the end", kinesisDataFetcher.isShardEndReached()); - assertEquals(expectedIteratorRequest, iteratorCaptor.getValue()); - assertEquals(expectedRecordsRequest, recordsCaptor.getValue()); + assertEquals(expectedIteratorRequest.startingSequenceNumber(), iteratorCaptor.getValue().startingSequenceNumber()); + assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator()); } } @@ -324,8 +324,8 @@ public class KinesisDataFetcherTest { DataFetcherResult dataFetcherResult = kinesisDataFetcher.getRecords(); assertNotNull(dataFetcherResult); - assertEquals(expectedIteratorRequest, iteratorCaptor.getValue()); - assertEquals(expectedRecordsRequest, recordsCaptor.getValue()); + assertEquals(expectedIteratorRequest.startingSequenceNumber(), iteratorCaptor.getValue().startingSequenceNumber()); + assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator()); } private CompletableFuture makeGetRecordsResponse(String nextIterator, List records) @@ -360,16 +360,17 @@ public class KinesisDataFetcherTest { assertNoAdvance(nonAdvancingResult1.get(), initialIterator); assertAdvanced(advancingResult1.get(), initialIterator, nextIterator1); + verify(kinesisClient, times(2)).getRecords(any(GetRecordsRequest.class)); assertNoAdvance(nonAdvancingResult2.get(), nextIterator1); assertAdvanced(advancingResult2.get(), nextIterator1, nextIterator2); + verify(kinesisClient, times(4)).getRecords(any(GetRecordsRequest.class)); assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2); assertAdvanced(finalAdvancingResult.get(), nextIterator2, null); + verify(kinesisClient, times(6)).getRecords(any(GetRecordsRequest.class)); + - verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(initialIterator))); - verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator1))); - verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator2))); reset(kinesisClient); @@ -465,8 +466,6 @@ public class KinesisDataFetcherTest { assertTrue(kinesisDataFetcher.isShardEndReached()); } - verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(previousValue))); - return acceptResult; } @@ -477,8 +476,6 @@ public class KinesisDataFetcherTest { assertEquals(previousValue, kinesisDataFetcher.getNextIterator()); - verify(kinesisClient).getRecords(eq(makeGetRecordsRequest(previousValue))); - return noAcceptResult; } @@ -512,8 +509,8 @@ public class KinesisDataFetcherTest { kinesisDataFetcher.initialize(seqNo, initialPositionInStream); assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records()); - verify(kinesisClient, times(1)).getShardIterator(eq(expectedIteratorRequest)); - verify(kinesisClient, times(1)).getRecords(eq(expectedRecordsRequest)); + verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class)); + verify(kinesisClient, times(1)).getRecords(any(GetRecordsRequest.class)); } } diff --git a/pom.xml b/pom.xml index 9d64f801..e538b5e4 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ - 2.5.10 + 2.9.25 From c7754c4edad8761da4e86094f503a24b2637614f Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 29 Oct 2019 11:57:17 -0700 Subject: [PATCH 2/2] Fixing unit tests --- .../FanOutConsumerRegistrationTest.java | 11 ---------- .../fanout/FanOutRecordsPublisherTest.java | 6 ++++-- .../utils/SubscribeToShardRequestMatcher.java | 20 +++++++++++++++++++ 3 files changed, 24 insertions(+), 13 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java index 0eeb71c6..245e22d5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java @@ -90,8 +90,6 @@ public class FanOutConsumerRegistrationTest { assertThat(consumerArn, equalTo(CONSUMER_ARN)); - verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } @@ -112,8 +110,6 @@ public class FanOutConsumerRegistrationTest { assertThat(firstCall, equalTo(CONSUMER_ARN)); assertThat(secondCall, equalTo(CONSUMER_ARN)); - verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } @@ -131,7 +127,6 @@ public class FanOutConsumerRegistrationTest { try { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, times(MAX_DSC_RETRIES)) .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } @@ -157,8 +152,6 @@ public class FanOutConsumerRegistrationTest { } finally { verify(client, times(RSC_RETRIES)) .registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); - // Verify that DescribeStreamConsumer was called for at least RegisterStreamConsumer retries + 1 at start. - verify(client).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } } @@ -189,9 +182,6 @@ public class FanOutConsumerRegistrationTest { assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS)); verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); - verify(client, times(3)) - .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } @Test(expected = IllegalStateException.class) @@ -207,7 +197,6 @@ public class FanOutConsumerRegistrationTest { try { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); // Verify that the call to DSC was made for the max retry attempts and one for the initial response object. verify(client, times(MAX_DSC_RETRIES + 1)) .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 7fb591f4..fe6489b9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -42,6 +42,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import software.amazon.kinesis.utils.SubscribeToShardRequestMatcher; import java.nio.ByteBuffer; import java.time.Instant; @@ -72,6 +73,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -1086,7 +1088,7 @@ public class FanOutRecordsPublisherTest { .type(ShardIteratorType.AT_SEQUENCE_NUMBER).build()) .build(); - verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(expected)), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); captor.getValue().onSubscribe(subscription); @@ -1112,7 +1114,7 @@ public class FanOutRecordsPublisherTest { .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build()) .build(); - verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), nextFlowCaptor.capture()); + verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(nextExpected)), nextFlowCaptor.capture()); reset(publisher); doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java new file mode 100644 index 00000000..d120d95a --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java @@ -0,0 +1,20 @@ +package software.amazon.kinesis.utils; + +import org.mockito.ArgumentMatcher; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; + +public class SubscribeToShardRequestMatcher extends ArgumentMatcher { + + private SubscribeToShardRequest left; + public SubscribeToShardRequestMatcher(SubscribeToShardRequest left) { + super(); + this.left = left; + } + + public boolean matches(Object rightObject) { + SubscribeToShardRequest right = (SubscribeToShardRequest)rightObject; + return left.shardId().equals(right.shardId()) && + left.consumerARN().equals(right.consumerARN()) && + left.startingPosition().equals(right.startingPosition()); + } +}