logWarningForTaskAfterMillis = Optional.empty();
+
+ @Getter
+ private long listShardsBackoffTimeInMillis = DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS;
+
+ @Getter
+ private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS;
/**
* Constructor.
@@ -1369,4 +1385,26 @@ public class KinesisClientLibConfiguration {
this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis);
return this;
}
+
+ /**
+ * @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled
+ * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
+ * @return
+ */
+ public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) {
+ checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis);
+ this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
+ return this;
+ }
+
+ /**
+ * @param maxListShardsRetryAttempts Max number of retries for listShards when throttled
+ * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
+ * @return
+ */
+ public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) {
+ checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts);
+ this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
+ return this;
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index d2ea738d..04165b15 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -248,8 +248,7 @@ public class Worker implements Runnable {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config,
new StreamConfig(
- new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
- .getProxy(config.getStreamName()),
+ new KinesisProxyFactory(config, kinesisClient).getProxy(),
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
@@ -1261,8 +1260,7 @@ public class Worker implements Runnable {
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
- new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
- kinesisClient).getProxy(config.getStreamName()),
+ new StreamConfig(new KinesisProxyFactory(config, kinesisClient).getProxy(),
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java
index df7f951d..2e5a8c0a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java
@@ -46,12 +46,15 @@ public interface IKinesisProxy {
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException;
/**
- * Fetch information about stream. Useful for fetching the list of shards in a stream.
+ * Fetch information about stream. Useful for fetching the list of shards in a stream. Going forward this method is
+ * being deprecated. This method uses DescribeStream call, which is throttled at 10 calls per account by default.
+ * If possible try to use ListShards call available in the client, or use the getShardList to get shard info.
*
* @param startShardId exclusive start shardId - used when paginating the list of shards.
* @return DescribeStreamOutput object containing a description of the stream.
* @throws ResourceNotFoundException The Kinesis stream was not found
*/
+ @Deprecated
DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException;
/**
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java
index 0467b8e4..ef9f0182 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java
@@ -14,6 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
/**
* Interface for a KinesisProxyFactory.
*
@@ -25,6 +28,13 @@ public interface IKinesisProxyFactory {
* @param streamName Stream from which data is consumed.
* @return IKinesisProxy object.
*/
+ @Deprecated
IKinesisProxy getProxy(String streamName);
+ /**
+ * Return an IKinesisProxy object from the config object.
+ * @return
+ */
+ IKinesisProxy getProxy();
+
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java
index fd45c764..ba2b289f 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java
@@ -23,13 +23,16 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
-import lombok.Data;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
@@ -39,13 +42,18 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
+import lombok.Data;
+
/**
* Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards).
*/
@@ -70,22 +78,48 @@ public class KinesisProxy implements IKinesisProxyExtended {
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
private final long describeStreamBackoffTimeInMillis;
private final int maxDescribeStreamRetryAttempts;
+ private final long listShardsBackoffTimeInMillis;
+ private final int maxListShardsRetryAttempts;
+ private boolean isKinesisClient = true;
+
+ @Deprecated
+ private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
+ String endpoint,
+ String serviceName,
+ String regionId) {
+ AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider);
+ client.setEndpoint(endpoint);
+ client.setSignerRegionOverride(regionId);
+ return client;
+ }
/**
* Public constructor.
+ *
+ * Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
+ * {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
+ * {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object.
+ *
*
* @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests
* @param endpoint Kinesis endpoint
*/
-
+ @Deprecated
public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint) {
this(streamName, credentialProvider, endpoint, defaultServiceName, defaultRegionId,
- DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
+ DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
+ KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
+ KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
}
/**
* Public constructor.
+ *
+ * Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
+ * {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
+ * {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object.
+ *
*
* @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests
@@ -95,34 +129,33 @@ public class KinesisProxy implements IKinesisProxyExtended {
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/
+ @Deprecated
public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider,
String endpoint,
String serviceName,
String regionId,
long describeStreamBackoffTimeInMillis,
- int maxDescribeStreamRetryAttempts) {
- this(streamName, credentialProvider, buildClientSettingEndpoint(credentialProvider,
- endpoint,
- serviceName,
- regionId), describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts);
-
-
+ int maxDescribeStreamRetryAttempts,
+ long listShardsBackoffTimeInMillis,
+ int maxListShardsRetryAttempts) {
+ this(streamName,
+ credentialProvider,
+ buildClientSettingEndpoint(credentialProvider, endpoint, serviceName, regionId),
+ describeStreamBackoffTimeInMillis,
+ maxDescribeStreamRetryAttempts,
+ listShardsBackoffTimeInMillis,
+ maxListShardsRetryAttempts);
LOG.debug("KinesisProxy has created a kinesisClient");
}
-
- private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
- String endpoint,
- String serviceName,
- String regionId) {
- AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider);
- client.setEndpoint(endpoint);
- client.setSignerRegionOverride(regionId);
- return client;
- }
/**
* Public constructor.
+ *
+ * Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
+ * {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
+ * {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object.
+ *
*
* @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests
@@ -130,18 +163,56 @@ public class KinesisProxy implements IKinesisProxyExtended {
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/
+ @Deprecated
public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider,
AmazonKinesis kinesisClient,
long describeStreamBackoffTimeInMillis,
- int maxDescribeStreamRetryAttempts) {
- this.streamName = streamName;
+ int maxDescribeStreamRetryAttempts,
+ long listShardsBackoffTimeInMillis,
+ int maxListShardsRetryAttempts) {
+ this(streamName, kinesisClient, describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts,
+ listShardsBackoffTimeInMillis, maxListShardsRetryAttempts);
this.credentialsProvider = credentialProvider;
+ LOG.debug("KinesisProxy( " + streamName + ")");
+ }
+
+ /**
+ * Public constructor.
+ * @param config
+ */
+ public KinesisProxy(final KinesisClientLibConfiguration config, final AmazonKinesis client) {
+ this(config.getStreamName(),
+ client,
+ DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
+ DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
+ config.getListShardsBackoffTimeInMillis(),
+ config.getMaxListShardsRetryAttempts());
+ this.credentialsProvider = config.getKinesisCredentialsProvider();
+ }
+
+ public KinesisProxy(final String streamName,
+ final AmazonKinesis client,
+ final long describeStreamBackoffTimeInMillis,
+ final int maxDescribeStreamRetryAttempts,
+ final long listShardsBackoffTimeInMillis,
+ final int maxListShardsRetryAttempts) {
+ this.streamName = streamName;
+ this.client = client;
this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
- this.client = kinesisClient;
+ this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
+ this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
- LOG.debug("KinesisProxy( " + streamName + ")");
+ try {
+ if (Class.forName("com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient")
+ .isAssignableFrom(client.getClass())) {
+ isKinesisClient = false;
+ LOG.info("Client is DynamoDb client, will use DescribeStreams.");
+ }
+ } catch (ClassNotFoundException e) {
+ LOG.info("Client is Kinesis Client, using ListShards instead of DescribeStreams.");
+ }
}
/**
@@ -152,6 +223,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+ // TODO: remove this once older constructors are removed
getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials());
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecords);
@@ -166,7 +238,9 @@ public class KinesisProxy implements IKinesisProxyExtended {
@Override
public DescribeStreamResult getStreamInfo(String startShardId)
throws ResourceNotFoundException, LimitExceededException {
+ LOG.info("Using describeStreams calls to get shards list");
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+ // TODO: remove this once older constructors are removed
describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials());
describeStreamRequest.setStreamName(streamName);
describeStreamRequest.setExclusiveStartShardId(startShardId);
@@ -207,6 +281,49 @@ public class KinesisProxy implements IKinesisProxyExtended {
return null;
}
}
+
+ private ListShardsResult listShards(final String nextToken) {
+ LOG.info("Using ListShards to get shards information");
+ final ListShardsRequest request = new ListShardsRequest();
+ // TODO: remove this once older constructors are removed
+ request.setRequestCredentials(credentialsProvider.getCredentials());
+ if (StringUtils.isEmpty(nextToken)) {
+ request.setStreamName(streamName);
+ } else {
+ request.setNextToken(nextToken);
+ }
+ ListShardsResult result = null;
+ LimitExceededException lastException = null;
+ int remainingRetries = this.maxListShardsRetryAttempts;
+
+ while (result == null) {
+ try {
+ result = client.listShards(request);
+ } catch (LimitExceededException e) {
+ LOG.info("Got LimitExceededException when listing shards " + streamName + ". Backing off for "
+ + this.listShardsBackoffTimeInMillis + " millis.");
+ try {
+ Thread.sleep(this.listShardsBackoffTimeInMillis);
+ } catch (InterruptedException ie) {
+ LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie);
+ }
+ lastException = e;
+ } catch (ResourceInUseException e) {
+ LOG.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or"
+ + " Updating)");
+ return null;
+ }
+ remainingRetries--;
+ if (remainingRetries <= 0 && result == null) {
+ if (lastException != null) {
+ throw lastException;
+ }
+ throw new IllegalStateException("Received null from DescribeStream call.");
+ }
+ }
+
+ return result;
+ }
/**
* {@inheritDoc}
@@ -233,27 +350,47 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/
@Override
public synchronized List getShardList() {
-
- DescribeStreamResult response;
if (shardIterationState == null) {
shardIterationState = new ShardIterationState();
}
+
+ if (isKinesisClient) {
+ ListShardsResult result;
+ String nextToken = null;
+
+ do {
+ result = listShards(nextToken);
+
+ if (result == null) {
+ /*
+ * If listShards ever returns null, we should bail and return null. This indicates the stream is not
+ * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
+ */
+ return null;
+ } else {
+ shardIterationState.update(result.getShards());
+ nextToken = result.getNextToken();
+ }
+ } while (StringUtils.isNotEmpty(result.getNextToken()));
+
+ } else {
+ DescribeStreamResult response;
- do {
- response = getStreamInfo(shardIterationState.getLastShardId());
+ do {
+ response = getStreamInfo(shardIterationState.getLastShardId());
- if (response == null) {
+ if (response == null) {
/*
* If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/
- return null;
- } else {
- shardIterationState.update(response.getStreamDescription().getShards());
- }
- } while (response.getStreamDescription().isHasMoreShards());
+ return null;
+ } else {
+ shardIterationState.update(response.getStreamDescription().getShards());
+ }
+ } while (response.getStreamDescription().isHasMoreShards());
+ }
this.listOfShardsSinceLastGet.set(shardIterationState.getShards());
-
shardIterationState = new ShardIterationState();
return listOfShardsSinceLastGet.get();
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java
index 93df67e0..cc78f324 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java
@@ -18,6 +18,7 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
/**
* Factory used for instantiating KinesisProxy objects (to fetch data from Kinesis).
@@ -32,44 +33,72 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
private final AmazonKinesis kinesisClient;
private final long describeStreamBackoffTimeInMillis;
private final int maxDescribeStreamRetryAttempts;
+ private final long listShardsBackoffTimeInMillis;
+ private final int maxListShardsRetryAttempts;
+ private KinesisClientLibConfiguration configuration;
/**
* Constructor for creating a KinesisProxy factory, using the specified credentials provider and endpoint.
+ *
+ * Note: Deprecating, moving forward please use
+ * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
+ *
*
* @param credentialProvider credentials provider used to sign requests
* @param endpoint Amazon Kinesis endpoint to use
*/
+ @Deprecated
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, String endpoint) {
this(credentialProvider, new ClientConfiguration(), endpoint, defaultServiceName, defaultRegionId,
- DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
+ DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
+ KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
+ KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
}
/**
* Constructor for KinesisProxy factory using the client configuration to use when interacting with Kinesis.
- *
+ *
+ * Note: Deprecating, moving forward please use
+ * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
+ *
+ *
* @param credentialProvider credentials provider used to sign requests
* @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient
* @param endpoint Amazon Kinesis endpoint to use
*/
+ @Deprecated
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
ClientConfiguration clientConfig,
String endpoint) {
this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId,
- DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
+ DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
+ KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
+ KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
}
/**
* This constructor may be used to specify the AmazonKinesisClient to use.
+ *
+ * Note: Deprecating, moving forward please use
+ * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
+ *
*
* @param credentialProvider credentials provider used to sign requests
* @param client AmazonKinesisClient used to fetch data from Kinesis
*/
+ @Deprecated
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) {
- this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
+ this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
+ KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
+ KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
}
/**
* Used internally and for development/testing.
+ *
+ * Note: Deprecating, moving forward please use
+ * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices.
+ *
*
* @param credentialProvider credentials provider used to sign requests
* @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient
@@ -79,22 +108,41 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
* @param describeStreamBackoffTimeInMillis backoff time for describing stream in millis
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/
+ @Deprecated
KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
ClientConfiguration clientConfig,
String endpoint,
String serviceName,
String regionId,
long describeStreamBackoffTimeInMillis,
- int maxDescribeStreamRetryAttempts) {
+ int maxDescribeStreamRetryAttempts,
+ long listShardsBackoffTimeInMillis,
+ int maxListShardsRetryAttempts) {
this(credentialProvider, buildClientSettingEndpoint(credentialProvider,
clientConfig,
endpoint,
serviceName,
regionId),
- describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts);
+ describeStreamBackoffTimeInMillis,
+ maxDescribeStreamRetryAttempts,
+ listShardsBackoffTimeInMillis,
+ maxListShardsRetryAttempts);
}
+ /**
+ * Creating KinesisProxyFactory using Config and Client objects, that will be used by the proxy.
+ *
+ * @param configuration Config that will be used to create the Proxy
+ * @param client Client that will be used by the Proxy
+ */
+ public KinesisProxyFactory(final KinesisClientLibConfiguration configuration, final AmazonKinesis client) {
+ this(configuration.getKinesisCredentialsProvider(), client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
+ DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, configuration.getListShardsBackoffTimeInMillis(),
+ configuration.getMaxListShardsRetryAttempts());
+ this.configuration = configuration;
+ }
+
/**
* Used internally in the class (and for development/testing).
*
@@ -106,14 +154,18 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
AmazonKinesis client,
long describeStreamBackoffTimeInMillis,
- int maxDescribeStreamRetryAttempts) {
+ int maxDescribeStreamRetryAttempts,
+ long listShardsBackoffTimeInMillis,
+ int maxListShardsRetryAttempts) {
super();
this.kinesisClient = client;
this.credentialProvider = credentialProvider;
this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
+ this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
+ this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
}
-
+
/**
* {@inheritDoc}
*/
@@ -123,15 +175,28 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
credentialProvider,
kinesisClient,
describeStreamBackoffTimeInMillis,
- maxDescribeStreamRetryAttempts);
-
+ maxDescribeStreamRetryAttempts,
+ listShardsBackoffTimeInMillis,
+ maxListShardsRetryAttempts);
}
-
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public IKinesisProxy getProxy() {
+ if (configuration == null) {
+ throw new IllegalArgumentException("KinesisClientLibConfiguration not set, please make sure to use"
+ + " KinesisProxyFactory(KinesisClientLibConfiguration, AmazonKinesis) constructor.");
+ }
+ return new KinesisProxy(configuration, kinesisClient);
+ }
+
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
- ClientConfiguration clientConfig,
- String endpoint,
- String serviceName,
- String regionId) {
+ ClientConfiguration clientConfig,
+ String endpoint,
+ String serviceName,
+ String regionId) {
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig);
client.setEndpoint(endpoint);
client.setSignerRegionOverride(regionId);
diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java
new file mode 100644
index 00000000..c7d2db2c
--- /dev/null
+++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazonaws.services.dynamodbv2.streamsadapter;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+
+/**
+ *
+ */
+public class AmazonDynamoDBStreamsAdapterClient extends AmazonKinesisClient {
+ // This class is used for testing purposes.
+}
diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java
new file mode 100644
index 00000000..00a053c4
--- /dev/null
+++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazonaws.services.dynamodbv2.streamsadapter;
+
+/**
+ *
+ */
+public class AmazonDynamoDBStreamsAdapterClientChild extends AmazonDynamoDBStreamsAdapterClient {
+ // Used only for testing
+}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
index 307596e3..7adef894 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
@@ -18,6 +18,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -90,10 +91,13 @@ public class ShardSyncTaskIntegrationTest {
new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest",
new AmazonDynamoDBClient(credentialsProvider),
useConsistentReads);
- kinesisProxy =
- new KinesisProxy(STREAM_NAME,
- new DefaultAWSCredentialsProviderChain(),
- KINESIS_ENDPOINT);
+
+ AmazonKinesisClient client = new AmazonDynamoDBStreamsAdapterClient();
+ client.setEndpoint(KINESIS_ENDPOINT);
+ client.setSignerRegionOverride("us-east-1");
+
+ kinesisProxy = new KinesisProxy(STREAM_NAME, new DefaultAWSCredentialsProviderChain(), client, 1000L, 50, 1000L,
+ 50);
}
/**
@@ -106,7 +110,6 @@ public class ShardSyncTaskIntegrationTest {
/**
* Test method for call().
*
- * @throws CapacityExceededException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java
index 8a053ec4..ece54d68 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java
@@ -61,4 +61,8 @@ public class KinesisLocalFileProxyFactory implements IKinesisProxyFactory {
return testKinesisProxy;
}
+ @Override
+ public IKinesisProxy getProxy() {
+ return testKinesisProxy;
+ }
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java
index 2b7aa0b7..e9cd5458 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java
@@ -19,9 +19,12 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -29,13 +32,22 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
+import com.amazonaws.services.kinesis.model.ResourceInUseException;
+import lombok.AllArgsConstructor;
+import org.apache.commons.lang.StringUtils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
@@ -47,7 +59,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
@@ -61,11 +72,18 @@ import com.amazonaws.services.kinesis.model.StreamStatus;
@RunWith(MockitoJUnitRunner.class)
public class KinesisProxyTest {
private static final String TEST_STRING = "TestString";
- private static final long BACKOFF_TIME = 10L;
- private static final int RETRY_TIMES = 3;
+ private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L;
+ private static final long LIST_SHARDS_BACKOFF_TIME = 10L;
+ private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
+ private static final int LIST_SHARDS_RETRY_TIMES = 3;
+ private static final String NEXT_TOKEN = "NextToken";
@Mock
- private AmazonKinesisClient mockClient;
+ private AmazonKinesis mockClient;
+ @Mock
+ private AmazonDynamoDBStreamsAdapterClient mockDDBStreamClient;
+ @Mock
+ private AmazonDynamoDBStreamsAdapterClientChild mockDDBChildClient;
@Mock
private AWSCredentialsProvider mockCredentialsProvider;
@Mock
@@ -76,8 +94,12 @@ public class KinesisProxyTest {
private StreamDescription streamDescription;
@Mock
private Shard shard;
+ @Mock
+ private KinesisClientLibConfiguration config;
private KinesisProxy proxy;
+ private KinesisProxy ddbProxy;
+ private KinesisProxy ddbChildProxy;
// Test shards for verifying.
private Set shardIdSet;
@@ -85,19 +107,24 @@ public class KinesisProxyTest {
@Before
public void setUpTest() {
- // Set up kinesis proxy
- proxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockClient, BACKOFF_TIME, RETRY_TIMES);
- when(mockCredentialsProvider.getCredentials()).thenReturn(null);
+ // Set up kinesis ddbProxy
+ when(config.getStreamName()).thenReturn(TEST_STRING);
+ when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME);
+ when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES);
+ when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider);
+
+ proxy = new KinesisProxy(config, mockClient);
+ ddbProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBStreamClient,
+ DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
+ LIST_SHARDS_RETRY_TIMES);
+ ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient,
+ DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
+ LIST_SHARDS_RETRY_TIMES);
+
// Set up test shards
- shardIdSet = new HashSet<>();
- shards = new ArrayList<>();
- String[] shardIds = new String[] { "shard-1", "shard-2", "shard-3", "shard-4" };
- for (String shardId : shardIds) {
- Shard shard = new Shard();
- shard.setShardId(shardId);
- shards.add(shard);
- shardIdSet.add(shardId);
- }
+ List shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4");
+ shardIdSet = new HashSet<>(shardIds);
+ shards = shardIds.stream().map(shardId -> new Shard().withShardId(shardId)).collect(Collectors.toList());
}
@Test
@@ -107,11 +134,11 @@ public class KinesisProxyTest {
// Second call describeStream returning response with rest shards.
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
- doReturn(responseWithMoreData).when(mockClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
- doReturn(responseFinal).when(mockClient)
+ doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
+ doReturn(responseFinal).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
- Set resultShardIdSets = proxy.getAllShardIds();
+ Set resultShardIdSets = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
}
@@ -121,53 +148,56 @@ public class KinesisProxyTest {
// First call describeStream throwing LimitExceededException;
// Second call describeStream returning shards list.
DescribeStreamResult response = createGetStreamInfoResponse(shards, false);
- doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockClient)
+ doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(null)));
- Set resultShardIdSet = proxy.getAllShardIds();
+ Set resultShardIdSet = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet));
}
@Test
public void testValidShardIteratorType() {
- when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
+ when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
- proxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
+ ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
- verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
+ verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
}
@Test
public void testInvalidShardIteratorIsntChanged() {
- when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
+ when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString();
- proxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
+ ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
- verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
+ verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
}
@Test(expected = AmazonServiceException.class)
- public void testNullShardIteratorType() {
- when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null"));
+ public void testNullShardIteratorType() throws Exception {
+ when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null"));
String expectedShardIteratorType = null;
- proxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
+ ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
- verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
+ verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", nullValue(String.class)))));
}
@Test(expected = AmazonServiceException.class)
- public void testGetStreamInfoFails() throws Exception {
- when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test"));
- proxy.getShardList();
- verify(mockClient).describeStream(any(DescribeStreamRequest.class));
+ public void testGetStreamInfoFails() {
+ when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test"));
+ try {
+ ddbProxy.getShardList();
+ } finally {
+ verify(mockDDBStreamClient).describeStream(any(DescribeStreamRequest.class));
+ }
}
@Test
public void testGetStreamInfoThrottledOnce() throws Exception {
- when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"))
+ when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"))
.thenReturn(describeStreamResult);
when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription);
when(streamDescription.getHasMoreShards()).thenReturn(false);
@@ -175,11 +205,11 @@ public class KinesisProxyTest {
List expectedShards = Collections.singletonList(shard);
when(streamDescription.getShards()).thenReturn(expectedShards);
- List actualShards = proxy.getShardList();
+ List actualShards = ddbProxy.getShardList();
assertThat(actualShards, equalTo(expectedShards));
- verify(mockClient, times(2)).describeStream(any(DescribeStreamRequest.class));
+ verify(mockDDBStreamClient, times(2)).describeStream(any(DescribeStreamRequest.class));
verify(describeStreamResult, times(3)).getStreamDescription();
verify(streamDescription).getStreamStatus();
verify(streamDescription).isHasMoreShards();
@@ -187,9 +217,9 @@ public class KinesisProxyTest {
@Test(expected = LimitExceededException.class)
public void testGetStreamInfoThrottledAll() throws Exception {
- when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"));
+ when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"));
- proxy.getShardList();
+ ddbProxy.getShardList();
}
@Test
@@ -213,32 +243,99 @@ public class KinesisProxyTest {
when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3);
when(streamDescription.isHasMoreShards()).thenReturn(true, true, false);
- when(mockClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult);
+ when(mockDDBStreamClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult);
- when(mockClient.describeStream(argThat(describeWithShardId(shardId1))))
+ when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId1))))
.thenThrow(new LimitExceededException("1"), new LimitExceededException("2"),
new LimitExceededException("3"))
.thenReturn(describeStreamResult);
- when(mockClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult);
+ when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult);
boolean limitExceeded = false;
try {
- proxy.getShardList();
+ ddbProxy.getShardList();
} catch (LimitExceededException le) {
limitExceeded = true;
}
assertThat(limitExceeded, equalTo(true));
- List actualShards = proxy.getShardList();
+ List actualShards = ddbProxy.getShardList();
List expectedShards = Arrays.asList(shard1, shard2, shard3);
assertThat(actualShards, equalTo(expectedShards));
- verify(mockClient).describeStream(argThat(describeWithoutShardId()));
- verify(mockClient, times(4)).describeStream(argThat(describeWithShardId(shardId1)));
- verify(mockClient).describeStream(argThat(describeWithShardId(shardId2)));
+ verify(mockDDBStreamClient).describeStream(argThat(describeWithoutShardId()));
+ verify(mockDDBStreamClient, times(4)).describeStream(argThat(describeWithShardId(shardId1)));
+ verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2)));
}
+
+ @Test
+ public void testListShardsWithMoreDataAvailable() {
+ ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+ ListShardsResult responseFinal = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
+ when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenReturn(responseWithMoreData);
+ when(mockClient.listShards(argThat(listShardsNextToken(NEXT_TOKEN)))).thenReturn(responseFinal);
+
+ Set resultShardIdSets = proxy.getAllShardIds();
+ assertEquals(shardIdSet, resultShardIdSets);
+ }
+
+ @Test
+ public void testListShardsWithLimiteExceededException() {
+ ListShardsResult result = new ListShardsResult().withShards(shards);
+ when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class).thenReturn(result);
+
+ Set resultShardIdSet = proxy.getAllShardIds();
+ assertEquals(shardIdSet, resultShardIdSet);
+ }
+
+ @Test(expected = AmazonServiceException.class)
+ public void testListShardsFails() {
+ when(mockClient.listShards(any(ListShardsRequest.class))).thenThrow(AmazonServiceException.class);
+ try {
+ proxy.getShardList();
+ } finally {
+ verify(mockClient).listShards(any(ListShardsRequest.class));
+ }
+ }
+
+ @Test
+ public void testListShardsThrottledOnce() {
+ List expected = Collections.singletonList(shard);
+ ListShardsResult result = new ListShardsResult().withShards(expected);
+ when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class)
+ .thenReturn(result);
+
+ List actualShards = proxy.getShardList();
+
+ assertEquals(expected, actualShards);
+ verify(mockClient, times(2)).listShards(argThat(initialListShardsRequestMatcher()));
+ }
+
+ @Test(expected = LimitExceededException.class)
+ public void testListShardsThrottledAll() {
+ when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class);
+ proxy.getShardList();
+ }
+
+ @Test
+ public void testStreamNotInCorrectStatus() {
+ when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class);
+ assertNull(proxy.getShardList());
+ }
+
+ @Test
+ public void testGetShardListWithDDBChildClient() {
+ DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
+ DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
+ doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
+ doReturn(responseFinal).when(mockDDBChildClient)
+ .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
+
+ Set resultShardIdSets = ddbChildProxy.getAllShardIds();
+ assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
+ }
private DescribeStreamResult createGetStreamInfoResponse(List shards1, boolean isHasMoreShards) {
// Create stream description
@@ -261,8 +358,8 @@ public class KinesisProxyTest {
return new IsRequestWithStartShardId(shardId);
}
- // Matcher for testing describe stream request with specific start shard ID.
private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher {
+
private final String shardId;
public IsRequestWithStartShardId(String shardId) {
@@ -291,6 +388,7 @@ public class KinesisProxyTest {
description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId);
}
}
+ // Matcher for testing describe stream request with specific start shard ID.
private static class OldIsRequestWithStartShardId extends ArgumentMatcher {
private final String shardId;
@@ -309,5 +407,58 @@ public class KinesisProxyTest {
return startShardId.equals(shardId);
}
}
+
+ private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+ return new ListShardsRequestMatcher(null, null);
+ }
+
+ private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
+ return new ListShardsRequestMatcher(null, nextToken);
+ }
+
+ @AllArgsConstructor
+ private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher {
+ private final String shardId;
+ private final String nextToken;
+
+ @Override
+ protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
+ if (shardId == null) {
+ if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
+ description.appendText("Expected ExclusiveStartShardId to be null, but was ")
+ .appendValue(listShardsRequest.getExclusiveStartShardId());
+ return false;
+ }
+ } else {
+ if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
+ description.appendText("Expected shardId: ").appendValue(shardId)
+ .appendText(" doesn't match actual shardId: ")
+ .appendValue(listShardsRequest.getExclusiveStartShardId());
+ return false;
+ }
+ }
+
+ if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) {
+ if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
+ return false;
+ }
+
+ if (!listShardsRequest.getNextToken().equals(nextToken)) {
+ description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken())
+ .appendText(" when it was supposed to be null.");
+ return false;
+ }
+ } else {
+ return nextToken == null;
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description.appendText("A ListShardsRequest with a shardId: ").appendValue(shardId)
+ .appendText(" and empty nextToken");
+ }
+ }
}