From 4cbb6d851e203c36604e6cdde07ca63409d8e267 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 18 Aug 2016 08:41:24 -0700 Subject: [PATCH] Prevent Spurious Info Message on getIterator Clean up check for ShardIteratorType to prevent emitting a spurious message for every call to GetIterator Add missing KinesisProxyTest, and add 3 new tests for getIterator. --- .../clientlibrary/proxies/KinesisProxy.java | 15 +- .../proxies/KinesisProxyTest.java | 167 ++++++++++++++++++ 2 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index ad929c21..de330dc9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Date; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -51,6 +52,9 @@ public class KinesisProxy implements IKinesisProxyExtended { private static final Log LOG = LogFactory.getLog(KinesisProxy.class); + private static final EnumSet EXPECTED_ITERATOR_TYPES = EnumSet + .of(ShardIteratorType.AT_SEQUENCE_NUMBER, ShardIteratorType.AFTER_SEQUENCE_NUMBER); + private static String defaultServiceName = "kinesis"; private static String defaultRegionId = "us-east-1";; @@ -265,8 +269,15 @@ public class KinesisProxy implements IKinesisProxyExtended { */ @Override public String getIterator(String shardId, String iteratorType, String sequenceNumber) { - if (!iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()) || !iteratorType.equals( - ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString())) { + ShardIteratorType shardIteratorType; + try { + shardIteratorType = ShardIteratorType.fromValue(iteratorType); + } catch (IllegalArgumentException iae) { + LOG.error("Caught illegal argument exception while parsing iteratorType: " + iteratorType, iae); + shardIteratorType = null; + } + + if (!EXPECTED_ITERATOR_TYPES.contains(shardIteratorType)) { LOG.info("This method should only be used for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER " + "ShardIteratorTypes. For methods to use with other ShardIteratorTypes, see IKinesisProxy.java"); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java new file mode 100644 index 00000000..2c1107b2 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -0,0 +1,167 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.amazonaws.AmazonServiceException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.services.kinesis.model.StreamDescription; +import com.amazonaws.services.kinesis.model.StreamStatus; + +import junit.framework.Assert; + +@RunWith(MockitoJUnitRunner.class) +public class KinesisProxyTest { + private static final String TEST_STRING = "TestString"; + private static final long BACKOFF_TIME = 10L; + private static final int RETRY_TIMES = 50; + + @Mock + private AmazonKinesisClient mockClient; + @Mock + private AWSCredentialsProvider mockCredentialsProvider; + @Mock + private GetShardIteratorResult shardIteratorResult; + private KinesisProxy proxy; + + // Test shards for verifying. + private Set shardIdSet; + private List shards; + + @Before + public void setUpTest() { + // Set up kinesis proxy + proxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockClient, BACKOFF_TIME, RETRY_TIMES); + when(mockCredentialsProvider.getCredentials()).thenReturn(null); + // Set up test shards + shardIdSet = new HashSet<>(); + shards = new ArrayList<>(); + String[] shardIds = new String[] { "shard-1", "shard-2", "shard-3", "shard-4" }; + for (String shardId : shardIds) { + Shard shard = new Shard(); + shard.setShardId(shardId); + shards.add(shard); + shardIdSet.add(shardId); + } + } + + @Test + public void testGetShardListWithMoreDataAvailable() { + // Set up mock : + // First call describeStream returning response with first two shards in the list; + // Second call describeStream returning response with rest shards. + DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); + DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); + doReturn(responseWithMoreData).when(mockClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + doReturn(responseFinal).when(mockClient) + .describeStream(argThat(new IsRequestWithStartShardId(shards.get(1).getShardId()))); + + Set resultShardIdSets = proxy.getAllShardIds(); + Assert.assertTrue("Result set should equal to Test set", shardIdSet.equals(resultShardIdSets)); + } + + @Test + public void testGetShardListWithLimitExceededException() { + // Set up mock : + // First call describeStream throwing LimitExceededException; + // Second call describeStream returning shards list. + DescribeStreamResult response = createGetStreamInfoResponse(shards, false); + doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockClient) + .describeStream(argThat(new IsRequestWithStartShardId(null))); + + Set resultShardIdSet = proxy.getAllShardIds(); + Assert.assertTrue("Result set should equal to Test set", shardIdSet.equals(resultShardIdSet)); + } + + @Test + public void testValidShardIteratorType() { + when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); + String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(); + proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); + + verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) + .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); + } + + @Test + public void testInvalidShardIteratorIsntChanged() { + when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); + String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString(); + proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); + + verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) + .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); + } + + @Test(expected = AmazonServiceException.class) + public void testNullShardIteratorType() { + when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null")); + String expectedShardIteratorType = null; + proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); + + verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) + .and(hasProperty("shardIteratorType", nullValue(String.class))))); + } + + private DescribeStreamResult createGetStreamInfoResponse(List shards1, boolean isHasMoreShards) { + // Create stream description + StreamDescription description = new StreamDescription(); + description.setHasMoreShards(isHasMoreShards); + description.setShards(shards1); + description.setStreamStatus(StreamStatus.ACTIVE); + + // Create Describe Stream Result + DescribeStreamResult response = new DescribeStreamResult(); + response.setStreamDescription(description); + return response; + } + + // Matcher for testing describe stream request with specific start shard ID. + private static class IsRequestWithStartShardId extends ArgumentMatcher { + private final String shardId; + + public IsRequestWithStartShardId(String shardId) { + this.shardId = shardId; + } + + @Override + public boolean matches(Object request) { + String startShardId = ((DescribeStreamRequest) request).getExclusiveStartShardId(); + // If startShardId equals to null, shardId should also be null. + if (startShardId == null) { + return shardId == null; + } + return startShardId.equals(shardId); + } + } + +}