From 6a70e3db31c0612d2ffb89851e462b0b0d569a15 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 18 Mar 2019 10:06:59 -0700 Subject: [PATCH] Added Timeouts for DynamoDB and Kinesis Calls (#518) * Advance version to 2.1.3-SNAPSHOT * Added timeouts for Kinesis and DynamoDB calls Added a timeout to prevent an issue where the Kinesis or DynamoDB call never completes. For Kinesis call to GetRecords the timeout defaults to 30 seconds, and can be configured on the PollingConfig. For DynamoDB and Kinesis (when calling ListShards) the timeout defaults to 60 seconds and can be configured on LeaseManagementConfig. --- amazon-kinesis-client-multilang/pom.xml | 2 +- amazon-kinesis-client/pom.xml | 2 +- .../amazon/kinesis/common/FutureUtils.java | 35 +++ .../kinesis/leases/KinesisShardDetector.java | 59 ++-- .../kinesis/leases/LeaseManagementConfig.java | 8 +- .../DynamoDBLeaseManagementFactory.java | 67 ++++- .../dynamodb/DynamoDBLeaseRefresher.java | 74 +++-- .../retrieval/polling/KinesisDataFetcher.java | 34 ++- .../retrieval/polling/PollingConfig.java | 10 +- .../polling/PrefetchRecordsPublisher.java | 3 + .../SynchronousBlockingRetrievalFactory.java | 19 +- ...ynchronousPrefetchingRetrievalFactory.java | 58 ++-- .../kinesis/common/FutureUtilsTest.java | 55 ++++ .../leases/KinesisShardDetectorTest.java | 24 ++ .../dynamodb/DynamoDBLeaseRefresherTest.java | 268 ++++++++++++++++++ .../polling/KinesisDataFetcherTest.java | 171 +++++++---- ...efetchRecordsPublisherIntegrationTest.java | 6 +- .../polling/PrefetchRecordsPublisherTest.java | 15 + pom.xml | 4 +- 19 files changed, 770 insertions(+), 144 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index a6737e0a..bbec0402 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -19,7 +19,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.1.2 + 2.1.3-SNAPSHOT 4.0.0 diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 45126ea0..16b7008f 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -20,7 +20,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.1.2 + 2.1.3-SNAPSHOT amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java new file mode 100644 index 00000000..6e0abc99 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.common; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FutureUtils { + + public static T resolveOrCancelFuture(Future future, Duration timeout) + throws ExecutionException, InterruptedException, TimeoutException { + try { + return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + future.cancel(true); + throw te; + } + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index b5645d9d..a4c1e55c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -1,16 +1,16 @@ /* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package software.amazon.kinesis.leases; @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -31,7 +33,6 @@ import org.apache.commons.lang3.StringUtils; import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.Synchronized; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -44,17 +45,18 @@ import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.retrieval.AWSExceptionManager; /** * */ -@RequiredArgsConstructor @Slf4j @Accessors(fluent = true) @KinesisClientInternalApi public class KinesisShardDetector implements ShardDetector { + @NonNull private final KinesisAsyncClient kinesisClient; @NonNull @@ -64,12 +66,35 @@ public class KinesisShardDetector implements ShardDetector { private final long listShardsCacheAllowedAgeInSeconds; private final int maxCacheMissesBeforeReload; private final int cacheMissWarningModulus; + private final Duration kinesisRequestTimeout; private volatile Map cachedShardMap = null; private volatile Instant lastCacheUpdateTime; @Getter(AccessLevel.PACKAGE) private AtomicInteger cacheMisses = new AtomicInteger(0); + @Deprecated + public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, + int cacheMissWarningModulus) { + this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, + listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + } + + public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, + int cacheMissWarningModulus, Duration kinesisRequestTimeout) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; + this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; + this.cacheMissWarningModulus = cacheMissWarningModulus; + this.kinesisRequestTimeout = kinesisRequestTimeout; + } + @Override public Shard shard(@NonNull final String shardId) { if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) { @@ -132,10 +157,10 @@ public class KinesisShardDetector implements ShardDetector { result = listShards(nextToken); if (result == null) { - /* - * If listShards ever returns null, we should bail and return null. This indicates the stream is not - * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. - */ + /* + * If listShards ever returns null, we should bail and return null. This indicates the stream is not + * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. + */ return null; } else { shards.addAll(result.shards()); @@ -167,7 +192,7 @@ public class KinesisShardDetector implements ShardDetector { try { try { - result = kinesisClient.listShards(request.build()).get(); + result = FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request.build()), kinesisRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -188,6 +213,8 @@ public class KinesisShardDetector implements ShardDetector { log.debug("Stream {} : Sleep was interrupted ", streamName, ie); } lastException = e; + } catch (TimeoutException te) { + throw new RuntimeException(te); } remainingRetries--; if (remainingRetries <= 0 && result == null) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 5c98bae6..6aa461fd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -41,6 +42,9 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; @Data @Accessors(fluent = true) public class LeaseManagementConfig { + + public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1); + /** * Name of the table to use in DynamoDB * @@ -159,6 +163,8 @@ public class LeaseManagementConfig { public long epsilonMillis = 25L; + private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT; + /** * The initial position for getting records from Kinesis streams. * @@ -261,7 +267,7 @@ public class LeaseManagementConfig { initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), - tableCreatorCallback()); + tableCreatorCallback(), dynamoDbRequestTimeout()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 1ec3e0b3..8ec305c4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases.dynamodb; +import java.time.Duration; import java.util.concurrent.ExecutorService; import lombok.Data; @@ -26,6 +27,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; @@ -37,6 +39,7 @@ import software.amazon.kinesis.metrics.MetricsFactory; @Data @KinesisClientInternalApi public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { + @NonNull private final KinesisAsyncClient kinesisClient; @NonNull @@ -71,6 +74,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long initialLeaseTableReadCapacity; private final long initialLeaseTableWriteCapacity; private final TableCreatorCallback tableCreatorCallback; + private final Duration dynamoDbRequestTimeout; /** * Constructor. @@ -166,7 +170,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); + new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK, + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); } /** @@ -198,6 +203,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param hierarchicalShardSyncer * @param tableCreatorCallback */ + @Deprecated public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, @@ -208,8 +214,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, - final TableCreatorCallback tableCreatorCallback) { + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; @@ -235,6 +291,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; + this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; } @Override @@ -267,13 +324,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads, - tableCreatorCallback); + tableCreatorCallback, dynamoDbRequestTimeout); } @Override public ShardDetector createShardDetector() { return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, - cacheMissWarningModulus); + cacheMissWarningModulus, dynamoDbRequestTimeout); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 79a12fc3..c6d53f90 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -14,11 +14,13 @@ */ package software.amazon.kinesis.leases.dynamodb; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -45,7 +47,9 @@ import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -60,12 +64,15 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Slf4j @KinesisClientInternalApi public class DynamoDBLeaseRefresher implements LeaseRefresher { + protected final String table; protected final DynamoDbAsyncClient dynamoDBClient; protected final LeaseSerializer serializer; protected final boolean consistentReads; private final TableCreatorCallback tableCreatorCallback; + private final Duration dynamoDbRequestTimeout; + private boolean newTableCreated = false; /** @@ -95,14 +102,31 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * @param consistentReads * @param tableCreatorCallback */ + @Deprecated public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, - final LeaseSerializer serializer, final boolean consistentReads, - @NonNull final TableCreatorCallback tableCreatorCallback) { + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback) { + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + } + + /** + * Constructor. + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + */ + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { this.table = table; this.dynamoDBClient = dynamoDBClient; this.serializer = serializer; this.consistentReads = consistentReads; this.tableCreatorCallback = tableCreatorCallback; + this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; } /** @@ -132,7 +156,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.createTable(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.createTable(request), dynamoDbRequestTimeout); newTableCreated = true; } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); @@ -144,7 +168,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return newTableCreated; } catch (LimitExceededException e) { throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e); - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } return newTableCreated; @@ -167,7 +191,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { DescribeTableResponse result; try { try { - result = dynamoDBClient.describeTable(request).get(); + result = FutureUtils.resolveOrCancelFuture(dynamoDBClient.describeTable(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -177,7 +201,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ResourceNotFoundException e) { log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table); return null; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } @@ -269,7 +293,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - ScanResponse scanResult = dynamoDBClient.scan(scanRequest).get(); + ScanResponse scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout); List result = new ArrayList<>(); while (scanResult != null) { @@ -287,7 +311,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { // Make another request, picking up where we left off. scanRequest = scanRequest.toBuilder().exclusiveStartKey(lastEvaluatedKey).build(); log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey); - scanResult = dynamoDBClient.scan(scanRequest).get(); + scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout); } } log.debug("Listed {} leases from table {}", result.size(), table); @@ -302,7 +326,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e); } catch (ProvisionedThroughputExceededException e) { throw new ProvisionedThroughputException(e); - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } } @@ -323,7 +347,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.putItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.putItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -333,7 +357,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Did not create lease {} because it already existed", lease); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } return true; @@ -352,7 +376,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final AWSExceptionManager exceptionManager = createExceptionManager(); try { try { - GetItemResponse result = dynamoDBClient.getItem(request).get(); + GetItemResponse result = FutureUtils.resolveOrCancelFuture(dynamoDBClient.getItem(request), dynamoDbRequestTimeout); Map dynamoRecord = result.item(); if (CollectionUtils.isNullOrEmpty(dynamoRecord)) { @@ -369,7 +393,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { // TODO: check behavior throw new DependencyException(e); } - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("get", leaseKey, e); } } @@ -391,7 +415,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -414,7 +438,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } @@ -444,7 +468,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -455,7 +479,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("take", lease.leaseKey(), e); } @@ -487,7 +511,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -498,7 +522,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", lease.leaseKey(), lease.leaseOwner()); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("evict", lease.leaseKey(), e); } @@ -522,14 +546,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.deleteItem(deleteRequest).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.deleteItem(deleteRequest), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { // TODO: check the behavior throw new DependencyException(e); } - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e); } } @@ -549,14 +573,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final AWSExceptionManager exceptionManager = createExceptionManager(); try { try { - dynamoDBClient.deleteItem(deleteRequest).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.deleteItem(deleteRequest), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { // TODO: Check if this is the correct behavior throw new DependencyException(e); } - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("delete", lease.leaseKey(), e); } } @@ -580,7 +604,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -590,7 +614,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { log.debug("Lease update failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 129fd15b..459c4155 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -14,8 +14,10 @@ */ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.StringUtils; @@ -25,7 +27,6 @@ import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -35,6 +36,8 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.metrics.MetricsFactory; @@ -44,14 +47,16 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.IteratorBuilder; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ -@RequiredArgsConstructor @Slf4j +@KinesisClientInternalApi public class KinesisDataFetcher { + private static final String METRICS_PREFIX = "KinesisDataFetcher"; private static final String OPERATION = "ProcessTask"; @@ -64,6 +69,21 @@ public class KinesisDataFetcher { private final int maxRecords; @NonNull private final MetricsFactory metricsFactory; + private final Duration maxFutureWait; + + @Deprecated + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { + this(kinesisClient, streamName, shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + } + + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.shardId = shardId; + this.maxRecords = maxRecords; + this.metricsFactory = metricsFactory; + this.maxFutureWait = maxFutureWait; + } /** Note: This method has package level access for testing purposes. * @return nextIterator @@ -191,7 +211,8 @@ public class KinesisDataFetcher { try { try { - final GetShardIteratorResponse result = kinesisClient.getShardIterator(request).get(); + final GetShardIteratorResponse result = FutureUtils + .resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); nextIterator = result.shardIterator(); success = true; } catch (ExecutionException e) { @@ -199,6 +220,8 @@ public class KinesisDataFetcher { } catch (InterruptedException e) { // TODO: Check behavior throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RetryableRetrievalException(e.getMessage(), e); } } catch (ResourceNotFoundException e) { log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e); @@ -244,7 +267,8 @@ public class KinesisDataFetcher { boolean success = false; long startTime = System.currentTimeMillis(); try { - final GetRecordsResponse response = kinesisClient.getRecords(request).get(); + final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), + maxFutureWait); success = true; return response; } catch (ExecutionException e) { @@ -253,6 +277,8 @@ public class KinesisDataFetcher { // TODO: Check behavior log.debug("Interrupt called on metod, shutdown initiated"); throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RetryableRetrievalException(e.getMessage(), e); } finally { MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getRecords"), success, startTime, MetricsLevel.DETAILED); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index e9b4e6a2..666cb3a9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; import java.util.Optional; import lombok.Data; @@ -32,6 +33,8 @@ import software.amazon.kinesis.retrieval.RetrievalSpecificConfig; @Getter public class PollingConfig implements RetrievalSpecificConfig { + public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); + /** * Name of the Kinesis stream. * @@ -94,9 +97,14 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + /** + * The maximum time to wait for a future request from Kinesis to complete + */ + private Duration kinesisRequestTimeout = DEFAULT_REQUEST_TIMEOUT; + @Override public RetrievalFactory retrievalFactory() { return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, - maxRecords()); + maxRecords(), kinesisRequestTimeout); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index a441f721..3acd3169 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -51,6 +51,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -317,6 +318,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { drainQueueForRequests(); } catch (PositionResetException pse) { throw pse; + } catch (RetryableRetrievalException rre) { + log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request."); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); } catch (ExpiredIteratorException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 1e827fc9..7c55d4e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -19,12 +19,15 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import java.time.Duration; + /** * */ @Data @KinesisClientInternalApi public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { + @NonNull private final String streamName; @NonNull @@ -34,12 +37,26 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { // private final long listShardsBackoffTimeInMillis; // private final int maxListShardsRetryAttempts; private final int maxRecords; + private final Duration kinesisRequestTimeout; + + public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) { + this.streamName = streamName; + this.kinesisClient = kinesisClient; + this.recordsFetcherFactory = recordsFetcherFactory; + this.maxRecords = maxRecords; + this.kinesisRequestTimeout = kinesisRequestTimeout; + } + + @Deprecated + public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) { + this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + } @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory)); + new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index be69618d..9ec97c5b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -1,24 +1,24 @@ /* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; import java.util.concurrent.ExecutorService; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.ShardInfo; @@ -31,7 +31,6 @@ import software.amazon.kinesis.retrieval.RetrievalFactory; /** * */ -@RequiredArgsConstructor @KinesisClientInternalApi public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory { @NonNull @@ -44,26 +43,41 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory @NonNull private final ExecutorService executorService; private final long idleMillisBetweenCalls; + private final Duration maxFutureWait; + + @Deprecated + public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, + long idleMillisBetweenCalls) { + this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls, + PollingConfig.DEFAULT_REQUEST_TIMEOUT); + } + + public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, + long idleMillisBetweenCalls, Duration maxFutureWait) { + this.streamName = streamName; + this.kinesisClient = kinesisClient; + this.recordsFetcherFactory = recordsFetcherFactory; + this.maxRecords = maxRecords; + this.executorService = executorService; + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.maxFutureWait = maxFutureWait; + } @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { - return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory)); + return new SynchronousGetRecordsRetrievalStrategy(new KinesisDataFetcher(kinesisClient, streamName, + shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); } @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(), - recordsFetcherFactory.maxByteSize(), - recordsFetcherFactory.maxRecordsCount(), - maxRecords, - createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), - executorService, - idleMillisBetweenCalls, - metricsFactory, - "Prefetching", - shardInfo.shardId()); + recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords, + createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls, + metricsFactory, "Prefetching", shardInfo.shardId()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java new file mode 100644 index 00000000..c9015131 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.common; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class FutureUtilsTest { + + @Mock + private Future future; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testTimeoutExceptionCancelsFuture() throws Exception { + expectedException.expect(TimeoutException.class); + + when(future.get(anyLong(), any())).thenThrow(new TimeoutException("Timeout")); + + try { + FutureUtils.resolveOrCancelFuture(future, Duration.ofSeconds(1)); + } finally { + verify(future).cancel(eq(true)); + } + } +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java index be25a360..fd1a4a66 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java @@ -9,9 +9,11 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -22,11 +24,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -55,8 +61,13 @@ public class KinesisShardDetectorTest { private KinesisShardDetector shardDetector; + @Rule + public ExpectedException expectedExceptionRule = ExpectedException.none(); + @Mock private KinesisAsyncClient client; + @Mock + private CompletableFuture mockFuture; @Before public void setup() { @@ -140,6 +151,19 @@ public class KinesisShardDetectorTest { } } + @Test + public void testListShardsTimesOut() throws Exception { + expectedExceptionRule.expect(RuntimeException.class); + expectedExceptionRule.expectCause(isA(TimeoutException.class)); + + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Timeout")); + + when(client.listShards(any(ListShardsRequest.class))).thenReturn(mockFuture); + + shardDetector.listShards(); + + } + @Test public void testGetShard() { final String shardId = String.format(SHARD_ID, 1); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java new file mode 100644 index 00000000..27bb1cd3 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -0,0 +1,268 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.leases.dynamodb; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.exceptions.DependencyException; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBLeaseRefresherTest { + + private static final String TABLE_NAME = "test"; + private static final boolean CONSISTENT_READS = true; + + @Mock + private DynamoDbAsyncClient dynamoDbClient; + @Mock + private LeaseSerializer leaseSerializer; + @Mock + private TableCreatorCallback tableCreatorCallback; + @Mock + private CompletableFuture mockScanFuture; + @Mock + private CompletableFuture mockPutItemFuture; + @Mock + private CompletableFuture mockGetItemFuture; + @Mock + private CompletableFuture mockUpdateFuture; + @Mock + private CompletableFuture mockDeleteFuture; + @Mock + private CompletableFuture mockDescribeTableFuture; + @Mock + private CompletableFuture mockCreateTableFuture; + @Mock + private Lease lease; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private DynamoDBLeaseRefresher leaseRefresher; + + private Map serializedLease; + + @Before + public void setup() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback); + serializedLease = new HashMap<>(); + + } + + @Test + public void testListLeasesHandlesTimeout() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(mockScanFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(te); + when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture); + + verifyCancel(mockScanFuture, () -> leaseRefresher.listLeases()); + } + + @Test + public void testListLeasesSucceedsThenFails() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture); + + Map lastEvaluatedKey = new HashMap<>(); + lastEvaluatedKey.put("Test", AttributeValue.builder().s("test").build()); + + when(mockScanFuture.get(anyLong(), any(TimeUnit.class))) + .thenReturn(ScanResponse.builder().lastEvaluatedKey(lastEvaluatedKey).build()) + .thenThrow(te); + + verifyCancel(mockScanFuture, () -> leaseRefresher.listLeases()); + + verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class)); + verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class)); + + } + + @Test + public void testCreateLeaseIfNotExistsTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(mockPutItemFuture); + when(mockPutItemFuture.get(anyLong(), any())).thenThrow(te); + + when(leaseSerializer.toDynamoRecord(any())).thenReturn(serializedLease); + when(leaseSerializer.getDynamoNonexistantExpectation()).thenReturn(Collections.emptyMap()); + + verifyCancel(mockPutItemFuture, () -> leaseRefresher.createLeaseIfNotExists(lease)); + } + + @Test + public void testGetLeaseTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(mockGetItemFuture); + when(mockGetItemFuture.get(anyLong(), any())).thenThrow(te); + + when(leaseSerializer.getDynamoHashKey(anyString())).thenReturn(Collections.emptyMap()); + + verifyCancel(mockGetItemFuture, () -> leaseRefresher.getLease("test")); + } + + @Test + public void testRenewLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () ->leaseRefresher.renewLease(lease)); + } + + @Test + public void testTakeLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () -> leaseRefresher.takeLease(lease, "owner")); + } + + @Test + public void testEvictLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () -> leaseRefresher.evictLease(lease)); + } + + @Test + public void testUpdateLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () -> leaseRefresher.updateLease(lease)); + } + + @Test + public void testDeleteAllLeasesTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture); + when(mockScanFuture.get(anyLong(), any())).thenReturn(ScanResponse.builder().items(Collections.emptyMap()).build()); + when(leaseSerializer.fromDynamoRecord(any())).thenReturn(lease); + when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap()); + + when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(mockDeleteFuture); + when(mockDeleteFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockDeleteFuture, () -> leaseRefresher.deleteAll()); + } + + @Test + public void testDeleteLeaseTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap()); + + when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(mockDeleteFuture); + when(mockDeleteFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockDeleteFuture, () -> leaseRefresher.deleteLease(lease)); + } + + @Test + public void testLeaseTableExistsTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockDescribeTableFuture, () -> leaseRefresher.leaseTableExists()); + } + + @Test + public void testCreateLeaseTableTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + } + + @FunctionalInterface + private interface TestCaller { + void call() throws Exception; + } + + private void verifyCancel(Future future, TestCaller toExecute) throws Exception { + try { + toExecute.call(); + } finally { + verify(future).cancel(anyBoolean()); + } + } + + private void setupUpdateItemTest() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap()); + when(leaseSerializer.getDynamoLeaseCounterExpectation(any(Lease.class))).thenReturn(Collections.emptyMap()); + when(leaseSerializer.getDynamoLeaseCounterUpdate(any(Lease.class))).thenReturn(Collections.emptyMap()); + when(leaseSerializer.getDynamoTakeLeaseUpdate(any(), anyString())).thenReturn(Collections.emptyMap()); + + when(dynamoDbClient.updateItem(any(UpdateItemRequest.class))).thenReturn(mockUpdateFuture); + when(mockUpdateFuture.get(anyLong(), any())).thenThrow(te); + } + + private TimeoutException setRuleForDependencyTimeout() { + TimeoutException te = new TimeoutException("Timeout"); + expectedException.expect(DependencyException.class); + expectedException.expectCause(equalTo(te)); + + return te; + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java index 8dfed30c..641d6ba0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java @@ -14,12 +14,14 @@ */ package software.amazon.kinesis.retrieval.polling; +import static org.hamcrest.CoreMatchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -35,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.junit.Before; @@ -66,6 +70,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -76,12 +81,12 @@ public class KinesisDataFetcherTest { private static final int MAX_RECORDS = 1; private static final String STREAM_NAME = "streamName"; private static final String SHARD_ID = "shardId-1"; - private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); - private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); - private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = - InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000)); + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.LATEST); + private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp(new Date(1000)); private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private KinesisDataFetcher kinesisDataFetcher; @@ -90,13 +95,16 @@ public class KinesisDataFetcherTest { private KinesisAsyncClient kinesisClient; @Mock private CompletableFuture getRecordsResponseFuture; + @Mock + private CompletableFuture getShardIteratorResponseFuture; @Rule public ExpectedException expectedExceptionRule = ExpectedException.none(); @Before public void setup() { - kinesisDataFetcher = new KinesisDataFetcher(kinesisClient, STREAM_NAME, SHARD_ID, MAX_RECORDS, NULL_METRICS_FACTORY); + kinesisDataFetcher = new KinesisDataFetcher(kinesisClient, STREAM_NAME, SHARD_ID, MAX_RECORDS, + NULL_METRICS_FACTORY); } /** @@ -104,8 +112,7 @@ public class KinesisDataFetcherTest { */ @Test public final void testInitializeLatest() throws Exception { - testInitializeAndFetch(ShardIteratorType.LATEST.toString(), - ShardIteratorType.LATEST.toString(), + testInitializeAndFetch(ShardIteratorType.LATEST.toString(), ShardIteratorType.LATEST.toString(), INITIAL_POSITION_LATEST); } @@ -114,8 +121,7 @@ public class KinesisDataFetcherTest { */ @Test public final void testInitializeTimeZero() throws Exception { - testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(), - ShardIteratorType.TRIM_HORIZON.toString(), + testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(), ShardIteratorType.TRIM_HORIZON.toString(), INITIAL_POSITION_TRIM_HORIZON); } @@ -124,12 +130,10 @@ public class KinesisDataFetcherTest { */ @Test public final void testInitializeAtTimestamp() throws Exception { - testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(), - ShardIteratorType.AT_TIMESTAMP.toString(), + testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(), ShardIteratorType.AT_TIMESTAMP.toString(), INITIAL_POSITION_AT_TIMESTAMP); } - /** * Test initialize() when a flushpoint exists. */ @@ -149,8 +153,8 @@ public class KinesisDataFetcherTest { private CompletableFuture makeGetShardIteratorResonse(String shardIterator) throws InterruptedException, ExecutionException { - return CompletableFuture.completedFuture( - GetShardIteratorResponse.builder().shardIterator(shardIterator).build()); + return CompletableFuture + .completedFuture(GetShardIteratorResponse.builder().shardIterator(shardIterator).build()); } @Test @@ -161,12 +165,11 @@ public class KinesisDataFetcherTest { final String seqA = "123"; final String seqB = "456"; - ArgumentCaptor shardIteratorRequestCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + ArgumentCaptor shardIteratorRequestCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())) - .thenReturn(makeGetShardIteratorResonse(iteratorA)) - .thenReturn(makeGetShardIteratorResonse(iteratorA)) + .thenReturn(makeGetShardIteratorResonse(iteratorA)).thenReturn(makeGetShardIteratorResonse(iteratorA)) .thenReturn(makeGetShardIteratorResonse(iteratorB)); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA)); @@ -198,16 +201,18 @@ public class KinesisDataFetcherTest { @Test public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws InterruptedException, ExecutionException { - final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + final ArgumentCaptor requestCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final String iteratorHorizon = "TRIM_HORIZON"; final String iteratorLatest = "LATEST"; final String iteratorAtTimestamp = "AT_TIMESTAMP"; - final Map requestsMap = Arrays.stream( - new String[] {iteratorHorizon, iteratorLatest, iteratorAtTimestamp}) + final Map requestsMap = Arrays + .stream(new String[] { iteratorHorizon, iteratorLatest, iteratorAtTimestamp }) .map(this::makeGetShardIteratorRequest) .collect(Collectors.toMap(r -> ShardIteratorType.valueOf(r.shardIteratorTypeAsString()), r -> r)); GetShardIteratorRequest tsReq = requestsMap.get(ShardIteratorType.AT_TIMESTAMP); - requestsMap.put(ShardIteratorType.AT_TIMESTAMP, tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build()); + requestsMap.put(ShardIteratorType.AT_TIMESTAMP, + tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build()); when(kinesisClient.getShardIterator(requestCaptor.capture())) .thenReturn(makeGetShardIteratorResonse(iteratorHorizon)) @@ -238,14 +243,15 @@ public class KinesisDataFetcherTest { } @Test - public void testGetRecordsWithResourceNotFoundException() throws InterruptedException, ExecutionException { - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + public void testGetRecordsWithResourceNotFoundException() throws Exception { + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); // Set up arguments used by proxy final String nextIterator = "TestShardIterator"; - final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(ShardIteratorType.LATEST.name()); + final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest( + ShardIteratorType.LATEST.name()); final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(nextIterator); final CompletableFuture future = mock(CompletableFuture.class); @@ -254,13 +260,13 @@ public class KinesisDataFetcherTest { when(kinesisClient.getShardIterator(iteratorCaptor.capture())) .thenReturn(makeGetShardIteratorResonse(nextIterator)); when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future); - when(future.get()).thenThrow( + when(future.get(anyLong(), any(TimeUnit.class))).thenThrow( new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build())); // Create data fectcher and initialize it with latest type checkpoint kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); - final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = - new SynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); try { // Call records of dataFetcher which will throw an exception getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); @@ -283,7 +289,7 @@ public class KinesisDataFetcherTest { // Set up proxy mock methods when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture); when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture); - when(getRecordsResponseFuture.get()) + when(getRecordsResponseFuture.get(anyLong(), any(TimeUnit.class))) .thenThrow(new ExecutionException(SdkException.builder().message("Test Exception").build())); // Create data fectcher and initialize it with latest type checkpoint @@ -295,14 +301,15 @@ public class KinesisDataFetcherTest { getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); } - + @Test - public void testNonNullGetRecords() throws InterruptedException, ExecutionException { + public void testNonNullGetRecords() throws Exception { final String nextIterator = "TestIterator"; - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); - final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(ShardIteratorType.LATEST.name()); + final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest( + ShardIteratorType.LATEST.name()); final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(nextIterator); final CompletableFuture future = mock(CompletableFuture.class); @@ -310,7 +317,7 @@ public class KinesisDataFetcherTest { when(kinesisClient.getShardIterator(iteratorCaptor.capture())) .thenReturn(makeGetShardIteratorResonse(nextIterator)); when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future); - when(future.get()).thenThrow( + when(future.get(anyLong(), any(TimeUnit.class))).thenThrow( new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build())); kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); @@ -322,23 +329,23 @@ public class KinesisDataFetcherTest { } private CompletableFuture makeGetRecordsResponse(String nextIterator, List records) - throws InterruptedException, ExecutionException{ + throws InterruptedException, ExecutionException { return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator) - .records(CollectionUtils.isNullOrEmpty(records) ? Collections.emptyList() : records) - .build()); + .records(CollectionUtils.isNullOrEmpty(records) ? Collections.emptyList() : records).build()); } @Test public void testFetcherDoesNotAdvanceWithoutAccept() throws InterruptedException, ExecutionException { - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); - final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); + final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); final String initialIterator = "InitialIterator"; final String nextIterator1 = "NextIteratorOne"; final String nextIterator2 = "NextIteratorTwo"; final CompletableFuture nonAdvancingResult1 = makeGetRecordsResponse(initialIterator, null); final CompletableFuture nonAdvancingResult2 = makeGetRecordsResponse(nextIterator1, null); - final CompletableFuture finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2, null); + final CompletableFuture finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2, + null); final CompletableFuture advancingResult1 = makeGetRecordsResponse(nextIterator1, null); final CompletableFuture advancingResult2 = makeGetRecordsResponse(nextIterator2, null); final CompletableFuture finalAdvancingResult = makeGetRecordsResponse(null, null); @@ -360,8 +367,6 @@ public class KinesisDataFetcherTest { assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2); assertAdvanced(finalAdvancingResult.get(), nextIterator2, null); - - verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(initialIterator))); verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator1))); verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator2))); @@ -380,12 +385,13 @@ public class KinesisDataFetcherTest { verify(kinesisClient, never()).getRecords(any(GetRecordsRequest.class)); } - + @Test @Ignore - public void testRestartIterator() throws InterruptedException, ExecutionException { + public void testRestartIterator() throws Exception { GetRecordsResponse getRecordsResult = mock(GetRecordsResponse.class); - GetRecordsResponse restartGetRecordsResponse = makeGetRecordsResponse(null, null).get(); + GetRecordsResponse restartGetRecordsResponse = makeGetRecordsResponse(null, null).get(anyLong(), + any(TimeUnit.class)); Record record = mock(Record.class); final String nextShardIterator = "NextShardIterator"; final String sequenceNumber = "SequenceNumber"; @@ -400,13 +406,53 @@ public class KinesisDataFetcherTest { kinesisDataFetcher.restartIterator(); assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept()); } - - @Test (expected = IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testRestartIteratorNotInitialized() { kinesisDataFetcher.restartIterator(); } - private DataFetcherResult assertAdvanced(GetRecordsResponse expectedResult, String previousValue, String nextValue) { + @Test + public void testTimeoutExceptionIsRetryableForGetShardIterator() throws Exception { + expectedExceptionRule.expect(RetryableRetrievalException.class); + expectedExceptionRule.expectCause(isA(TimeoutException.class)); + expectedExceptionRule.expectMessage("Timeout"); + + // Set up proxy mock methods + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) + .thenReturn(getShardIteratorResponseFuture); + when(getShardIteratorResponseFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new TimeoutException("Timeout")); + + // Create data fectcher and initialize it with latest type checkpoint + kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + } + + @Test + public void testTimeoutExceptionIsRetryableForGetRecords() throws Exception { + expectedExceptionRule.expect(RetryableRetrievalException.class); + expectedExceptionRule.expectCause(isA(TimeoutException.class)); + expectedExceptionRule.expectMessage("Timeout"); + + CompletableFuture getShardIteratorFuture = CompletableFuture + .completedFuture(GetShardIteratorResponse.builder().shardIterator("test").build()); + + // Set up proxy mock methods + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture); + when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture); + when(getRecordsResponseFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Timeout")); + + // Create data fectcher and initialize it with latest type checkpoint + kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); + + // Call records of dataFetcher which will throw an exception + getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); + } + + private DataFetcherResult assertAdvanced(GetRecordsResponse expectedResult, String previousValue, + String nextValue) { DataFetcherResult acceptResult = kinesisDataFetcher.getRecords(); assertEquals(expectedResult, acceptResult.getResult()); @@ -436,18 +482,17 @@ public class KinesisDataFetcherTest { return noAcceptResult; } - private void testInitializeAndFetch(final String iteratorType, - final String seqNo, - final InitialPositionInStreamExtended initialPositionInStream) throws Exception { - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + private void testInitializeAndFetch(final String iteratorType, final String seqNo, + final InitialPositionInStreamExtended initialPositionInStream) throws Exception { + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); final String iterator = "foo"; final List expectedRecords = Collections.emptyList(); - GetShardIteratorRequest expectedIteratorRequest = - makeGetShardIteratorRequest(iteratorType); + GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(iteratorType); if (iteratorType.equals(ShardIteratorType.AT_TIMESTAMP.toString())) { - expectedIteratorRequest = expectedIteratorRequest.toBuilder().timestamp(initialPositionInStream.getTimestamp().toInstant()).build(); + expectedIteratorRequest = expectedIteratorRequest.toBuilder() + .timestamp(initialPositionInStream.getTimestamp().toInstant()).build(); } else if (iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString())) { expectedIteratorRequest = expectedIteratorRequest.toBuilder().startingSequenceNumber(seqNo).build(); } @@ -462,8 +507,8 @@ public class KinesisDataFetcherTest { Checkpointer checkpoint = mock(Checkpointer.class); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); - final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = - new SynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); kinesisDataFetcher.initialize(seqNo, initialPositionInStream); assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 1526cb22..3e86b068 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -34,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -93,7 +95,7 @@ public class PrefetchRecordsPublisherIntegrationTest { private InitialPositionInStreamExtended initialPosition; @Before - public void setup() throws InterruptedException, ExecutionException { + public void setup() throws Exception { records = new ArrayList<>(); dataFetcher = spy(new KinesisDataFetcherForTest(kinesisClient, streamName, shardId, MAX_RECORDS_PER_CALL)); getRecordsRetrievalStrategy = Mockito.spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); @@ -101,7 +103,7 @@ public class PrefetchRecordsPublisherIntegrationTest { CompletableFuture future = mock(CompletableFuture.class); when(extendedSequenceNumber.sequenceNumber()).thenReturn("LATEST"); - when(future.get()).thenReturn(GetShardIteratorResponse.builder().shardIterator("TestIterator").build()); + when(future.get(anyLong(), any(TimeUnit.class))).thenReturn(GetShardIteratorResponse.builder().shardIterator("TestIterator").build()); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(future); getRecordsCache = new PrefetchRecordsPublisher(MAX_SIZE, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 94373fe0..fe9aa663 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -38,11 +38,13 @@ import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcess import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -74,6 +76,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -254,6 +257,18 @@ public class PrefetchRecordsPublisherTest { verify(dataFetcher).restartIterator(); } + @Test + public void testRetryableRetrievalExceptionContinues() { + + GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).build(); + when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response); + + getRecordsCache.start(sequenceNumber, initialPosition); + + RecordsRetrieved records = getRecordsCache.getNextResult(); + assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); + } + @Test(timeout = 1000L) public void testNoDeadlockOnFullQueue() { // diff --git a/pom.xml b/pom.xml index df1571b0..741f9d92 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.1.2 + 2.1.3-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -31,7 +31,7 @@ - 2.4.0 + 2.5.10