Merge pull request #638 from ychunxue/master

Updating sdk version and fixing unit tests
This commit is contained in:
ychunxue 2019-10-30 14:05:03 -07:00 committed by GitHub
commit c696bc1862
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 43 deletions

View file

@ -95,7 +95,7 @@ public class KinesisShardDetectorTest {
final List<Shard> shards = shardDetector.listShards(); final List<Shard> shards = shardDetector.listShards();
assertThat(shards, equalTo(expectedShards)); assertThat(shards, equalTo(expectedShards));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); verify(client).listShards(any(ListShardsRequest.class));
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -108,7 +108,7 @@ public class KinesisShardDetectorTest {
shardDetector.listShards(); shardDetector.listShards();
} finally { } finally {
verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS)) verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS))
.listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); .listShards(any(ListShardsRequest.class));
} }
} }
@ -123,7 +123,7 @@ public class KinesisShardDetectorTest {
final List<Shard> shards = shardDetector.listShards(); final List<Shard> shards = shardDetector.listShards();
assertThat(shards, nullValue()); assertThat(shards, nullValue());
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); verify(client).listShards(any(ListShardsRequest.class));
} }
@ -139,7 +139,7 @@ public class KinesisShardDetectorTest {
shardDetector.listShards(); shardDetector.listShards();
} finally { } finally {
verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS)) verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS))
.listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); .listShards(any(ListShardsRequest.class));
} }
} }
@ -154,7 +154,7 @@ public class KinesisShardDetectorTest {
try { try {
shardDetector.listShards(); shardDetector.listShards();
} finally { } finally {
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); verify(client).listShards(any(ListShardsRequest.class));
} }
} }
@ -194,7 +194,7 @@ public class KinesisShardDetectorTest {
final Shard shard = shardDetector.shard(shardId); final Shard shard = shardDetector.shard(shardId);
assertThat(shard, equalTo(Shard.builder().shardId(shardId).build())); assertThat(shard, equalTo(Shard.builder().shardId(shardId).build()));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); verify(client).listShards(any(ListShardsRequest.class));
} }
@Test @Test
@ -231,7 +231,7 @@ public class KinesisShardDetectorTest {
}); });
assertThat(responses.get(MAX_CACHE_MISSES_BEFORE_RELOAD), equalTo(Shard.builder().shardId(shardId).build())); assertThat(responses.get(MAX_CACHE_MISSES_BEFORE_RELOAD), equalTo(Shard.builder().shardId(shardId).build()));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); verify(client).listShards(any(ListShardsRequest.class));
} }
@Test @Test
@ -249,7 +249,7 @@ public class KinesisShardDetectorTest {
responses.forEach(response -> assertThat(response, nullValue())); responses.forEach(response -> assertThat(response, nullValue()));
assertThat(shardDetector.cacheMisses().get(), equalTo(0)); assertThat(shardDetector.cacheMisses().get(), equalTo(0));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build())); verify(client).listShards(any(ListShardsRequest.class));
} }
private List<Shard> createShardList() { private List<Shard> createShardList() {

View file

@ -90,8 +90,6 @@ public class FanOutConsumerRegistrationTest {
assertThat(consumerArn, equalTo(CONSUMER_ARN)); assertThat(consumerArn, equalTo(CONSUMER_ARN));
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
} }
@ -112,8 +110,6 @@ public class FanOutConsumerRegistrationTest {
assertThat(firstCall, equalTo(CONSUMER_ARN)); assertThat(firstCall, equalTo(CONSUMER_ARN));
assertThat(secondCall, equalTo(CONSUMER_ARN)); assertThat(secondCall, equalTo(CONSUMER_ARN));
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
} }
@ -131,9 +127,8 @@ public class FanOutConsumerRegistrationTest {
try { try {
consumerRegistration.getOrCreateStreamConsumerArn(); consumerRegistration.getOrCreateStreamConsumerArn();
} finally { } finally {
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client, times(MAX_DSC_RETRIES)) verify(client, times(MAX_DSC_RETRIES))
.describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); .describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
} }
} }
@ -156,9 +151,7 @@ public class FanOutConsumerRegistrationTest {
consumerRegistration.getOrCreateStreamConsumerArn(); consumerRegistration.getOrCreateStreamConsumerArn();
} finally { } finally {
verify(client, times(RSC_RETRIES)) verify(client, times(RSC_RETRIES))
.registerStreamConsumer(eq(createRegisterStreamConsumerRequest())); .registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
// Verify that DescribeStreamConsumer was called for at least RegisterStreamConsumer retries + 1 at start.
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
} }
} }
@ -188,11 +181,7 @@ public class FanOutConsumerRegistrationTest {
assertThat(consumerArn, equalTo(CONSUMER_ARN)); assertThat(consumerArn, equalTo(CONSUMER_ARN));
assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS)); assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS));
verify(client).registerStreamConsumer(eq(createRegisterStreamConsumerRequest())); verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client, times(2))
.describeStreamConsumer(eq(createDescribeStreamConsumerRequest(CONSUMER_ARN)));
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -208,11 +197,9 @@ public class FanOutConsumerRegistrationTest {
try { try {
consumerRegistration.getOrCreateStreamConsumerArn(); consumerRegistration.getOrCreateStreamConsumerArn();
} finally { } finally {
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
// Verify that the call to DSC was made for the max retry attempts and one for the initial response object. // Verify that the call to DSC was made for the max retry attempts and one for the initial response object.
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null))); verify(client, times(MAX_DSC_RETRIES + 1))
verify(client, times(MAX_DSC_RETRIES)) .describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
.describeStreamConsumer(eq(createDescribeStreamConsumerRequest(CONSUMER_ARN)));
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
} }

View file

@ -42,6 +42,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.utils.SubscribeToShardRequestMatcher;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Instant; import java.time.Instant;
@ -72,6 +73,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -1086,7 +1088,7 @@ public class FanOutRecordsPublisherTest {
.type(ShardIteratorType.AT_SEQUENCE_NUMBER).build()) .type(ShardIteratorType.AT_SEQUENCE_NUMBER).build())
.build(); .build();
verify(kinesisClient).subscribeToShard(eq(expected), flowCaptor.capture()); verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(expected)), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher); flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription); captor.getValue().onSubscribe(subscription);
@ -1112,7 +1114,7 @@ public class FanOutRecordsPublisherTest {
.type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build()) .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build())
.build(); .build();
verify(kinesisClient).subscribeToShard(eq(nextExpected), nextFlowCaptor.capture()); verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(nextExpected)), nextFlowCaptor.capture());
reset(publisher); reset(publisher);
doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture()); doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture());

View file

@ -232,7 +232,7 @@ public class KinesisDataFetcherTest {
assertEquals(3, requests.size()); assertEquals(3, requests.size());
requests.forEach(request -> { requests.forEach(request -> {
final ShardIteratorType type = ShardIteratorType.fromValue(request.shardIteratorTypeAsString()); final ShardIteratorType type = ShardIteratorType.fromValue(request.shardIteratorTypeAsString());
assertEquals(requestsMap.get(type), request); assertEquals(requestsMap.get(type).startingSequenceNumber(), request.startingSequenceNumber());
requestsMap.remove(type); requestsMap.remove(type);
}); });
assertEquals(0, requestsMap.size()); assertEquals(0, requestsMap.size());
@ -273,8 +273,8 @@ public class KinesisDataFetcherTest {
} finally { } finally {
// Test shard has reached the end // Test shard has reached the end
assertTrue("Shard should reach the end", kinesisDataFetcher.isShardEndReached()); assertTrue("Shard should reach the end", kinesisDataFetcher.isShardEndReached());
assertEquals(expectedIteratorRequest, iteratorCaptor.getValue()); assertEquals(expectedIteratorRequest.startingSequenceNumber(), iteratorCaptor.getValue().startingSequenceNumber());
assertEquals(expectedRecordsRequest, recordsCaptor.getValue()); assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
} }
} }
@ -324,8 +324,8 @@ public class KinesisDataFetcherTest {
DataFetcherResult dataFetcherResult = kinesisDataFetcher.getRecords(); DataFetcherResult dataFetcherResult = kinesisDataFetcher.getRecords();
assertNotNull(dataFetcherResult); assertNotNull(dataFetcherResult);
assertEquals(expectedIteratorRequest, iteratorCaptor.getValue()); assertEquals(expectedIteratorRequest.startingSequenceNumber(), iteratorCaptor.getValue().startingSequenceNumber());
assertEquals(expectedRecordsRequest, recordsCaptor.getValue()); assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
} }
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records) private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records)
@ -360,16 +360,17 @@ public class KinesisDataFetcherTest {
assertNoAdvance(nonAdvancingResult1.get(), initialIterator); assertNoAdvance(nonAdvancingResult1.get(), initialIterator);
assertAdvanced(advancingResult1.get(), initialIterator, nextIterator1); assertAdvanced(advancingResult1.get(), initialIterator, nextIterator1);
verify(kinesisClient, times(2)).getRecords(any(GetRecordsRequest.class));
assertNoAdvance(nonAdvancingResult2.get(), nextIterator1); assertNoAdvance(nonAdvancingResult2.get(), nextIterator1);
assertAdvanced(advancingResult2.get(), nextIterator1, nextIterator2); assertAdvanced(advancingResult2.get(), nextIterator1, nextIterator2);
verify(kinesisClient, times(4)).getRecords(any(GetRecordsRequest.class));
assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2); assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2);
assertAdvanced(finalAdvancingResult.get(), nextIterator2, null); assertAdvanced(finalAdvancingResult.get(), nextIterator2, null);
verify(kinesisClient, times(6)).getRecords(any(GetRecordsRequest.class));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(initialIterator)));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator1)));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator2)));
reset(kinesisClient); reset(kinesisClient);
@ -465,8 +466,6 @@ public class KinesisDataFetcherTest {
assertTrue(kinesisDataFetcher.isShardEndReached()); assertTrue(kinesisDataFetcher.isShardEndReached());
} }
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(previousValue)));
return acceptResult; return acceptResult;
} }
@ -477,8 +476,6 @@ public class KinesisDataFetcherTest {
assertEquals(previousValue, kinesisDataFetcher.getNextIterator()); assertEquals(previousValue, kinesisDataFetcher.getNextIterator());
verify(kinesisClient).getRecords(eq(makeGetRecordsRequest(previousValue)));
return noAcceptResult; return noAcceptResult;
} }
@ -512,8 +509,8 @@ public class KinesisDataFetcherTest {
kinesisDataFetcher.initialize(seqNo, initialPositionInStream); kinesisDataFetcher.initialize(seqNo, initialPositionInStream);
assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records()); assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records());
verify(kinesisClient, times(1)).getShardIterator(eq(expectedIteratorRequest)); verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
verify(kinesisClient, times(1)).getRecords(eq(expectedRecordsRequest)); verify(kinesisClient, times(1)).getRecords(any(GetRecordsRequest.class));
} }
} }

View file

@ -0,0 +1,20 @@
package software.amazon.kinesis.utils;
import org.mockito.ArgumentMatcher;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
public class SubscribeToShardRequestMatcher extends ArgumentMatcher<SubscribeToShardRequest> {
private SubscribeToShardRequest left;
public SubscribeToShardRequestMatcher(SubscribeToShardRequest left) {
super();
this.left = left;
}
public boolean matches(Object rightObject) {
SubscribeToShardRequest right = (SubscribeToShardRequest)rightObject;
return left.shardId().equals(right.shardId()) &&
left.consumerARN().equals(right.consumerARN()) &&
left.startingPosition().equals(right.startingPosition());
}
}

View file

@ -33,7 +33,7 @@
</scm> </scm>
<properties> <properties>
<awssdk.version>2.5.10</awssdk.version> <awssdk.version>2.9.25</awssdk.version>
</properties> </properties>
<licenses> <licenses>