diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index bf95586a..40a06efe 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -23,6 +23,7 @@ import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; @@ -238,6 +239,7 @@ public class KinesisClientLibConfiguration { private String applicationName; private String tableName; private String streamName; + private Arn streamARN; private String kinesisEndpoint; private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; @@ -719,6 +721,138 @@ public class KinesisClientLibConfiguration { this.billingMode = billingMode; } + /** + * Duplicate constructor to support stream ARN's in place of stream names. + * + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamARN Kinesis stream ARN + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + * @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates + * @param billingMode The DDB Billing mode to set for lease table creation. + * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. + * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in + * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases + * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. + * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases + * (leases for shards which no longer exist in the stream) that need to be cleaned up. + */ + public KinesisClientLibConfiguration(String applicationName, + Arn streamARN, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + BillingMode billingMode, + RecordsFetcherFactory recordsFetcherFactory, + long leaseCleanupIntervalMillis, + long completedLeaseCleanupThresholdMillis, + long garbageLeaseCleanupThresholdMillis) { + + // Check following values are greater than zero + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); + checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); + checkIsValuePositive("MaxRecords", (long) maxRecords); + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); + checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); + checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + this.applicationName = applicationName; + this.tableName = applicationName; + this.streamName = streamARN.getResource().getResource(); + this.streamARN = streamARN; + this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; + this.initialPositionInStream = initialPositionInStream; + this.kinesisCredentialsProvider = kinesisCredentialsProvider; + this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; + this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; + this.failoverTimeMillis = failoverTimeMillis; + this.maxRecords = maxRecords; + this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; + this.workerIdentifier = workerId; + this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); + this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); + this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.metricsBufferTimeMillis = metricsBufferTimeMillis; + this.metricsMaxQueueSize = metricsMaxQueueSize; + this.metricsLevel = DEFAULT_METRICS_LEVEL; + this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; + this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; + this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition( + initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; + this.leasesRecoveryAuditorExecutionFrequencyMillis = LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS; + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = recordsFetcherFactory; + this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis; + this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis; + this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis; + this.shutdownGraceMillis = shutdownGraceMillis; + this.billingMode = billingMode; + } + // Check if value is positive, otherwise throw an exception private void checkIsValuePositive(String key, long value) { if (value <= 0) { @@ -833,6 +967,13 @@ public class KinesisClientLibConfiguration { return streamName; } + /** + * @return stream ARN + */ + public Arn getStreamARN() { + return streamARN; + } + /** * @return Kinesis endpoint */ @@ -1056,6 +1197,24 @@ public class KinesisClientLibConfiguration { return shutdownGraceMillis; } + /** + * @param streamName Kinesis stream name + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withStreamName(String streamName) { + this.streamName = streamName; + return this; + } + + /** + * @param streamARN Kinesis stream ARN + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withStreamARN(Arn streamARN) { + this.streamARN = streamARN; + return this; + } + /* // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index c6b641bb..312eae56 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; @@ -93,6 +94,7 @@ public class KinesisProxy implements IKinesisProxyExtended { private AtomicInteger cacheMisses = new AtomicInteger(0); private final String streamName; + private Arn streamARN; private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; @@ -218,6 +220,7 @@ public class KinesisProxy implements IKinesisProxyExtended { config.getListShardsBackoffTimeInMillis(), config.getMaxListShardsRetryAttempts()); this.credentialsProvider = config.getKinesisCredentialsProvider(); + this.streamARN = config.getStreamARN(); } public KinesisProxy(final String streamName, @@ -270,6 +273,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials()); describeStreamRequest.setStreamName(streamName); + describeStreamRequest.setStreamARN(streamARN != null ? streamARN.toString() : null); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult response = null; @@ -314,6 +318,7 @@ public class KinesisProxy implements IKinesisProxyExtended { request.setRequestCredentials(credentialsProvider.getCredentials()); if (StringUtils.isEmpty(nextToken)) { request.setStreamName(streamName); + request.setStreamARN(streamARN != null ? streamARN.toString() : null); request.setShardFilter(shardFilter); } else { request.setNextToken(nextToken); @@ -567,6 +572,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setStreamARN(streamARN != null ? streamARN.toString() : null); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(iteratorType); getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber); @@ -583,6 +589,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setStreamARN(streamARN != null ? streamARN.toString() : null); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(iteratorType); getShardIteratorRequest.setStartingSequenceNumber(null); @@ -599,6 +606,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setStreamARN(streamARN != null ? streamARN.toString() : null); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP); getShardIteratorRequest.setStartingSequenceNumber(null); @@ -618,6 +626,7 @@ public class KinesisProxy implements IKinesisProxyExtended { final PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setRequestCredentials(credentialsProvider.getCredentials()); putRecordRequest.setStreamName(streamName); + putRecordRequest.setStreamARN(streamARN != null ? streamARN.toString() : null); putRecordRequest.setSequenceNumberForOrdering(exclusiveMinimumSequenceNumber); putRecordRequest.setExplicitHashKey(explicitHashKey); putRecordRequest.setPartitionKey(partitionKey); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index f2b5a460..65f80b1b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import org.mockito.Mockito; import com.amazonaws.ClientConfiguration; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; @@ -34,6 +35,7 @@ import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; @@ -46,8 +48,16 @@ public class KinesisClientLibConfigurationTest { private static final long TEST_VALUE_LONG = 1000L; private static final int TEST_VALUE_INT = 1000; private static final int PARAMETER_COUNT = 6; + private static final String ACCOUNT_ID = "123456789012"; private static final String TEST_STRING = "TestString"; + private static final Arn TEST_ARN = Arn.builder() + .withPartition("aws") + .withService("kinesis") + .withRegion("us-east-1") + .withAccountId(ACCOUNT_ID) + .withResource("stream/" + TEST_STRING) + .build(); private static final String ALTER_STRING = "AlterString"; // We don't want any of these tests to run checkpoint validation @@ -87,7 +97,45 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - TEST_VALUE_LONG, BillingMode.PROVISIONED); + TEST_VALUE_LONG, + BillingMode.PROVISIONED, + new SimpleRecordsFetcherFactory(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_LONG); + + // Test constructor with streamARN with all valid arguments. + config = + new KinesisClientLibConfiguration(TEST_STRING, + TEST_ARN, + TEST_STRING, + TEST_STRING, + InitialPositionInStream.LATEST, + null, + null, + null, + TEST_VALUE_LONG, + TEST_STRING, + TEST_VALUE_INT, + TEST_VALUE_LONG, + false, + TEST_VALUE_LONG, + TEST_VALUE_LONG, + true, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_INT, + skipCheckpointValidationValue, + null, + TEST_VALUE_LONG, BillingMode.PROVISIONED, + new SimpleRecordsFetcherFactory(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + TEST_VALUE_LONG); + Assert.assertEquals(config.getStreamName(), TEST_STRING); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java index 76671176..25fc82b8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -59,6 +59,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.AmazonServiceException; +import com.amazonaws.arn.Arn; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild; @@ -82,6 +83,14 @@ import lombok.AllArgsConstructor; @RunWith(MockitoJUnitRunner.class) public class KinesisProxyTest { private static final String TEST_STRING = "TestString"; + private static final String ACCOUNT_ID = "123456789012"; + private static final Arn TEST_ARN = Arn.builder() + .withPartition("aws") + .withService("kinesis") + .withRegion("us-east-1") + .withAccountId(ACCOUNT_ID) + .withResource("stream/" + TEST_STRING) + .build(); private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L; private static final long LIST_SHARDS_BACKOFF_TIME = 10L; private static final int DESCRIBE_STREAM_RETRY_TIMES = 3; @@ -118,6 +127,7 @@ public class KinesisProxyTest { private KinesisProxy proxy; private KinesisProxy ddbProxy; + private KinesisProxy ddbProxyWithStreamARN; private KinesisProxy ddbChildProxy; // Test shards for verifying. @@ -132,6 +142,7 @@ public class KinesisProxyTest { public void setUpTest() { // Set up kinesis ddbProxy when(config.getStreamName()).thenReturn(TEST_STRING); + when(config.getStreamARN()).thenReturn(TEST_ARN); when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME); when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES); when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider); @@ -163,7 +174,7 @@ public class KinesisProxyTest { // Second call describeStream returning response with rest shards. DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(TEST_STRING, null, null))); doReturn(responseFinal).when(mockDDBStreamClient) .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); @@ -310,7 +321,7 @@ public class KinesisProxyTest { public void testGetShardListWithDDBChildClient() { DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(TEST_STRING, null, null))); doReturn(responseFinal).when(mockDDBChildClient) .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); @@ -499,23 +510,46 @@ public class KinesisProxyTest { } private IsRequestWithStartShardId describeWithoutShardId() { - return describeWithShardId(null); + return new IsRequestWithStartShardId(TEST_STRING, TEST_ARN, null); } private IsRequestWithStartShardId describeWithShardId(String shardId) { - return new IsRequestWithStartShardId(shardId); + return new IsRequestWithStartShardId(TEST_STRING, TEST_ARN, shardId); } + @AllArgsConstructor private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher { + private final String streamName; + private final Arn streamARN; private final String shardId; - public IsRequestWithStartShardId(String shardId) { - this.shardId = shardId; - } - @Override protected boolean matchesSafely(DescribeStreamRequest item, Description mismatchDescription) { + if (streamName == null) { + if (item.getStreamName() != null) { + mismatchDescription.appendText("Expected streamName of null, but was ") + .appendValue(item.getStreamName()); + return false; + } + } else if (!streamName.equals(item.getStreamName())) { + mismatchDescription.appendValue(streamName).appendText(" doesn't match expected ") + .appendValue(item.getStreamName()); + return false; + } + + if (streamARN == null) { + if (item.getStreamARN() != null) { + mismatchDescription.appendText("Expected streamARN of null, but was ") + .appendValue(item.getStreamARN()); + return false; + } + } else if (!streamARN.equals(Arn.fromString(item.getStreamARN()))) { + mismatchDescription.appendValue(streamARN).appendText(" doesn't match expected ") + .appendValue(item.getStreamARN()); + return false; + } + if (shardId == null) { if (item.getExclusiveStartShardId() != null) { mismatchDescription.appendText("Expected starting shard id of null, but was ") @@ -557,20 +591,52 @@ public class KinesisProxyTest { } private static ListShardsRequestMatcher initialListShardsRequestMatcher() { - return new ListShardsRequestMatcher(null, null); + return new ListShardsRequestMatcher(TEST_STRING, TEST_ARN, null, null); } private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) { - return new ListShardsRequestMatcher(null, nextToken); + return new ListShardsRequestMatcher(null, null, null, nextToken); } @AllArgsConstructor private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher { + private final String streamName; + private final Arn streamARN; private final String shardId; private final String nextToken; @Override protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) { + if (streamName == null) { + if (StringUtils.isNotEmpty(listShardsRequest.getStreamName())) { + description.appendText("Expected streamName to be null, but was ") + .appendValue(listShardsRequest.getStreamName()); + return false; + } + } else { + if (!streamName.equals(listShardsRequest.getStreamName())) { + description.appendText("Expected streamName: ").appendValue(streamName) + .appendText(" doesn't match actual streamName: ") + .appendValue(listShardsRequest.getStreamName()); + return false; + } + } + + if (streamARN == null) { + if (StringUtils.isNotEmpty(listShardsRequest.getStreamARN())) { + description.appendText("Expected streamARN to be null, but was ") + .appendValue(listShardsRequest.getStreamARN()); + return false; + } + } else { + if (!streamARN.equals(Arn.fromString(listShardsRequest.getStreamARN()))) { + description.appendText("Expected streamARN: ").appendValue(streamARN) + .appendText(" doesn't match actual streamARN: ") + .appendValue(listShardsRequest.getStreamARN()); + return false; + } + } + if (shardId == null) { if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { description.appendText("Expected ExclusiveStartShardId to be null, but was ")