Add support for Stream ARNs (#1108)

Add support for referencing streams by streamARN
This commit is contained in:
lucienlu-aws 2023-06-07 13:55:51 -07:00 committed by GitHub
parent fe1a34f6e5
commit db30ac956b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 382 additions and 51 deletions

View file

@ -17,12 +17,14 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.Set;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.apache.commons.lang3.Validate;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.arn.Arn;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
@ -233,11 +235,25 @@ public class KinesisClientLibConfiguration {
*/
public static final int DEFAULT_MAX_INITIALIZATION_ATTEMPTS = 20;
/**
* Pattern for a stream ARN. The valid format is
* {@code arn:aws:kinesis:<region>:<accountId>:stream/<streamName>}
* where {@code region} is the id representation of a {@link Region}.
*/
private static final Pattern STREAM_ARN_PATTERN = Pattern.compile(
"arn:aws[^:]*:kinesis:(?<region>[-a-z0-9]+):(?<accountId>[0-9]{12}):stream/(?<streamName>.+)");
@Getter
private BillingMode billingMode;
private String applicationName;
private String tableName;
private String streamName;
/**
* Kinesis stream ARN
*/
@Getter
private Arn streamArn;
private String kinesisEndpoint;
private String dynamoDBEndpoint;
private InitialPositionInStream initialPositionInStream;
@ -719,14 +735,105 @@ public class KinesisClientLibConfiguration {
this.billingMode = billingMode;
}
/**
* Duplicate constructor to support stream ARN's in place of stream names.
*
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamArn Kinesis stream ARN
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
* @param workerId Used to distinguish different workers/processes of a Kinesis application
* @param maxRecords Max records to read per Kinesis getRecords() call
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
* @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates
* @param billingMode The DDB Billing mode to set for lease table creation.
* @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard.
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases
* (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up.
* @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases
* (leases for shards which no longer exist in the stream) that need to be cleaned up.
*/
public KinesisClientLibConfiguration(String applicationName,
Arn streamArn,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis,
String workerId,
int maxRecords,
long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
long shutdownGraceMillis,
BillingMode billingMode,
RecordsFetcherFactory recordsFetcherFactory,
long leaseCleanupIntervalMillis,
long completedLeaseCleanupThresholdMillis,
long garbageLeaseCleanupThresholdMillis) {
this(applicationName, streamArn.getResource().getResource(), kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis,
metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode,
recordsFetcherFactory, leaseCleanupIntervalMillis, completedLeaseCleanupThresholdMillis, garbageLeaseCleanupThresholdMillis);
checkIsValidStreamArn(streamArn);
this.streamArn = streamArn;
}
// Check if value is positive, otherwise throw an exception
private void checkIsValuePositive(String key, long value) {
private static void checkIsValuePositive(String key, long value) {
if (value <= 0) {
throw new IllegalArgumentException("Value of " + key
+ " should be positive, but current value is " + value);
}
}
private static void checkIsValidStreamArn(Arn streamArn) {
if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches()) {
throw new IllegalArgumentException("Invalid streamArn " + streamArn);
}
}
// Check if user agent in configuration is the default agent.
// If so, replace it with application name plus KINESIS_CLIENT_LIB_USER_AGENT.
// If not, append KINESIS_CLIENT_LIB_USER_AGENT to the end.
@ -1056,6 +1163,25 @@ public class KinesisClientLibConfiguration {
return shutdownGraceMillis;
}
/**
* @param streamName Kinesis stream name
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withStreamName(String streamName) {
this.streamName = streamName;
return this;
}
/**
* @param streamArn Kinesis stream ARN
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withStreamArn(Arn streamArn) {
checkIsValidStreamArn(streamArn);
this.streamArn = streamArn;
return this;
}
/*
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**

View file

@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.arn.Arn;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
@ -94,6 +95,12 @@ public class KinesisProxy implements IKinesisProxyExtended {
private final String streamName;
/**
* Stored as a string instead of an ARN to reduce repetitive null checks when passing in the stream ARN to
* the client requests, which accepts a String stream ARN parameter.
*/
private String streamArn;
private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
private final long describeStreamBackoffTimeInMillis;
@ -218,6 +225,8 @@ public class KinesisProxy implements IKinesisProxyExtended {
config.getListShardsBackoffTimeInMillis(),
config.getMaxListShardsRetryAttempts());
this.credentialsProvider = config.getKinesisCredentialsProvider();
Arn arn = config.getStreamArn();
this.streamArn = arn != null ? arn.toString() : null;
}
public KinesisProxy(final String streamName,
@ -253,6 +262,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials());
getRecordsRequest.setStreamARN(streamArn);
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecords);
final GetRecordsResult response = client.getRecords(getRecordsRequest);
@ -270,6 +280,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials());
describeStreamRequest.setStreamName(streamName);
describeStreamRequest.setStreamARN(streamArn);
describeStreamRequest.setExclusiveStartShardId(startShardId);
DescribeStreamResult response = null;
@ -314,6 +325,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
request.setRequestCredentials(credentialsProvider.getCredentials());
if (StringUtils.isEmpty(nextToken)) {
request.setStreamName(streamName);
request.setStreamARN(streamArn);
request.setShardFilter(shardFilter);
} else {
request.setNextToken(nextToken);
@ -567,6 +579,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setStreamARN(streamArn);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(iteratorType);
getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
@ -583,6 +596,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setStreamARN(streamArn);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(iteratorType);
getShardIteratorRequest.setStartingSequenceNumber(null);
@ -599,6 +613,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setStreamARN(streamArn);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP);
getShardIteratorRequest.setStartingSequenceNumber(null);
@ -618,6 +633,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
final PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setRequestCredentials(credentialsProvider.getCredentials());
putRecordRequest.setStreamName(streamName);
putRecordRequest.setStreamARN(streamArn);
putRecordRequest.setSequenceNumberForOrdering(exclusiveMinimumSequenceNumber);
putRecordRequest.setExplicitHashKey(explicitHashKey);
putRecordRequest.setPartitionKey(partitionKey);

View file

@ -20,13 +20,16 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.junit.Test;
import org.mockito.Mockito;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.arn.Arn;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
@ -34,6 +37,7 @@ import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.collect.ImmutableSet;
@ -46,8 +50,28 @@ public class KinesisClientLibConfigurationTest {
private static final long TEST_VALUE_LONG = 1000L;
private static final int TEST_VALUE_INT = 1000;
private static final int PARAMETER_COUNT = 6;
private static final String ACCOUNT_ID = "123456789012";
private static final String TEST_STRING = "TestString";
private static final Arn TEST_ARN = Arn.builder()
.withPartition("aws")
.withService("kinesis")
.withRegion("us-east-1")
.withAccountId(ACCOUNT_ID)
.withResource("stream/" + TEST_STRING)
.build();
/**
* Invalid steamARN due to invalid service. This is a sample used for testing.
* @see KinesisClientLibConfigurationTest#testWithInvalidStreamArnsThrowsException() for more examples
*/
private static final Arn INVALID_TEST_ARN = Arn.builder()
.withPartition("aws")
.withService("dynamodb")
.withRegion("us-east-1")
.withAccountId(ACCOUNT_ID)
.withResource("stream/" + TEST_STRING)
.build();
private static final String ALTER_STRING = "AlterString";
// We don't want any of these tests to run checkpoint validation
@ -62,32 +86,11 @@ public class KinesisClientLibConfigurationTest {
new KinesisClientLibConfiguration(TEST_STRING, TEST_STRING, null, TEST_STRING);
// Test constructor with all valid arguments.
config =
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
TEST_STRING,
InitialPositionInStream.LATEST,
null,
null,
null,
TEST_VALUE_LONG,
TEST_STRING,
TEST_VALUE_INT,
TEST_VALUE_LONG,
false,
TEST_VALUE_LONG,
TEST_VALUE_LONG,
true,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
TEST_VALUE_LONG,
TEST_VALUE_LONG,
TEST_VALUE_INT,
skipCheckpointValidationValue,
null,
TEST_VALUE_LONG, BillingMode.PROVISIONED);
config = buildKinesisClientLibConfiguration(TEST_STRING);
// Test constructor with streamArn with all valid arguments.
config = buildKinesisClientLibConfiguration(TEST_ARN);
Assert.assertEquals(config.getStreamName(), TEST_STRING);
}
@Test
@ -168,6 +171,12 @@ public class KinesisClientLibConfigurationTest {
}
intValues[i] = TEST_VALUE_INT;
}
// Test constructor with invalid streamArn
try {
config = buildKinesisClientLibConfiguration(INVALID_TEST_ARN);
} catch(IllegalArgumentException e) {
System.out.println(e.getMessage());
}
Assert.assertTrue("KCLConfiguration should return null when using negative arguments", config == null);
}
@ -370,4 +379,111 @@ public class KinesisClientLibConfigurationTest {
config = config.withIgnoreUnexpectedChildShards(true);
assertTrue(config.shouldIgnoreUnexpectedChildShards());
}
@Test
public void testWithValidStreamArnsSucceed() {
List<String> validArnList = Arrays.asList(
"arn:aws:kinesis:us-east-1:123456789012:stream/123stream-name123",
"arn:aws-china:kinesis:us-east-2:123456789012:stream/stream-name",
"arn:aws-us-gov:kinesis:us-east-2:123456789012:stream/stream-name"
);
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker");
for (final String arn : validArnList) {
config.withStreamArn(Arn.fromString(arn));
}
}
@Test
public void testWithInvalidStreamArnsThrowsException() {
List<String> invalidArnList = Arrays.asList(
"arn:abc:kinesis:us-east-1:123456789012:stream/stream-name", //invalid partition
"arn:aws:dynamnodb:us-east-1:123456789012:stream/stream-name", // Kinesis ARN, but with a non-Kinesis service
"arn:aws:kinesis::123456789012:stream/stream-name", // missing region
"arn:aws:kinesis:us-east-1::stream/stream-name", // missing account id
"arn:aws:kinesis:us-east-1:123456789:stream/stream-name", // account id not 12 digits
"arn:aws:kinesis:us-east-1:123456789abc:stream/stream-name", // 12char alphanumeric account id
"arn:aws:kinesis:us-east-1:123456789012:table/table-name", // incorrect resource type
"arn:aws:dynamodb:us-east-1:123456789012:table/myDynamoDBTable" // valid arn but not a stream
);
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker");
for (final String arnString : invalidArnList) {
Arn arn = Arn.fromString(arnString);
try {
config.withStreamArn(arn);
fail("Arn " + arn + " should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
// expected
}
}
}
private KinesisClientLibConfiguration buildKinesisClientLibConfiguration(Arn streamArn) {
return new KinesisClientLibConfiguration(TEST_STRING,
streamArn,
TEST_STRING,
TEST_STRING,
InitialPositionInStream.LATEST,
null,
null,
null,
TEST_VALUE_LONG,
TEST_STRING,
TEST_VALUE_INT,
TEST_VALUE_LONG,
false,
TEST_VALUE_LONG,
TEST_VALUE_LONG,
true,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
TEST_VALUE_LONG,
TEST_VALUE_LONG,
TEST_VALUE_INT,
skipCheckpointValidationValue,
null,
TEST_VALUE_LONG, BillingMode.PROVISIONED,
new SimpleRecordsFetcherFactory(),
TEST_VALUE_LONG,
TEST_VALUE_LONG,
TEST_VALUE_LONG);
}
private KinesisClientLibConfiguration buildKinesisClientLibConfiguration(String streamName) {
return new KinesisClientLibConfiguration(TEST_STRING,
streamName,
TEST_STRING,
TEST_STRING,
InitialPositionInStream.LATEST,
null,
null,
null,
TEST_VALUE_LONG,
TEST_STRING,
TEST_VALUE_INT,
TEST_VALUE_LONG,
false,
TEST_VALUE_LONG,
TEST_VALUE_LONG,
true,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
TEST_VALUE_LONG,
TEST_VALUE_LONG,
TEST_VALUE_INT,
skipCheckpointValidationValue,
null,
TEST_VALUE_LONG, BillingMode.PROVISIONED,
new SimpleRecordsFetcherFactory(),
TEST_VALUE_LONG,
TEST_VALUE_LONG,
TEST_VALUE_LONG);
}
}

View file

@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Builder;
import org.apache.commons.lang3.StringUtils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
@ -59,6 +60,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.arn.Arn;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
@ -77,11 +79,17 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.kinesis.model.StreamStatus;
import lombok.AllArgsConstructor;
@RunWith(MockitoJUnitRunner.class)
public class KinesisProxyTest {
private static final String TEST_STRING = "TestString";
private static final String ACCOUNT_ID = "123456789012";
private static final Arn TEST_ARN = Arn.builder()
.withPartition("aws")
.withService("kinesis")
.withRegion("us-east-1")
.withAccountId(ACCOUNT_ID)
.withResource("stream/" + TEST_STRING)
.build();
private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L;
private static final long LIST_SHARDS_BACKOFF_TIME = 10L;
private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
@ -132,6 +140,7 @@ public class KinesisProxyTest {
public void setUpTest() {
// Set up kinesis ddbProxy
when(config.getStreamName()).thenReturn(TEST_STRING);
when(config.getStreamArn()).thenReturn(TEST_ARN);
when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME);
when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES);
when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider);
@ -163,7 +172,8 @@ public class KinesisProxyTest {
// Second call describeStream returning response with rest shards.
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
IsRequestWithStartShardId requestMatcher = IsRequestWithStartShardId.builder().streamName(TEST_STRING).build();
doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(requestMatcher));
doReturn(responseFinal).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
@ -310,7 +320,8 @@ public class KinesisProxyTest {
public void testGetShardListWithDDBChildClient() {
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
IsRequestWithStartShardId requestMatcher = IsRequestWithStartShardId.builder().streamName(TEST_STRING).build();
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(requestMatcher));
doReturn(responseFinal).when(mockDDBChildClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
@ -498,37 +509,61 @@ public class KinesisProxyTest {
return response;
}
private IsRequestWithStartShardId describeWithoutShardId() {
return describeWithShardId(null);
}
private IsRequestWithStartShardId describeWithShardId(String shardId) {
return new IsRequestWithStartShardId(shardId);
return IsRequestWithStartShardId.builder()
.streamName(TEST_STRING)
.streamArn(TEST_ARN)
.shardId(shardId)
.build();
}
@Builder
private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> {
private final String streamName;
private final Arn streamArn;
private final String shardId;
public IsRequestWithStartShardId(String shardId) {
this.shardId = shardId;
}
@Override
protected boolean matchesSafely(DescribeStreamRequest item, Description mismatchDescription) {
boolean matches = true;
if (streamName == null) {
if (item.getStreamName() != null) {
mismatchDescription.appendText("Expected streamName of null, but was ")
.appendValue(item.getStreamName());
matches = false;
}
} else if (!streamName.equals(item.getStreamName())) {
mismatchDescription.appendValue(streamName).appendText(" doesn't match expected ")
.appendValue(item.getStreamName());
matches = false;
}
if (streamArn == null) {
if (item.getStreamARN() != null) {
mismatchDescription.appendText("Expected streamArn of null, but was ")
.appendValue(item.getStreamARN());
matches = false;
}
} else if (!streamArn.equals(Arn.fromString(item.getStreamARN()))) {
mismatchDescription.appendValue(streamArn).appendText(" doesn't match expected ")
.appendValue(item.getStreamARN());
matches = false;
}
if (shardId == null) {
if (item.getExclusiveStartShardId() != null) {
mismatchDescription.appendText("Expected starting shard id of null, but was ")
.appendValue(item.getExclusiveStartShardId());
return false;
matches = false;
}
} else if (!shardId.equals(item.getExclusiveStartShardId())) {
mismatchDescription.appendValue(shardId).appendText(" doesn't match expected ")
.appendValue(item.getExclusiveStartShardId());
return false;
matches = false;
}
return true;
return matches;
}
@Override
@ -557,49 +592,87 @@ public class KinesisProxyTest {
}
private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
return new ListShardsRequestMatcher(null, null);
return ListShardsRequestMatcher.builder()
.streamName(TEST_STRING)
.streamArn(TEST_ARN)
.build();
}
private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
return new ListShardsRequestMatcher(null, nextToken);
return ListShardsRequestMatcher.builder()
.nextToken(nextToken)
.build();
}
@AllArgsConstructor
@Builder
private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
private final String streamName;
private final Arn streamArn;
private final String shardId;
private final String nextToken;
@Override
protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
boolean matches = true;
if (streamName == null) {
if (StringUtils.isNotEmpty(listShardsRequest.getStreamName())) {
description.appendText("Expected streamName to be null, but was ")
.appendValue(listShardsRequest.getStreamName());
matches = false;
}
} else {
if (!streamName.equals(listShardsRequest.getStreamName())) {
description.appendText("Expected streamName: ").appendValue(streamName)
.appendText(" doesn't match actual streamName: ")
.appendValue(listShardsRequest.getStreamName());
matches = false;
}
}
if (streamArn == null) {
if (StringUtils.isNotEmpty(listShardsRequest.getStreamARN())) {
description.appendText("Expected streamArn to be null, but was ")
.appendValue(listShardsRequest.getStreamARN());
matches = false;
}
} else {
if (!streamArn.equals(Arn.fromString(listShardsRequest.getStreamARN()))) {
description.appendText("Expected streamArn: ").appendValue(streamArn)
.appendText(" doesn't match actual streamArn: ")
.appendValue(listShardsRequest.getStreamARN());
matches = false;
}
}
if (shardId == null) {
if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected ExclusiveStartShardId to be null, but was ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
matches = false;
}
} else {
if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected shardId: ").appendValue(shardId)
.appendText(" doesn't match actual shardId: ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
matches = false;
}
}
if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) {
if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
return false;
matches = false;
}
if (!listShardsRequest.getNextToken().equals(nextToken)) {
description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken())
.appendText(" when it was supposed to be null.");
return false;
matches = false;
}
} else {
return nextToken == null;
}
return true;
return matches;
}
@Override