Add support for Stream ARNs
This commit is contained in:
parent
43edc90c27
commit
c74938e559
4 changed files with 293 additions and 11 deletions
|
|
@ -23,6 +23,7 @@ import com.amazonaws.services.dynamodbv2.model.BillingMode;
|
|||
import org.apache.commons.lang3.Validate;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.arn.Arn;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
|
|
@ -238,6 +239,7 @@ public class KinesisClientLibConfiguration {
|
|||
private String applicationName;
|
||||
private String tableName;
|
||||
private String streamName;
|
||||
private Arn streamARN;
|
||||
private String kinesisEndpoint;
|
||||
private String dynamoDBEndpoint;
|
||||
private InitialPositionInStream initialPositionInStream;
|
||||
|
|
@ -719,6 +721,138 @@ public class KinesisClientLibConfiguration {
|
|||
this.billingMode = billingMode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Duplicate constructor to support stream ARN's in place of stream names.
|
||||
*
|
||||
* @param applicationName Name of the Kinesis application
|
||||
* By default the application name is included in the user agent string used to make AWS requests. This
|
||||
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
|
||||
* @param streamARN Kinesis stream ARN
|
||||
* @param kinesisEndpoint Kinesis endpoint
|
||||
* @param dynamoDBEndpoint DynamoDB endpoint
|
||||
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
|
||||
* records from that location in the stream when an application starts up for the first time and there
|
||||
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
|
||||
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
|
||||
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
|
||||
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
|
||||
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
|
||||
* @param workerId Used to distinguish different workers/processes of a Kinesis application
|
||||
* @param maxRecords Max records to read per Kinesis getRecords() call
|
||||
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
|
||||
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
|
||||
* GetRecords returned an empty record list.
|
||||
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
||||
* in Kinesis)
|
||||
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
||||
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
||||
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
||||
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
|
||||
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
|
||||
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
|
||||
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
|
||||
* with a call to Amazon Kinesis before checkpointing for calls to
|
||||
* {@link RecordProcessorCheckpointer#checkpoint(String)}
|
||||
* @param regionName The region name for the service
|
||||
* @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates
|
||||
* @param billingMode The DDB Billing mode to set for lease table creation.
|
||||
* @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard.
|
||||
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
|
||||
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
|
||||
* @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases
|
||||
* (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up.
|
||||
* @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases
|
||||
* (leases for shards which no longer exist in the stream) that need to be cleaned up.
|
||||
*/
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
Arn streamARN,
|
||||
String kinesisEndpoint,
|
||||
String dynamoDBEndpoint,
|
||||
InitialPositionInStream initialPositionInStream,
|
||||
AWSCredentialsProvider kinesisCredentialsProvider,
|
||||
AWSCredentialsProvider dynamoDBCredentialsProvider,
|
||||
AWSCredentialsProvider cloudWatchCredentialsProvider,
|
||||
long failoverTimeMillis,
|
||||
String workerId,
|
||||
int maxRecords,
|
||||
long idleTimeBetweenReadsInMillis,
|
||||
boolean callProcessRecordsEvenForEmptyRecordList,
|
||||
long parentShardPollIntervalMillis,
|
||||
long shardSyncIntervalMillis,
|
||||
boolean cleanupTerminatedShardsBeforeExpiry,
|
||||
ClientConfiguration kinesisClientConfig,
|
||||
ClientConfiguration dynamoDBClientConfig,
|
||||
ClientConfiguration cloudWatchClientConfig,
|
||||
long taskBackoffTimeMillis,
|
||||
long metricsBufferTimeMillis,
|
||||
int metricsMaxQueueSize,
|
||||
boolean validateSequenceNumberBeforeCheckpointing,
|
||||
String regionName,
|
||||
long shutdownGraceMillis,
|
||||
BillingMode billingMode,
|
||||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
long leaseCleanupIntervalMillis,
|
||||
long completedLeaseCleanupThresholdMillis,
|
||||
long garbageLeaseCleanupThresholdMillis) {
|
||||
|
||||
// Check following values are greater than zero
|
||||
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
|
||||
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
|
||||
checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis);
|
||||
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis);
|
||||
checkIsValuePositive("MaxRecords", (long) maxRecords);
|
||||
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
|
||||
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
|
||||
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
|
||||
this.applicationName = applicationName;
|
||||
this.tableName = applicationName;
|
||||
this.streamName = streamARN.getResource().getResource();
|
||||
this.streamARN = streamARN;
|
||||
this.kinesisEndpoint = kinesisEndpoint;
|
||||
this.dynamoDBEndpoint = dynamoDBEndpoint;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
|
||||
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
|
||||
this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.maxRecords = maxRecords;
|
||||
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
|
||||
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
|
||||
this.workerIdentifier = workerId;
|
||||
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
|
||||
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
|
||||
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
|
||||
this.metricsMaxQueueSize = metricsMaxQueueSize;
|
||||
this.metricsLevel = DEFAULT_METRICS_LEVEL;
|
||||
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
|
||||
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
|
||||
this.regionName = regionName;
|
||||
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
|
||||
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
|
||||
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
|
||||
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
|
||||
this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(
|
||||
initialPositionInStream);
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
|
||||
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
|
||||
this.leasesRecoveryAuditorExecutionFrequencyMillis = LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS;
|
||||
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis;
|
||||
this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis;
|
||||
this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis;
|
||||
this.shutdownGraceMillis = shutdownGraceMillis;
|
||||
this.billingMode = billingMode;
|
||||
}
|
||||
|
||||
// Check if value is positive, otherwise throw an exception
|
||||
private void checkIsValuePositive(String key, long value) {
|
||||
if (value <= 0) {
|
||||
|
|
@ -833,6 +967,13 @@ public class KinesisClientLibConfiguration {
|
|||
return streamName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return stream ARN
|
||||
*/
|
||||
public Arn getStreamARN() {
|
||||
return streamARN;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Kinesis endpoint
|
||||
*/
|
||||
|
|
@ -1056,6 +1197,24 @@ public class KinesisClientLibConfiguration {
|
|||
return shutdownGraceMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param streamName Kinesis stream name
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withStreamName(String streamName) {
|
||||
this.streamName = streamName;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param streamARN Kinesis stream ARN
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withStreamARN(Arn streamARN) {
|
||||
this.streamARN = streamARN;
|
||||
return this;
|
||||
}
|
||||
|
||||
/*
|
||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.arn.Arn;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
|
|
@ -93,6 +94,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
private AtomicInteger cacheMisses = new AtomicInteger(0);
|
||||
|
||||
private final String streamName;
|
||||
private Arn streamARN;
|
||||
|
||||
private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
|
||||
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
|
||||
|
|
@ -218,6 +220,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
config.getListShardsBackoffTimeInMillis(),
|
||||
config.getMaxListShardsRetryAttempts());
|
||||
this.credentialsProvider = config.getKinesisCredentialsProvider();
|
||||
this.streamARN = config.getStreamARN();
|
||||
}
|
||||
|
||||
public KinesisProxy(final String streamName,
|
||||
|
|
@ -270,6 +273,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
|
||||
describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials());
|
||||
describeStreamRequest.setStreamName(streamName);
|
||||
describeStreamRequest.setStreamARN(streamARN != null ? streamARN.toString() : null);
|
||||
describeStreamRequest.setExclusiveStartShardId(startShardId);
|
||||
DescribeStreamResult response = null;
|
||||
|
||||
|
|
@ -314,6 +318,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
request.setRequestCredentials(credentialsProvider.getCredentials());
|
||||
if (StringUtils.isEmpty(nextToken)) {
|
||||
request.setStreamName(streamName);
|
||||
request.setStreamARN(streamARN != null ? streamARN.toString() : null);
|
||||
request.setShardFilter(shardFilter);
|
||||
} else {
|
||||
request.setNextToken(nextToken);
|
||||
|
|
@ -567,6 +572,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
|
||||
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
|
||||
getShardIteratorRequest.setStreamName(streamName);
|
||||
getShardIteratorRequest.setStreamARN(streamARN != null ? streamARN.toString() : null);
|
||||
getShardIteratorRequest.setShardId(shardId);
|
||||
getShardIteratorRequest.setShardIteratorType(iteratorType);
|
||||
getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
|
||||
|
|
@ -583,6 +589,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
|
||||
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
|
||||
getShardIteratorRequest.setStreamName(streamName);
|
||||
getShardIteratorRequest.setStreamARN(streamARN != null ? streamARN.toString() : null);
|
||||
getShardIteratorRequest.setShardId(shardId);
|
||||
getShardIteratorRequest.setShardIteratorType(iteratorType);
|
||||
getShardIteratorRequest.setStartingSequenceNumber(null);
|
||||
|
|
@ -599,6 +606,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
|
||||
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
|
||||
getShardIteratorRequest.setStreamName(streamName);
|
||||
getShardIteratorRequest.setStreamARN(streamARN != null ? streamARN.toString() : null);
|
||||
getShardIteratorRequest.setShardId(shardId);
|
||||
getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP);
|
||||
getShardIteratorRequest.setStartingSequenceNumber(null);
|
||||
|
|
@ -618,6 +626,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
final PutRecordRequest putRecordRequest = new PutRecordRequest();
|
||||
putRecordRequest.setRequestCredentials(credentialsProvider.getCredentials());
|
||||
putRecordRequest.setStreamName(streamName);
|
||||
putRecordRequest.setStreamARN(streamARN != null ? streamARN.toString() : null);
|
||||
putRecordRequest.setSequenceNumberForOrdering(exclusiveMinimumSequenceNumber);
|
||||
putRecordRequest.setExplicitHashKey(explicitHashKey);
|
||||
putRecordRequest.setPartitionKey(partitionKey);
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import org.junit.Test;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.arn.Arn;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.regions.Region;
|
||||
import com.amazonaws.regions.RegionUtils;
|
||||
|
|
@ -34,6 +35,7 @@ import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
|||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
|
|
@ -46,8 +48,16 @@ public class KinesisClientLibConfigurationTest {
|
|||
private static final long TEST_VALUE_LONG = 1000L;
|
||||
private static final int TEST_VALUE_INT = 1000;
|
||||
private static final int PARAMETER_COUNT = 6;
|
||||
private static final String ACCOUNT_ID = "123456789012";
|
||||
|
||||
private static final String TEST_STRING = "TestString";
|
||||
private static final Arn TEST_ARN = Arn.builder()
|
||||
.withPartition("aws")
|
||||
.withService("kinesis")
|
||||
.withRegion("us-east-1")
|
||||
.withAccountId(ACCOUNT_ID)
|
||||
.withResource("stream/" + TEST_STRING)
|
||||
.build();
|
||||
private static final String ALTER_STRING = "AlterString";
|
||||
|
||||
// We don't want any of these tests to run checkpoint validation
|
||||
|
|
@ -87,7 +97,45 @@ public class KinesisClientLibConfigurationTest {
|
|||
TEST_VALUE_INT,
|
||||
skipCheckpointValidationValue,
|
||||
null,
|
||||
TEST_VALUE_LONG, BillingMode.PROVISIONED);
|
||||
TEST_VALUE_LONG,
|
||||
BillingMode.PROVISIONED,
|
||||
new SimpleRecordsFetcherFactory(),
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_LONG);
|
||||
|
||||
// Test constructor with streamARN with all valid arguments.
|
||||
config =
|
||||
new KinesisClientLibConfiguration(TEST_STRING,
|
||||
TEST_ARN,
|
||||
TEST_STRING,
|
||||
TEST_STRING,
|
||||
InitialPositionInStream.LATEST,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
TEST_VALUE_LONG,
|
||||
TEST_STRING,
|
||||
TEST_VALUE_INT,
|
||||
TEST_VALUE_LONG,
|
||||
false,
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_LONG,
|
||||
true,
|
||||
new ClientConfiguration(),
|
||||
new ClientConfiguration(),
|
||||
new ClientConfiguration(),
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_INT,
|
||||
skipCheckpointValidationValue,
|
||||
null,
|
||||
TEST_VALUE_LONG, BillingMode.PROVISIONED,
|
||||
new SimpleRecordsFetcherFactory(),
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_LONG,
|
||||
TEST_VALUE_LONG);
|
||||
Assert.assertEquals(config.getStreamName(), TEST_STRING);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -59,6 +59,7 @@ import org.mockito.Mock;
|
|||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.arn.Arn;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
|
||||
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
|
||||
|
|
@ -82,6 +83,14 @@ import lombok.AllArgsConstructor;
|
|||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class KinesisProxyTest {
|
||||
private static final String TEST_STRING = "TestString";
|
||||
private static final String ACCOUNT_ID = "123456789012";
|
||||
private static final Arn TEST_ARN = Arn.builder()
|
||||
.withPartition("aws")
|
||||
.withService("kinesis")
|
||||
.withRegion("us-east-1")
|
||||
.withAccountId(ACCOUNT_ID)
|
||||
.withResource("stream/" + TEST_STRING)
|
||||
.build();
|
||||
private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L;
|
||||
private static final long LIST_SHARDS_BACKOFF_TIME = 10L;
|
||||
private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
|
||||
|
|
@ -118,6 +127,7 @@ public class KinesisProxyTest {
|
|||
|
||||
private KinesisProxy proxy;
|
||||
private KinesisProxy ddbProxy;
|
||||
private KinesisProxy ddbProxyWithStreamARN;
|
||||
private KinesisProxy ddbChildProxy;
|
||||
|
||||
// Test shards for verifying.
|
||||
|
|
@ -132,6 +142,7 @@ public class KinesisProxyTest {
|
|||
public void setUpTest() {
|
||||
// Set up kinesis ddbProxy
|
||||
when(config.getStreamName()).thenReturn(TEST_STRING);
|
||||
when(config.getStreamARN()).thenReturn(TEST_ARN);
|
||||
when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME);
|
||||
when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES);
|
||||
when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider);
|
||||
|
|
@ -163,7 +174,7 @@ public class KinesisProxyTest {
|
|||
// Second call describeStream returning response with rest shards.
|
||||
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
|
||||
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
|
||||
doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
|
||||
doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(TEST_STRING, null, null)));
|
||||
doReturn(responseFinal).when(mockDDBStreamClient)
|
||||
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
|
||||
|
||||
|
|
@ -310,7 +321,7 @@ public class KinesisProxyTest {
|
|||
public void testGetShardListWithDDBChildClient() {
|
||||
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
|
||||
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
|
||||
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
|
||||
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(TEST_STRING, null, null)));
|
||||
doReturn(responseFinal).when(mockDDBChildClient)
|
||||
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
|
||||
|
||||
|
|
@ -499,23 +510,46 @@ public class KinesisProxyTest {
|
|||
}
|
||||
|
||||
private IsRequestWithStartShardId describeWithoutShardId() {
|
||||
return describeWithShardId(null);
|
||||
return new IsRequestWithStartShardId(TEST_STRING, TEST_ARN, null);
|
||||
}
|
||||
|
||||
private IsRequestWithStartShardId describeWithShardId(String shardId) {
|
||||
return new IsRequestWithStartShardId(shardId);
|
||||
return new IsRequestWithStartShardId(TEST_STRING, TEST_ARN, shardId);
|
||||
}
|
||||
|
||||
@AllArgsConstructor
|
||||
private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> {
|
||||
|
||||
private final String streamName;
|
||||
private final Arn streamARN;
|
||||
private final String shardId;
|
||||
|
||||
public IsRequestWithStartShardId(String shardId) {
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(DescribeStreamRequest item, Description mismatchDescription) {
|
||||
if (streamName == null) {
|
||||
if (item.getStreamName() != null) {
|
||||
mismatchDescription.appendText("Expected streamName of null, but was ")
|
||||
.appendValue(item.getStreamName());
|
||||
return false;
|
||||
}
|
||||
} else if (!streamName.equals(item.getStreamName())) {
|
||||
mismatchDescription.appendValue(streamName).appendText(" doesn't match expected ")
|
||||
.appendValue(item.getStreamName());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (streamARN == null) {
|
||||
if (item.getStreamARN() != null) {
|
||||
mismatchDescription.appendText("Expected streamARN of null, but was ")
|
||||
.appendValue(item.getStreamARN());
|
||||
return false;
|
||||
}
|
||||
} else if (!streamARN.equals(Arn.fromString(item.getStreamARN()))) {
|
||||
mismatchDescription.appendValue(streamARN).appendText(" doesn't match expected ")
|
||||
.appendValue(item.getStreamARN());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (shardId == null) {
|
||||
if (item.getExclusiveStartShardId() != null) {
|
||||
mismatchDescription.appendText("Expected starting shard id of null, but was ")
|
||||
|
|
@ -557,20 +591,52 @@ public class KinesisProxyTest {
|
|||
}
|
||||
|
||||
private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
|
||||
return new ListShardsRequestMatcher(null, null);
|
||||
return new ListShardsRequestMatcher(TEST_STRING, TEST_ARN, null, null);
|
||||
}
|
||||
|
||||
private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
|
||||
return new ListShardsRequestMatcher(null, nextToken);
|
||||
return new ListShardsRequestMatcher(null, null, null, nextToken);
|
||||
}
|
||||
|
||||
@AllArgsConstructor
|
||||
private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
|
||||
private final String streamName;
|
||||
private final Arn streamARN;
|
||||
private final String shardId;
|
||||
private final String nextToken;
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
|
||||
if (streamName == null) {
|
||||
if (StringUtils.isNotEmpty(listShardsRequest.getStreamName())) {
|
||||
description.appendText("Expected streamName to be null, but was ")
|
||||
.appendValue(listShardsRequest.getStreamName());
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (!streamName.equals(listShardsRequest.getStreamName())) {
|
||||
description.appendText("Expected streamName: ").appendValue(streamName)
|
||||
.appendText(" doesn't match actual streamName: ")
|
||||
.appendValue(listShardsRequest.getStreamName());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (streamARN == null) {
|
||||
if (StringUtils.isNotEmpty(listShardsRequest.getStreamARN())) {
|
||||
description.appendText("Expected streamARN to be null, but was ")
|
||||
.appendValue(listShardsRequest.getStreamARN());
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (!streamARN.equals(Arn.fromString(listShardsRequest.getStreamARN()))) {
|
||||
description.appendText("Expected streamARN: ").appendValue(streamARN)
|
||||
.appendText(" doesn't match actual streamARN: ")
|
||||
.appendValue(listShardsRequest.getStreamARN());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (shardId == null) {
|
||||
if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
|
||||
description.appendText("Expected ExclusiveStartShardId to be null, but was ")
|
||||
|
|
|
|||
Loading…
Reference in a new issue