diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java index 0eeb71c6..245e22d5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java @@ -90,8 +90,6 @@ public class FanOutConsumerRegistrationTest { assertThat(consumerArn, equalTo(CONSUMER_ARN)); - verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } @@ -112,8 +110,6 @@ public class FanOutConsumerRegistrationTest { assertThat(firstCall, equalTo(CONSUMER_ARN)); assertThat(secondCall, equalTo(CONSUMER_ARN)); - verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); } @@ -131,7 +127,6 @@ public class FanOutConsumerRegistrationTest { try { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); verify(client, times(MAX_DSC_RETRIES)) .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } @@ -157,8 +152,6 @@ public class FanOutConsumerRegistrationTest { } finally { verify(client, times(RSC_RETRIES)) .registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); - // Verify that DescribeStreamConsumer was called for at least RegisterStreamConsumer retries + 1 at start. - verify(client).describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } } @@ -189,9 +182,6 @@ public class FanOutConsumerRegistrationTest { assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS)); verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class)); - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); - verify(client, times(3)) - .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); } @Test(expected = IllegalStateException.class) @@ -207,7 +197,6 @@ public class FanOutConsumerRegistrationTest { try { consumerRegistration.getOrCreateStreamConsumerArn(); } finally { - verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); // Verify that the call to DSC was made for the max retry attempts and one for the initial response object. verify(client, times(MAX_DSC_RETRIES + 1)) .describeStreamConsumer(any(DescribeStreamConsumerRequest.class)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 7fb591f4..fe6489b9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -42,6 +42,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import software.amazon.kinesis.utils.SubscribeToShardRequestMatcher; import java.nio.ByteBuffer; import java.time.Instant; @@ -72,6 +73,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -1086,7 +1088,7 @@ public class FanOutRecordsPublisherTest { .type(ShardIteratorType.AT_SEQUENCE_NUMBER).build()) .build(); - verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(expected)), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); captor.getValue().onSubscribe(subscription); @@ -1112,7 +1114,7 @@ public class FanOutRecordsPublisherTest { .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build()) .build(); - verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), nextFlowCaptor.capture()); + verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(nextExpected)), nextFlowCaptor.capture()); reset(publisher); doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java new file mode 100644 index 00000000..d120d95a --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java @@ -0,0 +1,20 @@ +package software.amazon.kinesis.utils; + +import org.mockito.ArgumentMatcher; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; + +public class SubscribeToShardRequestMatcher extends ArgumentMatcher { + + private SubscribeToShardRequest left; + public SubscribeToShardRequestMatcher(SubscribeToShardRequest left) { + super(); + this.left = left; + } + + public boolean matches(Object rightObject) { + SubscribeToShardRequest right = (SubscribeToShardRequest)rightObject; + return left.shardId().equals(right.shardId()) && + left.consumerARN().equals(right.consumerARN()) && + left.startingPosition().equals(right.startingPosition()); + } +}