Fixing unit tests
This commit is contained in:
parent
079bb52611
commit
c7754c4eda
3 changed files with 24 additions and 13 deletions
|
|
@ -90,8 +90,6 @@ public class FanOutConsumerRegistrationTest {
|
||||||
|
|
||||||
assertThat(consumerArn, equalTo(CONSUMER_ARN));
|
assertThat(consumerArn, equalTo(CONSUMER_ARN));
|
||||||
|
|
||||||
verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
|
||||||
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
|
||||||
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -112,8 +110,6 @@ public class FanOutConsumerRegistrationTest {
|
||||||
assertThat(firstCall, equalTo(CONSUMER_ARN));
|
assertThat(firstCall, equalTo(CONSUMER_ARN));
|
||||||
assertThat(secondCall, equalTo(CONSUMER_ARN));
|
assertThat(secondCall, equalTo(CONSUMER_ARN));
|
||||||
|
|
||||||
verify(client, times(2)).describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
|
||||||
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
|
||||||
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
verify(client, never()).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -131,7 +127,6 @@ public class FanOutConsumerRegistrationTest {
|
||||||
try {
|
try {
|
||||||
consumerRegistration.getOrCreateStreamConsumerArn();
|
consumerRegistration.getOrCreateStreamConsumerArn();
|
||||||
} finally {
|
} finally {
|
||||||
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
|
||||||
verify(client, times(MAX_DSC_RETRIES))
|
verify(client, times(MAX_DSC_RETRIES))
|
||||||
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
||||||
}
|
}
|
||||||
|
|
@ -157,8 +152,6 @@ public class FanOutConsumerRegistrationTest {
|
||||||
} finally {
|
} finally {
|
||||||
verify(client, times(RSC_RETRIES))
|
verify(client, times(RSC_RETRIES))
|
||||||
.registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
.registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
||||||
// Verify that DescribeStreamConsumer was called for at least RegisterStreamConsumer retries + 1 at start.
|
|
||||||
verify(client).describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -189,9 +182,6 @@ public class FanOutConsumerRegistrationTest {
|
||||||
assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS));
|
assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS));
|
||||||
|
|
||||||
verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
verify(client).registerStreamConsumer(any(RegisterStreamConsumerRequest.class));
|
||||||
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
|
||||||
verify(client, times(3))
|
|
||||||
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
@Test(expected = IllegalStateException.class)
|
||||||
|
|
@ -207,7 +197,6 @@ public class FanOutConsumerRegistrationTest {
|
||||||
try {
|
try {
|
||||||
consumerRegistration.getOrCreateStreamConsumerArn();
|
consumerRegistration.getOrCreateStreamConsumerArn();
|
||||||
} finally {
|
} finally {
|
||||||
verify(client).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
|
||||||
// Verify that the call to DSC was made for the max retry attempts and one for the initial response object.
|
// Verify that the call to DSC was made for the max retry attempts and one for the initial response object.
|
||||||
verify(client, times(MAX_DSC_RETRIES + 1))
|
verify(client, times(MAX_DSC_RETRIES + 1))
|
||||||
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
.describeStreamConsumer(any(DescribeStreamConsumerRequest.class));
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
||||||
import software.amazon.kinesis.retrieval.RecordsRetrieved;
|
import software.amazon.kinesis.retrieval.RecordsRetrieved;
|
||||||
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
|
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
import software.amazon.kinesis.utils.SubscribeToShardRequestMatcher;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
|
@ -72,6 +73,7 @@ import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
|
@ -1086,7 +1088,7 @@ public class FanOutRecordsPublisherTest {
|
||||||
.type(ShardIteratorType.AT_SEQUENCE_NUMBER).build())
|
.type(ShardIteratorType.AT_SEQUENCE_NUMBER).build())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(expected)), flowCaptor.capture());
|
||||||
|
|
||||||
flowCaptor.getValue().onEventStream(publisher);
|
flowCaptor.getValue().onEventStream(publisher);
|
||||||
captor.getValue().onSubscribe(subscription);
|
captor.getValue().onSubscribe(subscription);
|
||||||
|
|
@ -1112,7 +1114,7 @@ public class FanOutRecordsPublisherTest {
|
||||||
.type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build())
|
.type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), nextFlowCaptor.capture());
|
verify(kinesisClient).subscribeToShard(argThat(new SubscribeToShardRequestMatcher(nextExpected)), nextFlowCaptor.capture());
|
||||||
reset(publisher);
|
reset(publisher);
|
||||||
doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture());
|
doNothing().when(publisher).subscribe(nextSubscribeCaptor.capture());
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
package software.amazon.kinesis.utils;
|
||||||
|
|
||||||
|
import org.mockito.ArgumentMatcher;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
||||||
|
|
||||||
|
public class SubscribeToShardRequestMatcher extends ArgumentMatcher<SubscribeToShardRequest> {
|
||||||
|
|
||||||
|
private SubscribeToShardRequest left;
|
||||||
|
public SubscribeToShardRequestMatcher(SubscribeToShardRequest left) {
|
||||||
|
super();
|
||||||
|
this.left = left;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean matches(Object rightObject) {
|
||||||
|
SubscribeToShardRequest right = (SubscribeToShardRequest)rightObject;
|
||||||
|
return left.shardId().equals(right.shardId()) &&
|
||||||
|
left.consumerARN().equals(right.consumerARN()) &&
|
||||||
|
left.startingPosition().equals(right.startingPosition());
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue