Adding ListShards calls, and managing DescribeStream for DDB Streams.

This commit is contained in:
Sahil Palvia 2018-01-04 17:37:07 -08:00
parent 85d6c059c2
commit 1704d08935
12 changed files with 570 additions and 112 deletions

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.8.8</version> <version>1.9.0-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>
@ -40,7 +40,7 @@
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId> <artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws-java-sdk.version}</version> <version>1.11.listshards-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>

View file

@ -182,6 +182,16 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
/**
* The sleep time between two listShards calls from the proxy when throttled.
*/
public static final long DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500;
/**
* The number of times the Proxy will retry listShards call when throttled.
*/
public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -238,6 +248,12 @@ public class KinesisClientLibConfiguration {
@Getter @Getter
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty(); private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
@Getter
private long listShardsBackoffTimeInMillis = DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS;
@Getter
private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS;
/** /**
* Constructor. * Constructor.
* *
@ -1369,4 +1385,26 @@ public class KinesisClientLibConfiguration {
this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis); this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis);
return this; return this;
} }
/**
* @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return
*/
public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) {
checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis);
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
return this;
}
/**
* @param maxListShardsRetryAttempts Max number of retries for listShards when throttled
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return
*/
public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) {
checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts);
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
return this;
}
} }

View file

@ -248,8 +248,7 @@ public class Worker implements Runnable {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config, config,
new StreamConfig( new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) new KinesisProxyFactory(config, kinesisClient).getProxy(),
.getProxy(config.getStreamName()),
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.shouldValidateSequenceNumberBeforeCheckpointing(),
@ -1261,8 +1260,7 @@ public class Worker implements Runnable {
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
config, config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), new StreamConfig(new KinesisProxyFactory(config, kinesisClient).getProxy(),
kinesisClient).getProxy(config.getStreamName()),
config.getMaxRecords(), config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),

View file

@ -46,12 +46,15 @@ public interface IKinesisProxy {
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException; throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException;
/** /**
* Fetch information about stream. Useful for fetching the list of shards in a stream. * Fetch information about stream. Useful for fetching the list of shards in a stream. Going forward this method is
* being deprecated. This method uses DescribeStream call, which is throttled at 10 calls per account by default.
* If possible try to use ListShards call available in the client, or use the getShardList to get shard info.
* *
* @param startShardId exclusive start shardId - used when paginating the list of shards. * @param startShardId exclusive start shardId - used when paginating the list of shards.
* @return DescribeStreamOutput object containing a description of the stream. * @return DescribeStreamOutput object containing a description of the stream.
* @throws ResourceNotFoundException The Kinesis stream was not found * @throws ResourceNotFoundException The Kinesis stream was not found
*/ */
@Deprecated
DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException; DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException;
/** /**

View file

@ -14,6 +14,9 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.proxies; package com.amazonaws.services.kinesis.clientlibrary.proxies;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
/** /**
* Interface for a KinesisProxyFactory. * Interface for a KinesisProxyFactory.
* *
@ -25,6 +28,13 @@ public interface IKinesisProxyFactory {
* @param streamName Stream from which data is consumed. * @param streamName Stream from which data is consumed.
* @return IKinesisProxy object. * @return IKinesisProxy object.
*/ */
@Deprecated
IKinesisProxy getProxy(String streamName); IKinesisProxy getProxy(String streamName);
/**
* Return an IKinesisProxy object from the config object.
* @return
*/
IKinesisProxy getProxy();
} }

View file

@ -23,13 +23,16 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import lombok.Data; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
@ -39,13 +42,18 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException; import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult; import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.StreamStatus;
import lombok.Data;
/** /**
* Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards). * Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards).
*/ */
@ -70,22 +78,48 @@ public class KinesisProxy implements IKinesisProxyExtended {
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
private final long describeStreamBackoffTimeInMillis; private final long describeStreamBackoffTimeInMillis;
private final int maxDescribeStreamRetryAttempts; private final int maxDescribeStreamRetryAttempts;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
private boolean isKinesisClient = true;
@Deprecated
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
String endpoint,
String serviceName,
String regionId) {
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider);
client.setEndpoint(endpoint);
client.setSignerRegionOverride(regionId);
return client;
}
/** /**
* Public constructor. * Public constructor.
* <p>
* Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
* {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
* {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object.
* </p>
* *
* @param streamName Data records will be fetched from this stream * @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests * @param credentialProvider Provides credentials for signing Kinesis requests
* @param endpoint Kinesis endpoint * @param endpoint Kinesis endpoint
*/ */
@Deprecated
public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint) { public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint) {
this(streamName, credentialProvider, endpoint, defaultServiceName, defaultRegionId, this(streamName, credentialProvider, endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
* Public constructor. * Public constructor.
* <p>
* Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
* {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
* {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object.
* </p>
* *
* @param streamName Data records will be fetched from this stream * @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests * @param credentialProvider Provides credentials for signing Kinesis requests
@ -95,34 +129,33 @@ public class KinesisProxy implements IKinesisProxyExtended {
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds * @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/ */
@Deprecated
public KinesisProxy(final String streamName, public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider, AWSCredentialsProvider credentialProvider,
String endpoint, String endpoint,
String serviceName, String serviceName,
String regionId, String regionId,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
this(streamName, credentialProvider, buildClientSettingEndpoint(credentialProvider, long listShardsBackoffTimeInMillis,
endpoint, int maxListShardsRetryAttempts) {
serviceName, this(streamName,
regionId), describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts); credentialProvider,
buildClientSettingEndpoint(credentialProvider, endpoint, serviceName, regionId),
describeStreamBackoffTimeInMillis,
maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts);
LOG.debug("KinesisProxy has created a kinesisClient"); LOG.debug("KinesisProxy has created a kinesisClient");
} }
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
String endpoint,
String serviceName,
String regionId) {
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider);
client.setEndpoint(endpoint);
client.setSignerRegionOverride(regionId);
return client;
}
/** /**
* Public constructor. * Public constructor.
* <p>
* Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
* {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
* {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object.
* </p>
* *
* @param streamName Data records will be fetched from this stream * @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests * @param credentialProvider Provides credentials for signing Kinesis requests
@ -130,18 +163,56 @@ public class KinesisProxy implements IKinesisProxyExtended {
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds * @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/ */
@Deprecated
public KinesisProxy(final String streamName, public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider, AWSCredentialsProvider credentialProvider,
AmazonKinesis kinesisClient, AmazonKinesis kinesisClient,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
this.streamName = streamName; long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
this(streamName, kinesisClient, describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis, maxListShardsRetryAttempts);
this.credentialsProvider = credentialProvider; this.credentialsProvider = credentialProvider;
LOG.debug("KinesisProxy( " + streamName + ")");
}
/**
* Public constructor.
* @param config
*/
public KinesisProxy(final KinesisClientLibConfiguration config, final AmazonKinesis client) {
this(config.getStreamName(),
client,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
config.getListShardsBackoffTimeInMillis(),
config.getMaxListShardsRetryAttempts());
this.credentialsProvider = config.getKinesisCredentialsProvider();
}
public KinesisProxy(final String streamName,
final AmazonKinesis client,
final long describeStreamBackoffTimeInMillis,
final int maxDescribeStreamRetryAttempts,
final long listShardsBackoffTimeInMillis,
final int maxListShardsRetryAttempts) {
this.streamName = streamName;
this.client = client;
this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis; this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts; this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
this.client = kinesisClient; this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
LOG.debug("KinesisProxy( " + streamName + ")"); try {
if (Class.forName("com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient")
.isAssignableFrom(client.getClass())) {
isKinesisClient = false;
LOG.info("Client is DynamoDb client, will use DescribeStreams.");
}
} catch (ClassNotFoundException e) {
LOG.info("Client is Kinesis Client, using ListShards instead of DescribeStreams.");
}
} }
/** /**
@ -152,6 +223,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException { throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
// TODO: remove this once older constructors are removed
getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials()); getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials());
getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecords); getRecordsRequest.setLimit(maxRecords);
@ -166,7 +238,9 @@ public class KinesisProxy implements IKinesisProxyExtended {
@Override @Override
public DescribeStreamResult getStreamInfo(String startShardId) public DescribeStreamResult getStreamInfo(String startShardId)
throws ResourceNotFoundException, LimitExceededException { throws ResourceNotFoundException, LimitExceededException {
LOG.info("Using describeStreams calls to get shards list");
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
// TODO: remove this once older constructors are removed
describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials()); describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials());
describeStreamRequest.setStreamName(streamName); describeStreamRequest.setStreamName(streamName);
describeStreamRequest.setExclusiveStartShardId(startShardId); describeStreamRequest.setExclusiveStartShardId(startShardId);
@ -208,6 +282,49 @@ public class KinesisProxy implements IKinesisProxyExtended {
} }
} }
private ListShardsResult listShards(final String nextToken) {
LOG.info("Using ListShards to get shards information");
final ListShardsRequest request = new ListShardsRequest();
// TODO: remove this once older constructors are removed
request.setRequestCredentials(credentialsProvider.getCredentials());
if (StringUtils.isEmpty(nextToken)) {
request.setStreamName(streamName);
} else {
request.setNextToken(nextToken);
}
ListShardsResult result = null;
LimitExceededException lastException = null;
int remainingRetries = this.maxListShardsRetryAttempts;
while (result == null) {
try {
result = client.listShards(request);
} catch (LimitExceededException e) {
LOG.info("Got LimitExceededException when listing shards " + streamName + ". Backing off for "
+ this.listShardsBackoffTimeInMillis + " millis.");
try {
Thread.sleep(this.listShardsBackoffTimeInMillis);
} catch (InterruptedException ie) {
LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie);
}
lastException = e;
} catch (ResourceInUseException e) {
LOG.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or"
+ " Updating)");
return null;
}
remainingRetries--;
if (remainingRetries <= 0 && result == null) {
if (lastException != null) {
throw lastException;
}
throw new IllegalStateException("Received null from DescribeStream call.");
}
}
return result;
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -233,27 +350,47 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/ */
@Override @Override
public synchronized List<Shard> getShardList() { public synchronized List<Shard> getShardList() {
DescribeStreamResult response;
if (shardIterationState == null) { if (shardIterationState == null) {
shardIterationState = new ShardIterationState(); shardIterationState = new ShardIterationState();
} }
do { if (isKinesisClient) {
response = getStreamInfo(shardIterationState.getLastShardId()); ListShardsResult result;
String nextToken = null;
if (response == null) { do {
result = listShards(nextToken);
if (result == null) {
/*
* If listShards ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/
return null;
} else {
shardIterationState.update(result.getShards());
nextToken = result.getNextToken();
}
} while (StringUtils.isNotEmpty(result.getNextToken()));
} else {
DescribeStreamResult response;
do {
response = getStreamInfo(shardIterationState.getLastShardId());
if (response == null) {
/* /*
* If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not * If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/ */
return null; return null;
} else { } else {
shardIterationState.update(response.getStreamDescription().getShards()); shardIterationState.update(response.getStreamDescription().getShards());
} }
} while (response.getStreamDescription().isHasMoreShards()); } while (response.getStreamDescription().isHasMoreShards());
}
this.listOfShardsSinceLastGet.set(shardIterationState.getShards()); this.listOfShardsSinceLastGet.set(shardIterationState.getShards());
shardIterationState = new ShardIterationState(); shardIterationState = new ShardIterationState();
return listOfShardsSinceLastGet.get(); return listOfShardsSinceLastGet.get();
} }

View file

@ -18,6 +18,7 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
/** /**
* Factory used for instantiating KinesisProxy objects (to fetch data from Kinesis). * Factory used for instantiating KinesisProxy objects (to fetch data from Kinesis).
@ -32,44 +33,72 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
private final AmazonKinesis kinesisClient; private final AmazonKinesis kinesisClient;
private final long describeStreamBackoffTimeInMillis; private final long describeStreamBackoffTimeInMillis;
private final int maxDescribeStreamRetryAttempts; private final int maxDescribeStreamRetryAttempts;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
private KinesisClientLibConfiguration configuration;
/** /**
* Constructor for creating a KinesisProxy factory, using the specified credentials provider and endpoint. * Constructor for creating a KinesisProxy factory, using the specified credentials provider and endpoint.
* <p>
* Note: Deprecating, moving forward please use
* {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
* </p>
* *
* @param credentialProvider credentials provider used to sign requests * @param credentialProvider credentials provider used to sign requests
* @param endpoint Amazon Kinesis endpoint to use * @param endpoint Amazon Kinesis endpoint to use
*/ */
@Deprecated
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, String endpoint) { public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, String endpoint) {
this(credentialProvider, new ClientConfiguration(), endpoint, defaultServiceName, defaultRegionId, this(credentialProvider, new ClientConfiguration(), endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
* Constructor for KinesisProxy factory using the client configuration to use when interacting with Kinesis. * Constructor for KinesisProxy factory using the client configuration to use when interacting with Kinesis.
* <p>
* Note: Deprecating, moving forward please use
* {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
* </p>
* *
* @param credentialProvider credentials provider used to sign requests * @param credentialProvider credentials provider used to sign requests
* @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient * @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient
* @param endpoint Amazon Kinesis endpoint to use * @param endpoint Amazon Kinesis endpoint to use
*/ */
@Deprecated
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, public KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
ClientConfiguration clientConfig, ClientConfiguration clientConfig,
String endpoint) { String endpoint) {
this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId, this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
* This constructor may be used to specify the AmazonKinesisClient to use. * This constructor may be used to specify the AmazonKinesisClient to use.
* <p>
* Note: Deprecating, moving forward please use
* {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
* </p>
* *
* @param credentialProvider credentials provider used to sign requests * @param credentialProvider credentials provider used to sign requests
* @param client AmazonKinesisClient used to fetch data from Kinesis * @param client AmazonKinesisClient used to fetch data from Kinesis
*/ */
@Deprecated
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) { public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) {
this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
* Used internally and for development/testing. * Used internally and for development/testing.
* <p>
* Note: Deprecating, moving forward please use
* {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
* </p>
* *
* @param credentialProvider credentials provider used to sign requests * @param credentialProvider credentials provider used to sign requests
* @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient * @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient
@ -79,22 +108,41 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
* @param describeStreamBackoffTimeInMillis backoff time for describing stream in millis * @param describeStreamBackoffTimeInMillis backoff time for describing stream in millis
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/ */
@Deprecated
KinesisProxyFactory(AWSCredentialsProvider credentialProvider, KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
ClientConfiguration clientConfig, ClientConfiguration clientConfig,
String endpoint, String endpoint,
String serviceName, String serviceName,
String regionId, String regionId,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
this(credentialProvider, buildClientSettingEndpoint(credentialProvider, this(credentialProvider, buildClientSettingEndpoint(credentialProvider,
clientConfig, clientConfig,
endpoint, endpoint,
serviceName, serviceName,
regionId), regionId),
describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts); describeStreamBackoffTimeInMillis,
maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts);
} }
/**
* Creating KinesisProxyFactory using Config and Client objects, that will be used by the proxy.
*
* @param configuration Config that will be used to create the Proxy
* @param client Client that will be used by the Proxy
*/
public KinesisProxyFactory(final KinesisClientLibConfiguration configuration, final AmazonKinesis client) {
this(configuration.getKinesisCredentialsProvider(), client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, configuration.getListShardsBackoffTimeInMillis(),
configuration.getMaxListShardsRetryAttempts());
this.configuration = configuration;
}
/** /**
* Used internally in the class (and for development/testing). * Used internally in the class (and for development/testing).
* *
@ -106,12 +154,16 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
KinesisProxyFactory(AWSCredentialsProvider credentialProvider, KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
AmazonKinesis client, AmazonKinesis client,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
super(); super();
this.kinesisClient = client; this.kinesisClient = client;
this.credentialProvider = credentialProvider; this.credentialProvider = credentialProvider;
this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis; this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts; this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
} }
/** /**
@ -123,15 +175,28 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
credentialProvider, credentialProvider,
kinesisClient, kinesisClient,
describeStreamBackoffTimeInMillis, describeStreamBackoffTimeInMillis,
maxDescribeStreamRetryAttempts); maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts);
}
/**
* {@inheritDoc}
*/
@Override
public IKinesisProxy getProxy() {
if (configuration == null) {
throw new IllegalArgumentException("KinesisClientLibConfiguration not set, please make sure to use"
+ " KinesisProxyFactory(KinesisClientLibConfiguration, AmazonKinesis) constructor.");
}
return new KinesisProxy(configuration, kinesisClient);
} }
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
ClientConfiguration clientConfig, ClientConfiguration clientConfig,
String endpoint, String endpoint,
String serviceName, String serviceName,
String regionId) { String regionId) {
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig); AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig);
client.setEndpoint(endpoint); client.setEndpoint(endpoint);
client.setSignerRegionOverride(regionId); client.setSignerRegionOverride(regionId);

View file

@ -0,0 +1,26 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.dynamodbv2.streamsadapter;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
/**
*
*/
public class AmazonDynamoDBStreamsAdapterClient extends AmazonKinesisClient {
// This class is used for testing purposes.
}

View file

@ -0,0 +1,23 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.dynamodbv2.streamsadapter;
/**
*
*/
public class AmazonDynamoDBStreamsAdapterClientChild extends AmazonDynamoDBStreamsAdapterClient {
// Used only for testing
}

View file

@ -18,6 +18,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -90,10 +91,13 @@ public class ShardSyncTaskIntegrationTest {
new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest",
new AmazonDynamoDBClient(credentialsProvider), new AmazonDynamoDBClient(credentialsProvider),
useConsistentReads); useConsistentReads);
kinesisProxy =
new KinesisProxy(STREAM_NAME, AmazonKinesisClient client = new AmazonDynamoDBStreamsAdapterClient();
new DefaultAWSCredentialsProviderChain(), client.setEndpoint(KINESIS_ENDPOINT);
KINESIS_ENDPOINT); client.setSignerRegionOverride("us-east-1");
kinesisProxy = new KinesisProxy(STREAM_NAME, new DefaultAWSCredentialsProviderChain(), client, 1000L, 50, 1000L,
50);
} }
/** /**
@ -106,7 +110,6 @@ public class ShardSyncTaskIntegrationTest {
/** /**
* Test method for call(). * Test method for call().
* *
* @throws CapacityExceededException
* @throws DependencyException * @throws DependencyException
* @throws InvalidStateException * @throws InvalidStateException
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException

View file

@ -61,4 +61,8 @@ public class KinesisLocalFileProxyFactory implements IKinesisProxyFactory {
return testKinesisProxy; return testKinesisProxy;
} }
@Override
public IKinesisProxy getProxy() {
return testKinesisProxy;
}
} }

View file

@ -19,9 +19,12 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -29,13 +32,22 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import lombok.AllArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before; import org.junit.Before;
@ -47,7 +59,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
@ -61,11 +72,18 @@ import com.amazonaws.services.kinesis.model.StreamStatus;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class KinesisProxyTest { public class KinesisProxyTest {
private static final String TEST_STRING = "TestString"; private static final String TEST_STRING = "TestString";
private static final long BACKOFF_TIME = 10L; private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L;
private static final int RETRY_TIMES = 3; private static final long LIST_SHARDS_BACKOFF_TIME = 10L;
private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
private static final int LIST_SHARDS_RETRY_TIMES = 3;
private static final String NEXT_TOKEN = "NextToken";
@Mock @Mock
private AmazonKinesisClient mockClient; private AmazonKinesis mockClient;
@Mock
private AmazonDynamoDBStreamsAdapterClient mockDDBStreamClient;
@Mock
private AmazonDynamoDBStreamsAdapterClientChild mockDDBChildClient;
@Mock @Mock
private AWSCredentialsProvider mockCredentialsProvider; private AWSCredentialsProvider mockCredentialsProvider;
@Mock @Mock
@ -76,8 +94,12 @@ public class KinesisProxyTest {
private StreamDescription streamDescription; private StreamDescription streamDescription;
@Mock @Mock
private Shard shard; private Shard shard;
@Mock
private KinesisClientLibConfiguration config;
private KinesisProxy proxy; private KinesisProxy proxy;
private KinesisProxy ddbProxy;
private KinesisProxy ddbChildProxy;
// Test shards for verifying. // Test shards for verifying.
private Set<String> shardIdSet; private Set<String> shardIdSet;
@ -85,19 +107,24 @@ public class KinesisProxyTest {
@Before @Before
public void setUpTest() { public void setUpTest() {
// Set up kinesis proxy // Set up kinesis ddbProxy
proxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockClient, BACKOFF_TIME, RETRY_TIMES); when(config.getStreamName()).thenReturn(TEST_STRING);
when(mockCredentialsProvider.getCredentials()).thenReturn(null); when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME);
when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES);
when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider);
proxy = new KinesisProxy(config, mockClient);
ddbProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBStreamClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
// Set up test shards // Set up test shards
shardIdSet = new HashSet<>(); List<String> shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4");
shards = new ArrayList<>(); shardIdSet = new HashSet<>(shardIds);
String[] shardIds = new String[] { "shard-1", "shard-2", "shard-3", "shard-4" }; shards = shardIds.stream().map(shardId -> new Shard().withShardId(shardId)).collect(Collectors.toList());
for (String shardId : shardIds) {
Shard shard = new Shard();
shard.setShardId(shardId);
shards.add(shard);
shardIdSet.add(shardId);
}
} }
@Test @Test
@ -107,11 +134,11 @@ public class KinesisProxyTest {
// Second call describeStream returning response with rest shards. // Second call describeStream returning response with rest shards.
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockClient).describeStream(argThat(new IsRequestWithStartShardId(null))); doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
doReturn(responseFinal).when(mockClient) doReturn(responseFinal).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
Set<String> resultShardIdSets = proxy.getAllShardIds(); Set<String> resultShardIdSets = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets)); assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
} }
@ -121,53 +148,56 @@ public class KinesisProxyTest {
// First call describeStream throwing LimitExceededException; // First call describeStream throwing LimitExceededException;
// Second call describeStream returning shards list. // Second call describeStream returning shards list.
DescribeStreamResult response = createGetStreamInfoResponse(shards, false); DescribeStreamResult response = createGetStreamInfoResponse(shards, false);
doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockClient) doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(null))); .describeStream(argThat(new OldIsRequestWithStartShardId(null)));
Set<String> resultShardIdSet = proxy.getAllShardIds(); Set<String> resultShardIdSet = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet)); assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet));
} }
@Test @Test
public void testValidShardIteratorType() { public void testValidShardIteratorType() {
when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(); String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
} }
@Test @Test
public void testInvalidShardIteratorIsntChanged() { public void testInvalidShardIteratorIsntChanged() {
when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString(); String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString();
proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
} }
@Test(expected = AmazonServiceException.class) @Test(expected = AmazonServiceException.class)
public void testNullShardIteratorType() { public void testNullShardIteratorType() throws Exception {
when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null")); when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null"));
String expectedShardIteratorType = null; String expectedShardIteratorType = null;
proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", nullValue(String.class))))); .and(hasProperty("shardIteratorType", nullValue(String.class)))));
} }
@Test(expected = AmazonServiceException.class) @Test(expected = AmazonServiceException.class)
public void testGetStreamInfoFails() throws Exception { public void testGetStreamInfoFails() {
when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test")); when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test"));
proxy.getShardList(); try {
verify(mockClient).describeStream(any(DescribeStreamRequest.class)); ddbProxy.getShardList();
} finally {
verify(mockDDBStreamClient).describeStream(any(DescribeStreamRequest.class));
}
} }
@Test @Test
public void testGetStreamInfoThrottledOnce() throws Exception { public void testGetStreamInfoThrottledOnce() throws Exception {
when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")) when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"))
.thenReturn(describeStreamResult); .thenReturn(describeStreamResult);
when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription); when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription);
when(streamDescription.getHasMoreShards()).thenReturn(false); when(streamDescription.getHasMoreShards()).thenReturn(false);
@ -175,11 +205,11 @@ public class KinesisProxyTest {
List<Shard> expectedShards = Collections.singletonList(shard); List<Shard> expectedShards = Collections.singletonList(shard);
when(streamDescription.getShards()).thenReturn(expectedShards); when(streamDescription.getShards()).thenReturn(expectedShards);
List<Shard> actualShards = proxy.getShardList(); List<Shard> actualShards = ddbProxy.getShardList();
assertThat(actualShards, equalTo(expectedShards)); assertThat(actualShards, equalTo(expectedShards));
verify(mockClient, times(2)).describeStream(any(DescribeStreamRequest.class)); verify(mockDDBStreamClient, times(2)).describeStream(any(DescribeStreamRequest.class));
verify(describeStreamResult, times(3)).getStreamDescription(); verify(describeStreamResult, times(3)).getStreamDescription();
verify(streamDescription).getStreamStatus(); verify(streamDescription).getStreamStatus();
verify(streamDescription).isHasMoreShards(); verify(streamDescription).isHasMoreShards();
@ -187,9 +217,9 @@ public class KinesisProxyTest {
@Test(expected = LimitExceededException.class) @Test(expected = LimitExceededException.class)
public void testGetStreamInfoThrottledAll() throws Exception { public void testGetStreamInfoThrottledAll() throws Exception {
when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")); when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"));
proxy.getShardList(); ddbProxy.getShardList();
} }
@Test @Test
@ -213,33 +243,100 @@ public class KinesisProxyTest {
when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3); when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3);
when(streamDescription.isHasMoreShards()).thenReturn(true, true, false); when(streamDescription.isHasMoreShards()).thenReturn(true, true, false);
when(mockClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult); when(mockDDBStreamClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult);
when(mockClient.describeStream(argThat(describeWithShardId(shardId1)))) when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId1))))
.thenThrow(new LimitExceededException("1"), new LimitExceededException("2"), .thenThrow(new LimitExceededException("1"), new LimitExceededException("2"),
new LimitExceededException("3")) new LimitExceededException("3"))
.thenReturn(describeStreamResult); .thenReturn(describeStreamResult);
when(mockClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult); when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult);
boolean limitExceeded = false; boolean limitExceeded = false;
try { try {
proxy.getShardList(); ddbProxy.getShardList();
} catch (LimitExceededException le) { } catch (LimitExceededException le) {
limitExceeded = true; limitExceeded = true;
} }
assertThat(limitExceeded, equalTo(true)); assertThat(limitExceeded, equalTo(true));
List<Shard> actualShards = proxy.getShardList(); List<Shard> actualShards = ddbProxy.getShardList();
List<Shard> expectedShards = Arrays.asList(shard1, shard2, shard3); List<Shard> expectedShards = Arrays.asList(shard1, shard2, shard3);
assertThat(actualShards, equalTo(expectedShards)); assertThat(actualShards, equalTo(expectedShards));
verify(mockClient).describeStream(argThat(describeWithoutShardId())); verify(mockDDBStreamClient).describeStream(argThat(describeWithoutShardId()));
verify(mockClient, times(4)).describeStream(argThat(describeWithShardId(shardId1))); verify(mockDDBStreamClient, times(4)).describeStream(argThat(describeWithShardId(shardId1)));
verify(mockClient).describeStream(argThat(describeWithShardId(shardId2))); verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2)));
} }
@Test
public void testListShardsWithMoreDataAvailable() {
ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
ListShardsResult responseFinal = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenReturn(responseWithMoreData);
when(mockClient.listShards(argThat(listShardsNextToken(NEXT_TOKEN)))).thenReturn(responseFinal);
Set<String> resultShardIdSets = proxy.getAllShardIds();
assertEquals(shardIdSet, resultShardIdSets);
}
@Test
public void testListShardsWithLimiteExceededException() {
ListShardsResult result = new ListShardsResult().withShards(shards);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class).thenReturn(result);
Set <String> resultShardIdSet = proxy.getAllShardIds();
assertEquals(shardIdSet, resultShardIdSet);
}
@Test(expected = AmazonServiceException.class)
public void testListShardsFails() {
when(mockClient.listShards(any(ListShardsRequest.class))).thenThrow(AmazonServiceException.class);
try {
proxy.getShardList();
} finally {
verify(mockClient).listShards(any(ListShardsRequest.class));
}
}
@Test
public void testListShardsThrottledOnce() {
List<Shard> expected = Collections.singletonList(shard);
ListShardsResult result = new ListShardsResult().withShards(expected);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class)
.thenReturn(result);
List<Shard> actualShards = proxy.getShardList();
assertEquals(expected, actualShards);
verify(mockClient, times(2)).listShards(argThat(initialListShardsRequestMatcher()));
}
@Test(expected = LimitExceededException.class)
public void testListShardsThrottledAll() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class);
proxy.getShardList();
}
@Test
public void testStreamNotInCorrectStatus() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class);
assertNull(proxy.getShardList());
}
@Test
public void testGetShardListWithDDBChildClient() {
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
doReturn(responseFinal).when(mockDDBChildClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
Set<String> resultShardIdSets = ddbChildProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
}
private DescribeStreamResult createGetStreamInfoResponse(List<Shard> shards1, boolean isHasMoreShards) { private DescribeStreamResult createGetStreamInfoResponse(List<Shard> shards1, boolean isHasMoreShards) {
// Create stream description // Create stream description
StreamDescription description = new StreamDescription(); StreamDescription description = new StreamDescription();
@ -261,8 +358,8 @@ public class KinesisProxyTest {
return new IsRequestWithStartShardId(shardId); return new IsRequestWithStartShardId(shardId);
} }
// Matcher for testing describe stream request with specific start shard ID.
private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> { private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> {
private final String shardId; private final String shardId;
public IsRequestWithStartShardId(String shardId) { public IsRequestWithStartShardId(String shardId) {
@ -291,6 +388,7 @@ public class KinesisProxyTest {
description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId); description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId);
} }
} }
// Matcher for testing describe stream request with specific start shard ID.
private static class OldIsRequestWithStartShardId extends ArgumentMatcher<DescribeStreamRequest> { private static class OldIsRequestWithStartShardId extends ArgumentMatcher<DescribeStreamRequest> {
private final String shardId; private final String shardId;
@ -310,4 +408,57 @@ public class KinesisProxyTest {
} }
} }
private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
return new ListShardsRequestMatcher(null, null);
}
private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
return new ListShardsRequestMatcher(null, nextToken);
}
@AllArgsConstructor
private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
private final String shardId;
private final String nextToken;
@Override
protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
if (shardId == null) {
if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected ExclusiveStartShardId to be null, but was ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
}
} else {
if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected shardId: ").appendValue(shardId)
.appendText(" doesn't match actual shardId: ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
}
}
if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) {
if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
return false;
}
if (!listShardsRequest.getNextToken().equals(nextToken)) {
description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken())
.appendText(" when it was supposed to be null.");
return false;
}
} else {
return nextToken == null;
}
return true;
}
@Override
public void describeTo(final Description description) {
description.appendText("A ListShardsRequest with a shardId: ").appendValue(shardId)
.appendText(" and empty nextToken");
}
}
} }