Updating sdk version and fixing unit tests

This commit is contained in:
Chunxue Yang 2019-10-25 15:08:20 -07:00
parent 6dc25fbc99
commit 079bb52611
5 changed files with 37 additions and 42 deletions

View file

@ -95,7 +95,7 @@ public class KinesisShardDetectorTest {
final List<Shard> shards = shardDetector.listShards();
assertThat(shards, equalTo(expectedShards));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
verify(client).listShards(any(ListShardsRequest.class));
}
@Test(expected = IllegalStateException.class)
@ -108,7 +108,7 @@ public class KinesisShardDetectorTest {
shardDetector.listShards();
} finally {
verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS))
.listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
.listShards(any(ListShardsRequest.class));
}
}
@ -123,7 +123,7 @@ public class KinesisShardDetectorTest {
final List<Shard> shards = shardDetector.listShards();
assertThat(shards, nullValue());
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
verify(client).listShards(any(ListShardsRequest.class));
}
@ -139,7 +139,7 @@ public class KinesisShardDetectorTest {
shardDetector.listShards();
} finally {
verify(client, times(MAX_LIST_SHARDS_RETRY_ATTEMPTS))
.listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
.listShards(any(ListShardsRequest.class));
}
}
@ -154,7 +154,7 @@ public class KinesisShardDetectorTest {
try {
shardDetector.listShards();
} finally {
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
verify(client).listShards(any(ListShardsRequest.class));
}
}
@ -194,7 +194,7 @@ public class KinesisShardDetectorTest {
final Shard shard = shardDetector.shard(shardId);
assertThat(shard, equalTo(Shard.builder().shardId(shardId).build()));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
verify(client).listShards(any(ListShardsRequest.class));
}
@Test
@ -231,7 +231,7 @@ public class KinesisShardDetectorTest {
});
assertThat(responses.get(MAX_CACHE_MISSES_BEFORE_RELOAD), equalTo(Shard.builder().shardId(shardId).build()));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
verify(client).listShards(any(ListShardsRequest.class));
}
@Test
@ -249,7 +249,7 @@ public class KinesisShardDetectorTest {
responses.forEach(response -> assertThat(response, nullValue()));
assertThat(shardDetector.cacheMisses().get(), equalTo(0));
verify(client).listShards(eq(ListShardsRequest.builder().streamName(STREAM_NAME).build()));
verify(client).listShards(any(ListShardsRequest.class));
}
private List<Shard> createShardList() {

View file

@ -90,8 +90,8 @@ public class FanOutConsumerRegistrationTest {
assertThat(consumerArn, equalTo(CONSUMER_ARN));
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
}
@ -112,8 +112,8 @@ public class FanOutConsumerRegistrationTest {
assertThat(firstCall, equalTo(CONSUMER_ARN));
assertThat(secondCall, equalTo(CONSUMER_ARN));
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
}
@ -131,9 +131,9 @@ public class FanOutConsumerRegistrationTest {
try {
consumerRegistration.getOrCreateStreamConsumerArn();
} finally {
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
verify(client, times(MAX_DSC_RETRIES))
.describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
}
}
@ -156,9 +156,9 @@ public class FanOutConsumerRegistrationTest {
consumerRegistration.getOrCreateStreamConsumerArn();
} finally {
verify(client, times(RSC_RETRIES))
.registerStreamConsumer(eq(createRegisterStreamConsumerRequest()));
.registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
// Verify that DescribeStreamConsumer was called for at least RegisterStreamConsumer retries + 1 at start.
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client).describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
}
}
@ -188,11 +188,10 @@ public class FanOutConsumerRegistrationTest {
assertThat(consumerArn, equalTo(CONSUMER_ARN));
assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS));
verify(client).registerStreamConsumer(eq(createRegisterStreamConsumerRequest()));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client, times(2))
.describeStreamConsumer(eq(createDescribeStreamConsumerRequest(CONSUMER_ARN)));
verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
verify(client, times(3))
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
}
@Test(expected = IllegalStateException.class)
@ -208,11 +207,10 @@ public class FanOutConsumerRegistrationTest {
try {
consumerRegistration.getOrCreateStreamConsumerArn();
} finally {
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
// Verify that the call to DSC was made for the max retry attempts and one for the initial response object.
verify(client).describeStreamConsumer(eq(createDescribeStreamConsumerRequest(null)));
verify(client, times(MAX_DSC_RETRIES))
.describeStreamConsumer(eq(createDescribeStreamConsumerRequest(CONSUMER_ARN)));
verify(client, times(MAX_DSC_RETRIES + 1))
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
}

View file

@ -1086,7 +1086,7 @@ public class FanOutRecordsPublisherTest {
.type(ShardIteratorType.AT_SEQUENCE_NUMBER).build())
.build();
verify(kinesisClient).subscribeToShard(eq(expected), flowCaptor.capture());
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
@ -1112,7 +1112,7 @@ public class FanOutRecordsPublisherTest {
.type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build())
.build();
verify(kinesisClient).subscribeToShard(eq(nextExpected), nextFlowCaptor.capture());
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), nextFlowCaptor.capture());
reset(publisher);
doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture());

View file

@ -232,7 +232,7 @@ public class KinesisDataFetcherTest {
assertEquals(3, requests.size());
requests.forEach(request -> {
final ShardIteratorType type = ShardIteratorType.fromValue(request.shardIteratorTypeAsString());
assertEquals(requestsMap.get(type), request);
assertEquals(requestsMap.get(type).startingSequenceNumber(), request.startingSequenceNumber());
requestsMap.remove(type);
});
assertEquals(0, requestsMap.size());
@ -273,8 +273,8 @@ public class KinesisDataFetcherTest {
} finally {
// Test shard has reached the end
assertTrue("Shard should reach the end", kinesisDataFetcher.isShardEndReached());
assertEquals(expectedIteratorRequest, iteratorCaptor.getValue());
assertEquals(expectedRecordsRequest, recordsCaptor.getValue());
assertEquals(expectedIteratorRequest.startingSequenceNumber(), iteratorCaptor.getValue().startingSequenceNumber());
assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
}
}
@ -324,8 +324,8 @@ public class KinesisDataFetcherTest {
DataFetcherResult dataFetcherResult = kinesisDataFetcher.getRecords();
assertNotNull(dataFetcherResult);
assertEquals(expectedIteratorRequest, iteratorCaptor.getValue());
assertEquals(expectedRecordsRequest, recordsCaptor.getValue());
assertEquals(expectedIteratorRequest.startingSequenceNumber(), iteratorCaptor.getValue().startingSequenceNumber());
assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
}
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records)
@ -360,16 +360,17 @@ public class KinesisDataFetcherTest {
assertNoAdvance(nonAdvancingResult1.get(), initialIterator);
assertAdvanced(advancingResult1.get(), initialIterator, nextIterator1);
verify(kinesisClient, times(2)).getRecords(any(GetRecordsRequest.class));
assertNoAdvance(nonAdvancingResult2.get(), nextIterator1);
assertAdvanced(advancingResult2.get(), nextIterator1, nextIterator2);
verify(kinesisClient, times(4)).getRecords(any(GetRecordsRequest.class));
assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2);
assertAdvanced(finalAdvancingResult.get(), nextIterator2, null);
verify(kinesisClient, times(6)).getRecords(any(GetRecordsRequest.class));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(initialIterator)));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator1)));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator2)));
reset(kinesisClient);
@ -465,8 +466,6 @@ public class KinesisDataFetcherTest {
assertTrue(kinesisDataFetcher.isShardEndReached());
}
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(previousValue)));
return acceptResult;
}
@ -477,8 +476,6 @@ public class KinesisDataFetcherTest {
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());
verify(kinesisClient).getRecords(eq(makeGetRecordsRequest(previousValue)));
return noAcceptResult;
}
@ -512,8 +509,8 @@ public class KinesisDataFetcherTest {
kinesisDataFetcher.initialize(seqNo, initialPositionInStream);
assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records());
verify(kinesisClient, times(1)).getShardIterator(eq(expectedIteratorRequest));
verify(kinesisClient, times(1)).getRecords(eq(expectedRecordsRequest));
verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
verify(kinesisClient, times(1)).getRecords(any(GetRecordsRequest.class));
}
}

View file

@ -33,7 +33,7 @@
</scm>
<properties>
<awssdk.version>2.5.10</awssdk.version>
<awssdk.version>2.9.25</awssdk.version>
</properties>
<licenses>