From 1704d08935dd86ab9c9b9f8e97fe3b7c12ce3858 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 4 Jan 2018 17:37:07 -0800 Subject: [PATCH] Adding ListShards calls, and managing DescribeStream for DDB Streams. --- pom.xml | 4 +- .../worker/KinesisClientLibConfiguration.java | 38 +++ .../clientlibrary/lib/worker/Worker.java | 6 +- .../clientlibrary/proxies/IKinesisProxy.java | 5 +- .../proxies/IKinesisProxyFactory.java | 10 + .../clientlibrary/proxies/KinesisProxy.java | 207 ++++++++++++--- .../proxies/KinesisProxyFactory.java | 95 +++++-- .../AmazonDynamoDBStreamsAdapterClient.java | 26 ++ ...azonDynamoDBStreamsAdapterClientChild.java | 23 ++ .../worker/ShardSyncTaskIntegrationTest.java | 13 +- .../proxies/KinesisLocalFileProxyFactory.java | 4 + .../proxies/KinesisProxyTest.java | 251 ++++++++++++++---- 12 files changed, 570 insertions(+), 112 deletions(-) create mode 100644 src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java create mode 100644 src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java diff --git a/pom.xml b/pom.xml index 1c521b0e..d15eac0d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.8 + 1.9.0-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -40,7 +40,7 @@ com.amazonaws aws-java-sdk-kinesis - ${aws-java-sdk.version} + 1.11.listshards-SNAPSHOT com.amazonaws diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 3fb36754..a469e3ce 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -182,6 +182,16 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + /** + * The sleep time between two listShards calls from the proxy when throttled. + */ + public static final long DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500; + + /** + * The number of times the Proxy will retry listShards call when throttled. + */ + public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; + private String applicationName; private String tableName; private String streamName; @@ -237,6 +247,12 @@ public class KinesisClientLibConfiguration { @Getter private Optional logWarningForTaskAfterMillis = Optional.empty(); + + @Getter + private long listShardsBackoffTimeInMillis = DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS; + + @Getter + private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS; /** * Constructor. @@ -1369,4 +1385,26 @@ public class KinesisClientLibConfiguration { this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis); return this; } + + /** + * @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled + * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. + * @return + */ + public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) { + checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis); + this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; + return this; + } + + /** + * @param maxListShardsRetryAttempts Max number of retries for listShards when throttled + * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. + * @return + */ + public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) { + checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts); + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index d2ea738d..04165b15 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -248,8 +248,7 @@ public class Worker implements Runnable { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), config, new StreamConfig( - new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) - .getProxy(config.getStreamName()), + new KinesisProxyFactory(config, kinesisClient).getProxy(), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), @@ -1261,8 +1260,7 @@ public class Worker implements Runnable { return new Worker(config.getApplicationName(), recordProcessorFactory, config, - new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), - kinesisClient).getProxy(config.getStreamName()), + new StreamConfig(new KinesisProxyFactory(config, kinesisClient).getProxy(), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index df7f951d..2e5a8c0a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -46,12 +46,15 @@ public interface IKinesisProxy { throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException; /** - * Fetch information about stream. Useful for fetching the list of shards in a stream. + * Fetch information about stream. Useful for fetching the list of shards in a stream. Going forward this method is + * being deprecated. This method uses DescribeStream call, which is throttled at 10 calls per account by default. + * If possible try to use ListShards call available in the client, or use the getShardList to get shard info. * * @param startShardId exclusive start shardId - used when paginating the list of shards. * @return DescribeStreamOutput object containing a description of the stream. * @throws ResourceNotFoundException The Kinesis stream was not found */ + @Deprecated DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java index 0467b8e4..ef9f0182 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxyFactory.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.proxies; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; + /** * Interface for a KinesisProxyFactory. * @@ -25,6 +28,13 @@ public interface IKinesisProxyFactory { * @param streamName Stream from which data is consumed. * @return IKinesisProxy object. */ + @Deprecated IKinesisProxy getProxy(String streamName); + /** + * Return an IKinesisProxy object from the config object. + * @return + */ + IKinesisProxy getProxy(); + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index fd45c764..ba2b289f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -23,13 +23,16 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import lombok.Data; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; @@ -39,13 +42,18 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.InvalidArgumentException; import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.ResourceInUseException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamStatus; +import lombok.Data; + /** * Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards). */ @@ -70,22 +78,48 @@ public class KinesisProxy implements IKinesisProxyExtended { private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private final long describeStreamBackoffTimeInMillis; private final int maxDescribeStreamRetryAttempts; + private final long listShardsBackoffTimeInMillis; + private final int maxListShardsRetryAttempts; + private boolean isKinesisClient = true; + + @Deprecated + private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, + String endpoint, + String serviceName, + String regionId) { + AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider); + client.setEndpoint(endpoint); + client.setSignerRegionOverride(regionId); + return client; + } /** * Public constructor. + *

+ * Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use + * {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or + * {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object. + *

* * @param streamName Data records will be fetched from this stream * @param credentialProvider Provides credentials for signing Kinesis requests * @param endpoint Kinesis endpoint */ - + @Deprecated public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint) { this(streamName, credentialProvider, endpoint, defaultServiceName, defaultRegionId, - DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); + DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, + KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS, + KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS); } /** * Public constructor. + *

+ * Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use + * {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or + * {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object. + *

* * @param streamName Data records will be fetched from this stream * @param credentialProvider Provides credentials for signing Kinesis requests @@ -95,34 +129,33 @@ public class KinesisProxy implements IKinesisProxyExtended { * @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls */ + @Deprecated public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint, String serviceName, String regionId, long describeStreamBackoffTimeInMillis, - int maxDescribeStreamRetryAttempts) { - this(streamName, credentialProvider, buildClientSettingEndpoint(credentialProvider, - endpoint, - serviceName, - regionId), describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts); - - + int maxDescribeStreamRetryAttempts, + long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts) { + this(streamName, + credentialProvider, + buildClientSettingEndpoint(credentialProvider, endpoint, serviceName, regionId), + describeStreamBackoffTimeInMillis, + maxDescribeStreamRetryAttempts, + listShardsBackoffTimeInMillis, + maxListShardsRetryAttempts); LOG.debug("KinesisProxy has created a kinesisClient"); } - - private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, - String endpoint, - String serviceName, - String regionId) { - AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider); - client.setEndpoint(endpoint); - client.setSignerRegionOverride(regionId); - return client; - } /** * Public constructor. + *

+ * Note: Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use + * {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or + * {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object. + *

* * @param streamName Data records will be fetched from this stream * @param credentialProvider Provides credentials for signing Kinesis requests @@ -130,18 +163,56 @@ public class KinesisProxy implements IKinesisProxyExtended { * @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls */ + @Deprecated public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, AmazonKinesis kinesisClient, long describeStreamBackoffTimeInMillis, - int maxDescribeStreamRetryAttempts) { - this.streamName = streamName; + int maxDescribeStreamRetryAttempts, + long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts) { + this(streamName, kinesisClient, describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts, + listShardsBackoffTimeInMillis, maxListShardsRetryAttempts); this.credentialsProvider = credentialProvider; + LOG.debug("KinesisProxy( " + streamName + ")"); + } + + /** + * Public constructor. + * @param config + */ + public KinesisProxy(final KinesisClientLibConfiguration config, final AmazonKinesis client) { + this(config.getStreamName(), + client, + DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, + DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, + config.getListShardsBackoffTimeInMillis(), + config.getMaxListShardsRetryAttempts()); + this.credentialsProvider = config.getKinesisCredentialsProvider(); + } + + public KinesisProxy(final String streamName, + final AmazonKinesis client, + final long describeStreamBackoffTimeInMillis, + final int maxDescribeStreamRetryAttempts, + final long listShardsBackoffTimeInMillis, + final int maxListShardsRetryAttempts) { + this.streamName = streamName; + this.client = client; this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis; this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts; - this.client = kinesisClient; + this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; - LOG.debug("KinesisProxy( " + streamName + ")"); + try { + if (Class.forName("com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient") + .isAssignableFrom(client.getClass())) { + isKinesisClient = false; + LOG.info("Client is DynamoDb client, will use DescribeStreams."); + } + } catch (ClassNotFoundException e) { + LOG.info("Client is Kinesis Client, using ListShards instead of DescribeStreams."); + } } /** @@ -152,6 +223,7 @@ public class KinesisProxy implements IKinesisProxyExtended { throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); + // TODO: remove this once older constructors are removed getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials()); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecords); @@ -166,7 +238,9 @@ public class KinesisProxy implements IKinesisProxyExtended { @Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException, LimitExceededException { + LOG.info("Using describeStreams calls to get shards list"); final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); + // TODO: remove this once older constructors are removed describeStreamRequest.setRequestCredentials(credentialsProvider.getCredentials()); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); @@ -207,6 +281,49 @@ public class KinesisProxy implements IKinesisProxyExtended { return null; } } + + private ListShardsResult listShards(final String nextToken) { + LOG.info("Using ListShards to get shards information"); + final ListShardsRequest request = new ListShardsRequest(); + // TODO: remove this once older constructors are removed + request.setRequestCredentials(credentialsProvider.getCredentials()); + if (StringUtils.isEmpty(nextToken)) { + request.setStreamName(streamName); + } else { + request.setNextToken(nextToken); + } + ListShardsResult result = null; + LimitExceededException lastException = null; + int remainingRetries = this.maxListShardsRetryAttempts; + + while (result == null) { + try { + result = client.listShards(request); + } catch (LimitExceededException e) { + LOG.info("Got LimitExceededException when listing shards " + streamName + ". Backing off for " + + this.listShardsBackoffTimeInMillis + " millis."); + try { + Thread.sleep(this.listShardsBackoffTimeInMillis); + } catch (InterruptedException ie) { + LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie); + } + lastException = e; + } catch (ResourceInUseException e) { + LOG.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or" + + " Updating)"); + return null; + } + remainingRetries--; + if (remainingRetries <= 0 && result == null) { + if (lastException != null) { + throw lastException; + } + throw new IllegalStateException("Received null from DescribeStream call."); + } + } + + return result; + } /** * {@inheritDoc} @@ -233,27 +350,47 @@ public class KinesisProxy implements IKinesisProxyExtended { */ @Override public synchronized List getShardList() { - - DescribeStreamResult response; if (shardIterationState == null) { shardIterationState = new ShardIterationState(); } + + if (isKinesisClient) { + ListShardsResult result; + String nextToken = null; + + do { + result = listShards(nextToken); + + if (result == null) { + /* + * If listShards ever returns null, we should bail and return null. This indicates the stream is not + * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. + */ + return null; + } else { + shardIterationState.update(result.getShards()); + nextToken = result.getNextToken(); + } + } while (StringUtils.isNotEmpty(result.getNextToken())); + + } else { + DescribeStreamResult response; - do { - response = getStreamInfo(shardIterationState.getLastShardId()); + do { + response = getStreamInfo(shardIterationState.getLastShardId()); - if (response == null) { + if (response == null) { /* * If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. */ - return null; - } else { - shardIterationState.update(response.getStreamDescription().getShards()); - } - } while (response.getStreamDescription().isHasMoreShards()); + return null; + } else { + shardIterationState.update(response.getStreamDescription().getShards()); + } + } while (response.getStreamDescription().isHasMoreShards()); + } this.listOfShardsSinceLastGet.set(shardIterationState.getShards()); - shardIterationState = new ShardIterationState(); return listOfShardsSinceLastGet.get(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java index 93df67e0..cc78f324 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyFactory.java @@ -18,6 +18,7 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; /** * Factory used for instantiating KinesisProxy objects (to fetch data from Kinesis). @@ -32,44 +33,72 @@ public class KinesisProxyFactory implements IKinesisProxyFactory { private final AmazonKinesis kinesisClient; private final long describeStreamBackoffTimeInMillis; private final int maxDescribeStreamRetryAttempts; + private final long listShardsBackoffTimeInMillis; + private final int maxListShardsRetryAttempts; + private KinesisClientLibConfiguration configuration; /** * Constructor for creating a KinesisProxy factory, using the specified credentials provider and endpoint. + *

+ * Note: Deprecating, moving forward please use + * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices. + *

* * @param credentialProvider credentials provider used to sign requests * @param endpoint Amazon Kinesis endpoint to use */ + @Deprecated public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, String endpoint) { this(credentialProvider, new ClientConfiguration(), endpoint, defaultServiceName, defaultRegionId, - DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); + DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, + KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS, + KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS); } /** * Constructor for KinesisProxy factory using the client configuration to use when interacting with Kinesis. - * + *

+ * Note: Deprecating, moving forward please use + * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices. + *

+ * * @param credentialProvider credentials provider used to sign requests * @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient * @param endpoint Amazon Kinesis endpoint to use */ + @Deprecated public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, ClientConfiguration clientConfig, String endpoint) { this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId, - DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); + DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, + KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS, + KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS); } /** * This constructor may be used to specify the AmazonKinesisClient to use. + *

+ * Note: Deprecating, moving forward please use + * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices. + *

* * @param credentialProvider credentials provider used to sign requests * @param client AmazonKinesisClient used to fetch data from Kinesis */ + @Deprecated public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) { - this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); + this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, + KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS, + KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS); } /** * Used internally and for development/testing. + *

+ * Note: Deprecating, moving forward please use + * {@link #KinesisProxyFactory(AWSCredentialsProvider, AmazonKinesis)}, it uses AWS best practices. + *

* * @param credentialProvider credentials provider used to sign requests * @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient @@ -79,22 +108,41 @@ public class KinesisProxyFactory implements IKinesisProxyFactory { * @param describeStreamBackoffTimeInMillis backoff time for describing stream in millis * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls */ + @Deprecated KinesisProxyFactory(AWSCredentialsProvider credentialProvider, ClientConfiguration clientConfig, String endpoint, String serviceName, String regionId, long describeStreamBackoffTimeInMillis, - int maxDescribeStreamRetryAttempts) { + int maxDescribeStreamRetryAttempts, + long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts) { this(credentialProvider, buildClientSettingEndpoint(credentialProvider, clientConfig, endpoint, serviceName, regionId), - describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts); + describeStreamBackoffTimeInMillis, + maxDescribeStreamRetryAttempts, + listShardsBackoffTimeInMillis, + maxListShardsRetryAttempts); } + /** + * Creating KinesisProxyFactory using Config and Client objects, that will be used by the proxy. + * + * @param configuration Config that will be used to create the Proxy + * @param client Client that will be used by the Proxy + */ + public KinesisProxyFactory(final KinesisClientLibConfiguration configuration, final AmazonKinesis client) { + this(configuration.getKinesisCredentialsProvider(), client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, + DEFAULT_DESCRIBE_STREAM_RETRY_TIMES, configuration.getListShardsBackoffTimeInMillis(), + configuration.getMaxListShardsRetryAttempts()); + this.configuration = configuration; + } + /** * Used internally in the class (and for development/testing). * @@ -106,14 +154,18 @@ public class KinesisProxyFactory implements IKinesisProxyFactory { KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client, long describeStreamBackoffTimeInMillis, - int maxDescribeStreamRetryAttempts) { + int maxDescribeStreamRetryAttempts, + long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts) { super(); this.kinesisClient = client; this.credentialProvider = credentialProvider; this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis; this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts; + this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; } - + /** * {@inheritDoc} */ @@ -123,15 +175,28 @@ public class KinesisProxyFactory implements IKinesisProxyFactory { credentialProvider, kinesisClient, describeStreamBackoffTimeInMillis, - maxDescribeStreamRetryAttempts); - + maxDescribeStreamRetryAttempts, + listShardsBackoffTimeInMillis, + maxListShardsRetryAttempts); } - + + /** + * {@inheritDoc} + */ + @Override + public IKinesisProxy getProxy() { + if (configuration == null) { + throw new IllegalArgumentException("KinesisClientLibConfiguration not set, please make sure to use" + + " KinesisProxyFactory(KinesisClientLibConfiguration, AmazonKinesis) constructor."); + } + return new KinesisProxy(configuration, kinesisClient); + } + private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, - ClientConfiguration clientConfig, - String endpoint, - String serviceName, - String regionId) { + ClientConfiguration clientConfig, + String endpoint, + String serviceName, + String regionId) { AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig); client.setEndpoint(endpoint); client.setSignerRegionOverride(regionId); diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java new file mode 100644 index 00000000..c7d2db2c --- /dev/null +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.dynamodbv2.streamsadapter; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClient; + +/** + * + */ +public class AmazonDynamoDBStreamsAdapterClient extends AmazonKinesisClient { + // This class is used for testing purposes. +} diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java new file mode 100644 index 00000000..00a053c4 --- /dev/null +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientChild.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.dynamodbv2.streamsadapter; + +/** + * + */ +public class AmazonDynamoDBStreamsAdapterClientChild extends AmazonDynamoDBStreamsAdapterClient { + // Used only for testing +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 307596e3..7adef894 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -18,6 +18,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -90,10 +91,13 @@ public class ShardSyncTaskIntegrationTest { new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", new AmazonDynamoDBClient(credentialsProvider), useConsistentReads); - kinesisProxy = - new KinesisProxy(STREAM_NAME, - new DefaultAWSCredentialsProviderChain(), - KINESIS_ENDPOINT); + + AmazonKinesisClient client = new AmazonDynamoDBStreamsAdapterClient(); + client.setEndpoint(KINESIS_ENDPOINT); + client.setSignerRegionOverride("us-east-1"); + + kinesisProxy = new KinesisProxy(STREAM_NAME, new DefaultAWSCredentialsProviderChain(), client, 1000L, 50, 1000L, + 50); } /** @@ -106,7 +110,6 @@ public class ShardSyncTaskIntegrationTest { /** * Test method for call(). * - * @throws CapacityExceededException * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java index 8a053ec4..ece54d68 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java @@ -61,4 +61,8 @@ public class KinesisLocalFileProxyFactory implements IKinesisProxyFactory { return testKinesisProxy; } + @Override + public IKinesisProxy getProxy() { + return testKinesisProxy; + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java index 2b7aa0b7..e9cd5458 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -19,9 +19,12 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -29,13 +32,22 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; +import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; +import com.amazonaws.services.kinesis.model.ResourceInUseException; +import lombok.AllArgsConstructor; +import org.apache.commons.lang.StringUtils; import org.hamcrest.Description; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Before; @@ -47,7 +59,6 @@ import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; @@ -61,11 +72,18 @@ import com.amazonaws.services.kinesis.model.StreamStatus; @RunWith(MockitoJUnitRunner.class) public class KinesisProxyTest { private static final String TEST_STRING = "TestString"; - private static final long BACKOFF_TIME = 10L; - private static final int RETRY_TIMES = 3; + private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L; + private static final long LIST_SHARDS_BACKOFF_TIME = 10L; + private static final int DESCRIBE_STREAM_RETRY_TIMES = 3; + private static final int LIST_SHARDS_RETRY_TIMES = 3; + private static final String NEXT_TOKEN = "NextToken"; @Mock - private AmazonKinesisClient mockClient; + private AmazonKinesis mockClient; + @Mock + private AmazonDynamoDBStreamsAdapterClient mockDDBStreamClient; + @Mock + private AmazonDynamoDBStreamsAdapterClientChild mockDDBChildClient; @Mock private AWSCredentialsProvider mockCredentialsProvider; @Mock @@ -76,8 +94,12 @@ public class KinesisProxyTest { private StreamDescription streamDescription; @Mock private Shard shard; + @Mock + private KinesisClientLibConfiguration config; private KinesisProxy proxy; + private KinesisProxy ddbProxy; + private KinesisProxy ddbChildProxy; // Test shards for verifying. private Set shardIdSet; @@ -85,19 +107,24 @@ public class KinesisProxyTest { @Before public void setUpTest() { - // Set up kinesis proxy - proxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockClient, BACKOFF_TIME, RETRY_TIMES); - when(mockCredentialsProvider.getCredentials()).thenReturn(null); + // Set up kinesis ddbProxy + when(config.getStreamName()).thenReturn(TEST_STRING); + when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME); + when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES); + when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider); + + proxy = new KinesisProxy(config, mockClient); + ddbProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBStreamClient, + DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME, + LIST_SHARDS_RETRY_TIMES); + ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient, + DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME, + LIST_SHARDS_RETRY_TIMES); + // Set up test shards - shardIdSet = new HashSet<>(); - shards = new ArrayList<>(); - String[] shardIds = new String[] { "shard-1", "shard-2", "shard-3", "shard-4" }; - for (String shardId : shardIds) { - Shard shard = new Shard(); - shard.setShardId(shardId); - shards.add(shard); - shardIdSet.add(shardId); - } + List shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4"); + shardIdSet = new HashSet<>(shardIds); + shards = shardIds.stream().map(shardId -> new Shard().withShardId(shardId)).collect(Collectors.toList()); } @Test @@ -107,11 +134,11 @@ public class KinesisProxyTest { // Second call describeStream returning response with rest shards. DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockClient).describeStream(argThat(new IsRequestWithStartShardId(null))); - doReturn(responseFinal).when(mockClient) + doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + doReturn(responseFinal).when(mockDDBStreamClient) .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); - Set resultShardIdSets = proxy.getAllShardIds(); + Set resultShardIdSets = ddbProxy.getAllShardIds(); assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets)); } @@ -121,53 +148,56 @@ public class KinesisProxyTest { // First call describeStream throwing LimitExceededException; // Second call describeStream returning shards list. DescribeStreamResult response = createGetStreamInfoResponse(shards, false); - doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockClient) + doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockDDBStreamClient) .describeStream(argThat(new OldIsRequestWithStartShardId(null))); - Set resultShardIdSet = proxy.getAllShardIds(); + Set resultShardIdSet = ddbProxy.getAllShardIds(); assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet)); } @Test public void testValidShardIteratorType() { - when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); + when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(); - proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); + ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); - verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) + verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); } @Test public void testInvalidShardIteratorIsntChanged() { - when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); + when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString(); - proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); + ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); - verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) + verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); } @Test(expected = AmazonServiceException.class) - public void testNullShardIteratorType() { - when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null")); + public void testNullShardIteratorType() throws Exception { + when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null")); String expectedShardIteratorType = null; - proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); + ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); - verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) + verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) .and(hasProperty("shardIteratorType", nullValue(String.class))))); } @Test(expected = AmazonServiceException.class) - public void testGetStreamInfoFails() throws Exception { - when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test")); - proxy.getShardList(); - verify(mockClient).describeStream(any(DescribeStreamRequest.class)); + public void testGetStreamInfoFails() { + when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test")); + try { + ddbProxy.getShardList(); + } finally { + verify(mockDDBStreamClient).describeStream(any(DescribeStreamRequest.class)); + } } @Test public void testGetStreamInfoThrottledOnce() throws Exception { - when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")) + when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")) .thenReturn(describeStreamResult); when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription); when(streamDescription.getHasMoreShards()).thenReturn(false); @@ -175,11 +205,11 @@ public class KinesisProxyTest { List expectedShards = Collections.singletonList(shard); when(streamDescription.getShards()).thenReturn(expectedShards); - List actualShards = proxy.getShardList(); + List actualShards = ddbProxy.getShardList(); assertThat(actualShards, equalTo(expectedShards)); - verify(mockClient, times(2)).describeStream(any(DescribeStreamRequest.class)); + verify(mockDDBStreamClient, times(2)).describeStream(any(DescribeStreamRequest.class)); verify(describeStreamResult, times(3)).getStreamDescription(); verify(streamDescription).getStreamStatus(); verify(streamDescription).isHasMoreShards(); @@ -187,9 +217,9 @@ public class KinesisProxyTest { @Test(expected = LimitExceededException.class) public void testGetStreamInfoThrottledAll() throws Exception { - when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")); + when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")); - proxy.getShardList(); + ddbProxy.getShardList(); } @Test @@ -213,32 +243,99 @@ public class KinesisProxyTest { when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3); when(streamDescription.isHasMoreShards()).thenReturn(true, true, false); - when(mockClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult); + when(mockDDBStreamClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult); - when(mockClient.describeStream(argThat(describeWithShardId(shardId1)))) + when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId1)))) .thenThrow(new LimitExceededException("1"), new LimitExceededException("2"), new LimitExceededException("3")) .thenReturn(describeStreamResult); - when(mockClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult); + when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult); boolean limitExceeded = false; try { - proxy.getShardList(); + ddbProxy.getShardList(); } catch (LimitExceededException le) { limitExceeded = true; } assertThat(limitExceeded, equalTo(true)); - List actualShards = proxy.getShardList(); + List actualShards = ddbProxy.getShardList(); List expectedShards = Arrays.asList(shard1, shard2, shard3); assertThat(actualShards, equalTo(expectedShards)); - verify(mockClient).describeStream(argThat(describeWithoutShardId())); - verify(mockClient, times(4)).describeStream(argThat(describeWithShardId(shardId1))); - verify(mockClient).describeStream(argThat(describeWithShardId(shardId2))); + verify(mockDDBStreamClient).describeStream(argThat(describeWithoutShardId())); + verify(mockDDBStreamClient, times(4)).describeStream(argThat(describeWithShardId(shardId1))); + verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2))); } + + @Test + public void testListShardsWithMoreDataAvailable() { + ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); + ListShardsResult responseFinal = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); + when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenReturn(responseWithMoreData); + when(mockClient.listShards(argThat(listShardsNextToken(NEXT_TOKEN)))).thenReturn(responseFinal); + + Set resultShardIdSets = proxy.getAllShardIds(); + assertEquals(shardIdSet, resultShardIdSets); + } + + @Test + public void testListShardsWithLimiteExceededException() { + ListShardsResult result = new ListShardsResult().withShards(shards); + when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class).thenReturn(result); + + Set resultShardIdSet = proxy.getAllShardIds(); + assertEquals(shardIdSet, resultShardIdSet); + } + + @Test(expected = AmazonServiceException.class) + public void testListShardsFails() { + when(mockClient.listShards(any(ListShardsRequest.class))).thenThrow(AmazonServiceException.class); + try { + proxy.getShardList(); + } finally { + verify(mockClient).listShards(any(ListShardsRequest.class)); + } + } + + @Test + public void testListShardsThrottledOnce() { + List expected = Collections.singletonList(shard); + ListShardsResult result = new ListShardsResult().withShards(expected); + when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class) + .thenReturn(result); + + List actualShards = proxy.getShardList(); + + assertEquals(expected, actualShards); + verify(mockClient, times(2)).listShards(argThat(initialListShardsRequestMatcher())); + } + + @Test(expected = LimitExceededException.class) + public void testListShardsThrottledAll() { + when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class); + proxy.getShardList(); + } + + @Test + public void testStreamNotInCorrectStatus() { + when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class); + assertNull(proxy.getShardList()); + } + + @Test + public void testGetShardListWithDDBChildClient() { + DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); + DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); + doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null))); + doReturn(responseFinal).when(mockDDBChildClient) + .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); + + Set resultShardIdSets = ddbChildProxy.getAllShardIds(); + assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets)); + } private DescribeStreamResult createGetStreamInfoResponse(List shards1, boolean isHasMoreShards) { // Create stream description @@ -261,8 +358,8 @@ public class KinesisProxyTest { return new IsRequestWithStartShardId(shardId); } - // Matcher for testing describe stream request with specific start shard ID. private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher { + private final String shardId; public IsRequestWithStartShardId(String shardId) { @@ -291,6 +388,7 @@ public class KinesisProxyTest { description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId); } } + // Matcher for testing describe stream request with specific start shard ID. private static class OldIsRequestWithStartShardId extends ArgumentMatcher { private final String shardId; @@ -309,5 +407,58 @@ public class KinesisProxyTest { return startShardId.equals(shardId); } } + + private static ListShardsRequestMatcher initialListShardsRequestMatcher() { + return new ListShardsRequestMatcher(null, null); + } + + private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) { + return new ListShardsRequestMatcher(null, nextToken); + } + + @AllArgsConstructor + private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher { + private final String shardId; + private final String nextToken; + + @Override + protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) { + if (shardId == null) { + if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { + description.appendText("Expected ExclusiveStartShardId to be null, but was ") + .appendValue(listShardsRequest.getExclusiveStartShardId()); + return false; + } + } else { + if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) { + description.appendText("Expected shardId: ").appendValue(shardId) + .appendText(" doesn't match actual shardId: ") + .appendValue(listShardsRequest.getExclusiveStartShardId()); + return false; + } + } + + if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) { + if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { + return false; + } + + if (!listShardsRequest.getNextToken().equals(nextToken)) { + description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken()) + .appendText(" when it was supposed to be null."); + return false; + } + } else { + return nextToken == null; + } + return true; + } + + @Override + public void describeTo(final Description description) { + description.appendText("A ListShardsRequest with a shardId: ").appendValue(shardId) + .appendText(" and empty nextToken"); + } + } }