diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index bf95586a..19555cda 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -17,12 +17,14 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.time.Duration; import java.util.Date; import java.util.Optional; +import java.util.regex.Pattern; import java.util.Set; import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; @@ -233,11 +235,25 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_INITIALIZATION_ATTEMPTS = 20; + /** + * Pattern for a stream ARN. The valid format is + * {@code arn:aws:kinesis:::stream/} + * where {@code region} is the id representation of a {@link Region}. + */ + private static final Pattern STREAM_ARN_PATTERN = Pattern.compile( + "arn:aws[^:]*:kinesis:(?[-a-z0-9]+):(?[0-9]{12}):stream/(?.+)"); + @Getter private BillingMode billingMode; private String applicationName; private String tableName; private String streamName; + + /** + * Kinesis stream ARN + */ + @Getter + private Arn streamArn; private String kinesisEndpoint; private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; @@ -719,14 +735,105 @@ public class KinesisClientLibConfiguration { this.billingMode = billingMode; } + /** + * Duplicate constructor to support stream ARN's in place of stream names. + * + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamArn Kinesis stream ARN + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + * @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates + * @param billingMode The DDB Billing mode to set for lease table creation. + * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. + * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in + * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases + * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. + * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases + * (leases for shards which no longer exist in the stream) that need to be cleaned up. + */ + public KinesisClientLibConfiguration(String applicationName, + Arn streamArn, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + BillingMode billingMode, + RecordsFetcherFactory recordsFetcherFactory, + long leaseCleanupIntervalMillis, + long completedLeaseCleanupThresholdMillis, + long garbageLeaseCleanupThresholdMillis) { + this(applicationName, streamArn.getResource().getResource(), kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider, + dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, + callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, + kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, + metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode, + recordsFetcherFactory, leaseCleanupIntervalMillis, completedLeaseCleanupThresholdMillis, garbageLeaseCleanupThresholdMillis); + checkIsValidStreamArn(streamArn); + this.streamArn = streamArn; + } + // Check if value is positive, otherwise throw an exception - private void checkIsValuePositive(String key, long value) { + private static void checkIsValuePositive(String key, long value) { if (value <= 0) { throw new IllegalArgumentException("Value of " + key + " should be positive, but current value is " + value); } } + private static void checkIsValidStreamArn(Arn streamArn) { + if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches()) { + throw new IllegalArgumentException("Invalid streamArn " + streamArn); + } + } + // Check if user agent in configuration is the default agent. // If so, replace it with application name plus KINESIS_CLIENT_LIB_USER_AGENT. // If not, append KINESIS_CLIENT_LIB_USER_AGENT to the end. @@ -1056,6 +1163,25 @@ public class KinesisClientLibConfiguration { return shutdownGraceMillis; } + /** + * @param streamName Kinesis stream name + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withStreamName(String streamName) { + this.streamName = streamName; + return this; + } + + /** + * @param streamArn Kinesis stream ARN + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withStreamArn(Arn streamArn) { + checkIsValidStreamArn(streamArn); + this.streamArn = streamArn; + return this; + } + /* // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index c6b641bb..5673aeb9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; @@ -94,6 +95,12 @@ public class KinesisProxy implements IKinesisProxyExtended { private final String streamName; + /** + * Stored as a string instead of an ARN to reduce repetitive null checks when passing in the stream ARN to + * the client requests, which accepts a String stream ARN parameter. + */ + private String streamArn; + private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private final long describeStreamBackoffTimeInMillis; @@ -218,6 +225,8 @@ public class KinesisProxy implements IKinesisProxyExtended { config.getListShardsBackoffTimeInMillis(), config.getMaxListShardsRetryAttempts()); this.credentialsProvider = config.getKinesisCredentialsProvider(); + Arn arn = config.getStreamArn(); + this.streamArn = arn != null ? arn.toString() : null; } public KinesisProxy(final String streamName, @@ -253,6 +262,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials()); + getRecordsRequest.setStreamARN(streamArn); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecords); final GetRecordsResult response = client.getRecords(getRecordsRequest); @@ -270,6 +280,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials()); describeStreamRequest.setStreamName(streamName); + describeStreamRequest.setStreamARN(streamArn); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult response = null; @@ -314,6 +325,7 @@ public class KinesisProxy implements IKinesisProxyExtended { request.setRequestCredentials(credentialsProvider.getCredentials()); if (StringUtils.isEmpty(nextToken)) { request.setStreamName(streamName); + request.setStreamARN(streamArn); request.setShardFilter(shardFilter); } else { request.setNextToken(nextToken); @@ -567,6 +579,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setStreamARN(streamArn); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(iteratorType); getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber); @@ -583,6 +596,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setStreamARN(streamArn); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(iteratorType); getShardIteratorRequest.setStartingSequenceNumber(null); @@ -599,6 +613,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setStreamARN(streamArn); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP); getShardIteratorRequest.setStartingSequenceNumber(null); @@ -618,6 +633,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setRequestCredentials(credentialsProvider.getCredentials()); putRecordRequest.setStreamName(streamName); + putRecordRequest.setStreamARN(streamArn); putRecordRequest.setSequenceNumberForOrdering(exclusiveMinimumSequenceNumber); putRecordRequest.setExplicitHashKey(explicitHashKey); putRecordRequest.setPartitionKey(partitionKey); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index f2b5a460..aeb2e0c9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -20,13 +20,16 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import java.util.Arrays; import java.util.Date; +import java.util.List; import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.junit.Test; import org.mockito.Mockito; import com.amazonaws.ClientConfiguration; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; @@ -34,6 +37,7 @@ import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; @@ -46,8 +50,28 @@ public class KinesisClientLibConfigurationTest { private static final long TEST_VALUE_LONG = 1000L; private static final int TEST_VALUE_INT = 1000; private static final int PARAMETER_COUNT = 6; + private static final String ACCOUNT_ID = "123456789012"; private static final String TEST_STRING = "TestString"; + private static final Arn TEST_ARN = Arn.builder() + .withPartition("aws") + .withService("kinesis") + .withRegion("us-east-1") + .withAccountId(ACCOUNT_ID) + .withResource("stream/" + TEST_STRING) + .build(); + + /** + * Invalid steamARN due to invalid service. This is a sample used for testing. + * @see KinesisClientLibConfigurationTest#testWithInvalidStreamArnsThrowsException() for more examples + */ + private static final Arn INVALID_TEST_ARN = Arn.builder() + .withPartition("aws") + .withService("dynamodb") + .withRegion("us-east-1") + .withAccountId(ACCOUNT_ID) + .withResource("stream/" + TEST_STRING) + .build(); private static final String ALTER_STRING = "AlterString"; // We don't want any of these tests to run checkpoint validation @@ -62,32 +86,11 @@ public class KinesisClientLibConfigurationTest { new KinesisClientLibConfiguration(TEST_STRING, TEST_STRING, null, TEST_STRING); // Test constructor with all valid arguments. - config = - new KinesisClientLibConfiguration(TEST_STRING, - TEST_STRING, - TEST_STRING, - TEST_STRING, - InitialPositionInStream.LATEST, - null, - null, - null, - TEST_VALUE_LONG, - TEST_STRING, - TEST_VALUE_INT, - TEST_VALUE_LONG, - false, - TEST_VALUE_LONG, - TEST_VALUE_LONG, - true, - new ClientConfiguration(), - new ClientConfiguration(), - new ClientConfiguration(), - TEST_VALUE_LONG, - TEST_VALUE_LONG, - TEST_VALUE_INT, - skipCheckpointValidationValue, - null, - TEST_VALUE_LONG, BillingMode.PROVISIONED); + config = buildKinesisClientLibConfiguration(TEST_STRING); + + // Test constructor with streamArn with all valid arguments. + config = buildKinesisClientLibConfiguration(TEST_ARN); + Assert.assertEquals(config.getStreamName(), TEST_STRING); } @Test @@ -168,6 +171,12 @@ public class KinesisClientLibConfigurationTest { } intValues[i] = TEST_VALUE_INT; } + // Test constructor with invalid streamArn + try { + config = buildKinesisClientLibConfiguration(INVALID_TEST_ARN); + } catch(IllegalArgumentException e) { + System.out.println(e.getMessage()); + } Assert.assertTrue("KCLConfiguration should return null when using negative arguments", config == null); } @@ -370,4 +379,111 @@ public class KinesisClientLibConfigurationTest { config = config.withIgnoreUnexpectedChildShards(true); assertTrue(config.shouldIgnoreUnexpectedChildShards()); } + + @Test + public void testWithValidStreamArnsSucceed() { + List validArnList = Arrays.asList( + "arn:aws:kinesis:us-east-1:123456789012:stream/123stream-name123", + "arn:aws-china:kinesis:us-east-2:123456789012:stream/stream-name", + "arn:aws-us-gov:kinesis:us-east-2:123456789012:stream/stream-name" + ); + + KinesisClientLibConfiguration config = + new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker"); + + for (final String arn : validArnList) { + config.withStreamArn(Arn.fromString(arn)); + } + } + + @Test + public void testWithInvalidStreamArnsThrowsException() { + List invalidArnList = Arrays.asList( + "arn:abc:kinesis:us-east-1:123456789012:stream/stream-name", //invalid partition + "arn:aws:dynamnodb:us-east-1:123456789012:stream/stream-name", // Kinesis ARN, but with a non-Kinesis service + "arn:aws:kinesis::123456789012:stream/stream-name", // missing region + "arn:aws:kinesis:us-east-1::stream/stream-name", // missing account id + "arn:aws:kinesis:us-east-1:123456789:stream/stream-name", // account id not 12 digits + "arn:aws:kinesis:us-east-1:123456789abc:stream/stream-name", // 12char alphanumeric account id + "arn:aws:kinesis:us-east-1:123456789012:table/table-name", // incorrect resource type + "arn:aws:dynamodb:us-east-1:123456789012:table/myDynamoDBTable" // valid arn but not a stream + ); + + KinesisClientLibConfiguration config = + new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker"); + + for (final String arnString : invalidArnList) { + Arn arn = Arn.fromString(arnString); + try { + config.withStreamArn(arn); + fail("Arn " + arn + " should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + } + } + + private KinesisClientLibConfiguration buildKinesisClientLibConfiguration(Arn streamArn) { + return new KinesisClientLibConfiguration(TEST_STRING, + streamArn, + TEST_STRING, + TEST_STRING, + InitialPositionInStream.LATEST, + null, + null, + null, + TEST_VALUE_LONG, + TEST_STRING, + TEST_VALUE_INT, + TEST_VALUE_LONG, + false, + TEST_VALUE_LONG, + TEST_VALUE_LONG, + true, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_INT, + skipCheckpointValidationValue, + null, + TEST_VALUE_LONG, BillingMode.PROVISIONED, + new SimpleRecordsFetcherFactory(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_LONG); + } + + private KinesisClientLibConfiguration buildKinesisClientLibConfiguration(String streamName) { + return new KinesisClientLibConfiguration(TEST_STRING, + streamName, + TEST_STRING, + TEST_STRING, + InitialPositionInStream.LATEST, + null, + null, + null, + TEST_VALUE_LONG, + TEST_STRING, + TEST_VALUE_INT, + TEST_VALUE_LONG, + false, + TEST_VALUE_LONG, + TEST_VALUE_LONG, + true, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_INT, + skipCheckpointValidationValue, + null, + TEST_VALUE_LONG, BillingMode.PROVISIONED, + new SimpleRecordsFetcherFactory(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_LONG); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java index 76671176..a603db1d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import lombok.Builder; import org.apache.commons.lang3.StringUtils; import org.hamcrest.Description; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -59,6 +60,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.AmazonServiceException; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild; @@ -77,11 +79,17 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamDescription; import com.amazonaws.services.kinesis.model.StreamStatus; -import lombok.AllArgsConstructor; - @RunWith(MockitoJUnitRunner.class) public class KinesisProxyTest { private static final String TEST_STRING = "TestString"; + private static final String ACCOUNT_ID = "123456789012"; + private static final Arn TEST_ARN = Arn.builder() + .withPartition("aws") + .withService("kinesis") + .withRegion("us-east-1") + .withAccountId(ACCOUNT_ID) + .withResource("stream/" + TEST_STRING) + .build(); private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L; private static final long LIST_SHARDS_BACKOFF_TIME = 10L; private static final int DESCRIBE_STREAM_RETRY_TIMES = 3; @@ -132,6 +140,7 @@ public class KinesisProxyTest { public void setUpTest() { // Set up kinesis ddbProxy when(config.getStreamName()).thenReturn(TEST_STRING); + when(config.getStreamArn()).thenReturn(TEST_ARN); when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME); when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES); when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider); @@ -163,7 +172,8 @@ public class KinesisProxyTest { // Second call describeStream returning response with rest shards. DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + IsRequestWithStartShardId requestMatcher = IsRequestWithStartShardId.builder().streamName(TEST_STRING).build(); + doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(requestMatcher)); doReturn(responseFinal).when(mockDDBStreamClient) .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); @@ -310,7 +320,8 @@ public class KinesisProxyTest { public void testGetShardListWithDDBChildClient() { DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + IsRequestWithStartShardId requestMatcher = IsRequestWithStartShardId.builder().streamName(TEST_STRING).build(); + doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(requestMatcher)); doReturn(responseFinal).when(mockDDBChildClient) .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); @@ -498,37 +509,61 @@ public class KinesisProxyTest { return response; } - private IsRequestWithStartShardId describeWithoutShardId() { - return describeWithShardId(null); - } - private IsRequestWithStartShardId describeWithShardId(String shardId) { - return new IsRequestWithStartShardId(shardId); + return IsRequestWithStartShardId.builder() + .streamName(TEST_STRING) + .streamArn(TEST_ARN) + .shardId(shardId) + .build(); } + @Builder private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher { + private final String streamName; + private final Arn streamArn; private final String shardId; - public IsRequestWithStartShardId(String shardId) { - this.shardId = shardId; - } - @Override protected boolean matchesSafely(DescribeStreamRequest item, Description mismatchDescription) { + boolean matches = true; + if (streamName == null) { + if (item.getStreamName() != null) { + mismatchDescription.appendText("Expected streamName of null, but was ") + .appendValue(item.getStreamName()); + matches = false; + } + } else if (!streamName.equals(item.getStreamName())) { + mismatchDescription.appendValue(streamName).appendText(" doesn't match expected ") + .appendValue(item.getStreamName()); + matches = false; + } + + if (streamArn == null) { + if (item.getStreamARN() != null) { + mismatchDescription.appendText("Expected streamArn of null, but was ") + .appendValue(item.getStreamARN()); + matches = false; + } + } else if (!streamArn.equals(Arn.fromString(item.getStreamARN()))) { + mismatchDescription.appendValue(streamArn).appendText(" doesn't match expected ") + .appendValue(item.getStreamARN()); + matches = false; + } + if (shardId == null) { if (item.getExclusiveStartShardId() != null) { mismatchDescription.appendText("Expected starting shard id of null, but was ") .appendValue(item.getExclusiveStartShardId()); - return false; + matches = false; } } else if (!shardId.equals(item.getExclusiveStartShardId())) { mismatchDescription.appendValue(shardId).appendText(" doesn't match expected ") .appendValue(item.getExclusiveStartShardId()); - return false; + matches = false; } - return true; + return matches; } @Override @@ -557,49 +592,87 @@ public class KinesisProxyTest { } private static ListShardsRequestMatcher initialListShardsRequestMatcher() { - return new ListShardsRequestMatcher(null, null); + return ListShardsRequestMatcher.builder() + .streamName(TEST_STRING) + .streamArn(TEST_ARN) + .build(); } private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) { - return new ListShardsRequestMatcher(null, nextToken); + return ListShardsRequestMatcher.builder() + .nextToken(nextToken) + .build(); } - @AllArgsConstructor + @Builder private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher { + private final String streamName; + private final Arn streamArn; private final String shardId; private final String nextToken; @Override protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) { + boolean matches = true; + if (streamName == null) { + if (StringUtils.isNotEmpty(listShardsRequest.getStreamName())) { + description.appendText("Expected streamName to be null, but was ") + .appendValue(listShardsRequest.getStreamName()); + matches = false; + } + } else { + if (!streamName.equals(listShardsRequest.getStreamName())) { + description.appendText("Expected streamName: ").appendValue(streamName) + .appendText(" doesn't match actual streamName: ") + .appendValue(listShardsRequest.getStreamName()); + matches = false; + } + } + + if (streamArn == null) { + if (StringUtils.isNotEmpty(listShardsRequest.getStreamARN())) { + description.appendText("Expected streamArn to be null, but was ") + .appendValue(listShardsRequest.getStreamARN()); + matches = false; + } + } else { + if (!streamArn.equals(Arn.fromString(listShardsRequest.getStreamARN()))) { + description.appendText("Expected streamArn: ").appendValue(streamArn) + .appendText(" doesn't match actual streamArn: ") + .appendValue(listShardsRequest.getStreamARN()); + matches = false; + } + } + if (shardId == null) { if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { description.appendText("Expected ExclusiveStartShardId to be null, but was ") .appendValue(listShardsRequest.getExclusiveStartShardId()); - return false; + matches = false; } } else { if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) { description.appendText("Expected shardId: ").appendValue(shardId) .appendText(" doesn't match actual shardId: ") .appendValue(listShardsRequest.getExclusiveStartShardId()); - return false; + matches = false; } } if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) { if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { - return false; + matches = false; } if (!listShardsRequest.getNextToken().equals(nextToken)) { description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken()) .appendText(" when it was supposed to be null."); - return false; + matches = false; } } else { return nextToken == null; } - return true; + return matches; } @Override