diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index a6737e0a..bbec0402 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -19,7 +19,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.1.2 + 2.1.3-SNAPSHOT 4.0.0 diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 45126ea0..16b7008f 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -20,7 +20,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.1.2 + 2.1.3-SNAPSHOT amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java new file mode 100644 index 00000000..6e0abc99 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FutureUtils.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.common; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FutureUtils { + + public static T resolveOrCancelFuture(Future future, Duration timeout) + throws ExecutionException, InterruptedException, TimeoutException { + try { + return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + future.cancel(true); + throw te; + } + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index b5645d9d..a4c1e55c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -1,16 +1,16 @@ /* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package software.amazon.kinesis.leases; @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -31,7 +33,6 @@ import org.apache.commons.lang3.StringUtils; import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.Synchronized; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -44,17 +45,18 @@ import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.retrieval.AWSExceptionManager; /** * */ -@RequiredArgsConstructor @Slf4j @Accessors(fluent = true) @KinesisClientInternalApi public class KinesisShardDetector implements ShardDetector { + @NonNull private final KinesisAsyncClient kinesisClient; @NonNull @@ -64,12 +66,35 @@ public class KinesisShardDetector implements ShardDetector { private final long listShardsCacheAllowedAgeInSeconds; private final int maxCacheMissesBeforeReload; private final int cacheMissWarningModulus; + private final Duration kinesisRequestTimeout; private volatile Map cachedShardMap = null; private volatile Instant lastCacheUpdateTime; @Getter(AccessLevel.PACKAGE) private AtomicInteger cacheMisses = new AtomicInteger(0); + @Deprecated + public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, + int cacheMissWarningModulus) { + this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, + listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + } + + public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, + int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, + int cacheMissWarningModulus, Duration kinesisRequestTimeout) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; + this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; + this.cacheMissWarningModulus = cacheMissWarningModulus; + this.kinesisRequestTimeout = kinesisRequestTimeout; + } + @Override public Shard shard(@NonNull final String shardId) { if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) { @@ -132,10 +157,10 @@ public class KinesisShardDetector implements ShardDetector { result = listShards(nextToken); if (result == null) { - /* - * If listShards ever returns null, we should bail and return null. This indicates the stream is not - * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. - */ + /* + * If listShards ever returns null, we should bail and return null. This indicates the stream is not + * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. + */ return null; } else { shards.addAll(result.shards()); @@ -167,7 +192,7 @@ public class KinesisShardDetector implements ShardDetector { try { try { - result = kinesisClient.listShards(request.build()).get(); + result = FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request.build()), kinesisRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -188,6 +213,8 @@ public class KinesisShardDetector implements ShardDetector { log.debug("Stream {} : Sleep was interrupted ", streamName, ie); } lastException = e; + } catch (TimeoutException te) { + throw new RuntimeException(te); } remainingRetries--; if (remainingRetries <= 0 && result == null) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 5c98bae6..6aa461fd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -41,6 +42,9 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; @Data @Accessors(fluent = true) public class LeaseManagementConfig { + + public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1); + /** * Name of the table to use in DynamoDB * @@ -159,6 +163,8 @@ public class LeaseManagementConfig { public long epsilonMillis = 25L; + private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT; + /** * The initial position for getting records from Kinesis streams. * @@ -261,7 +267,7 @@ public class LeaseManagementConfig { initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), - tableCreatorCallback()); + tableCreatorCallback(), dynamoDbRequestTimeout()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 1ec3e0b3..8ec305c4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases.dynamodb; +import java.time.Duration; import java.util.concurrent.ExecutorService; import lombok.Data; @@ -26,6 +27,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; @@ -37,6 +39,7 @@ import software.amazon.kinesis.metrics.MetricsFactory; @Data @KinesisClientInternalApi public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { + @NonNull private final KinesisAsyncClient kinesisClient; @NonNull @@ -71,6 +74,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long initialLeaseTableReadCapacity; private final long initialLeaseTableWriteCapacity; private final TableCreatorCallback tableCreatorCallback; + private final Duration dynamoDbRequestTimeout; /** * Constructor. @@ -166,7 +170,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); + new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK, + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); } /** @@ -198,6 +203,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param hierarchicalShardSyncer * @param tableCreatorCallback */ + @Deprecated public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, @@ -208,8 +214,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, - final TableCreatorCallback tableCreatorCallback) { + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; @@ -235,6 +291,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; + this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; } @Override @@ -267,13 +324,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads, - tableCreatorCallback); + tableCreatorCallback, dynamoDbRequestTimeout); } @Override public ShardDetector createShardDetector() { return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, - cacheMissWarningModulus); + cacheMissWarningModulus, dynamoDbRequestTimeout); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 79a12fc3..c6d53f90 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -14,11 +14,13 @@ */ package software.amazon.kinesis.leases.dynamodb; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -45,7 +47,9 @@ import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -60,12 +64,15 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Slf4j @KinesisClientInternalApi public class DynamoDBLeaseRefresher implements LeaseRefresher { + protected final String table; protected final DynamoDbAsyncClient dynamoDBClient; protected final LeaseSerializer serializer; protected final boolean consistentReads; private final TableCreatorCallback tableCreatorCallback; + private final Duration dynamoDbRequestTimeout; + private boolean newTableCreated = false; /** @@ -95,14 +102,31 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * @param consistentReads * @param tableCreatorCallback */ + @Deprecated public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, - final LeaseSerializer serializer, final boolean consistentReads, - @NonNull final TableCreatorCallback tableCreatorCallback) { + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback) { + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + } + + /** + * Constructor. + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + */ + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { this.table = table; this.dynamoDBClient = dynamoDBClient; this.serializer = serializer; this.consistentReads = consistentReads; this.tableCreatorCallback = tableCreatorCallback; + this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; } /** @@ -132,7 +156,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.createTable(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.createTable(request), dynamoDbRequestTimeout); newTableCreated = true; } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); @@ -144,7 +168,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return newTableCreated; } catch (LimitExceededException e) { throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e); - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } return newTableCreated; @@ -167,7 +191,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { DescribeTableResponse result; try { try { - result = dynamoDBClient.describeTable(request).get(); + result = FutureUtils.resolveOrCancelFuture(dynamoDBClient.describeTable(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -177,7 +201,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ResourceNotFoundException e) { log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table); return null; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } @@ -269,7 +293,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - ScanResponse scanResult = dynamoDBClient.scan(scanRequest).get(); + ScanResponse scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout); List result = new ArrayList<>(); while (scanResult != null) { @@ -287,7 +311,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { // Make another request, picking up where we left off. scanRequest = scanRequest.toBuilder().exclusiveStartKey(lastEvaluatedKey).build(); log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey); - scanResult = dynamoDBClient.scan(scanRequest).get(); + scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout); } } log.debug("Listed {} leases from table {}", result.size(), table); @@ -302,7 +326,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e); } catch (ProvisionedThroughputExceededException e) { throw new ProvisionedThroughputException(e); - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } } @@ -323,7 +347,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.putItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.putItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -333,7 +357,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (ConditionalCheckFailedException e) { log.debug("Did not create lease {} because it already existed", lease); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } return true; @@ -352,7 +376,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final AWSExceptionManager exceptionManager = createExceptionManager(); try { try { - GetItemResponse result = dynamoDBClient.getItem(request).get(); + GetItemResponse result = FutureUtils.resolveOrCancelFuture(dynamoDBClient.getItem(request), dynamoDbRequestTimeout); Map dynamoRecord = result.item(); if (CollectionUtils.isNullOrEmpty(dynamoRecord)) { @@ -369,7 +393,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { // TODO: check behavior throw new DependencyException(e); } - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("get", leaseKey, e); } } @@ -391,7 +415,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -414,7 +438,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey()); - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw new DependencyException(e); } @@ -444,7 +468,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -455,7 +479,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("take", lease.leaseKey(), e); } @@ -487,7 +511,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -498,7 +522,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}", lease.leaseKey(), lease.leaseOwner()); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("evict", lease.leaseKey(), e); } @@ -522,14 +546,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.deleteItem(deleteRequest).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.deleteItem(deleteRequest), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { // TODO: check the behavior throw new DependencyException(e); } - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e); } } @@ -549,14 +573,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final AWSExceptionManager exceptionManager = createExceptionManager(); try { try { - dynamoDBClient.deleteItem(deleteRequest).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.deleteItem(deleteRequest), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { // TODO: Check if this is the correct behavior throw new DependencyException(e); } - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("delete", lease.leaseKey(), e); } } @@ -580,7 +604,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { try { try { - dynamoDBClient.updateItem(request).get(); + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -590,7 +614,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { log.debug("Lease update failed for lease with key {} because the lease counter was not {}", lease.leaseKey(), lease.leaseCounter()); return false; - } catch (DynamoDbException e) { + } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 129fd15b..459c4155 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -14,8 +14,10 @@ */ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.StringUtils; @@ -25,7 +27,6 @@ import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -35,6 +36,8 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.metrics.MetricsFactory; @@ -44,14 +47,16 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.IteratorBuilder; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ -@RequiredArgsConstructor @Slf4j +@KinesisClientInternalApi public class KinesisDataFetcher { + private static final String METRICS_PREFIX = "KinesisDataFetcher"; private static final String OPERATION = "ProcessTask"; @@ -64,6 +69,21 @@ public class KinesisDataFetcher { private final int maxRecords; @NonNull private final MetricsFactory metricsFactory; + private final Duration maxFutureWait; + + @Deprecated + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { + this(kinesisClient, streamName, shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + } + + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.shardId = shardId; + this.maxRecords = maxRecords; + this.metricsFactory = metricsFactory; + this.maxFutureWait = maxFutureWait; + } /** Note: This method has package level access for testing purposes. * @return nextIterator @@ -191,7 +211,8 @@ public class KinesisDataFetcher { try { try { - final GetShardIteratorResponse result = kinesisClient.getShardIterator(request).get(); + final GetShardIteratorResponse result = FutureUtils + .resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); nextIterator = result.shardIterator(); success = true; } catch (ExecutionException e) { @@ -199,6 +220,8 @@ public class KinesisDataFetcher { } catch (InterruptedException e) { // TODO: Check behavior throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RetryableRetrievalException(e.getMessage(), e); } } catch (ResourceNotFoundException e) { log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e); @@ -244,7 +267,8 @@ public class KinesisDataFetcher { boolean success = false; long startTime = System.currentTimeMillis(); try { - final GetRecordsResponse response = kinesisClient.getRecords(request).get(); + final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), + maxFutureWait); success = true; return response; } catch (ExecutionException e) { @@ -253,6 +277,8 @@ public class KinesisDataFetcher { // TODO: Check behavior log.debug("Interrupt called on metod, shutdown initiated"); throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RetryableRetrievalException(e.getMessage(), e); } finally { MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getRecords"), success, startTime, MetricsLevel.DETAILED); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index e9b4e6a2..666cb3a9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; import java.util.Optional; import lombok.Data; @@ -32,6 +33,8 @@ import software.amazon.kinesis.retrieval.RetrievalSpecificConfig; @Getter public class PollingConfig implements RetrievalSpecificConfig { + public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); + /** * Name of the Kinesis stream. * @@ -94,9 +97,14 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + /** + * The maximum time to wait for a future request from Kinesis to complete + */ + private Duration kinesisRequestTimeout = DEFAULT_REQUEST_TIMEOUT; + @Override public RetrievalFactory retrievalFactory() { return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, - maxRecords()); + maxRecords(), kinesisRequestTimeout); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index a441f721..3acd3169 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -51,6 +51,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -317,6 +318,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { drainQueueForRequests(); } catch (PositionResetException pse) { throw pse; + } catch (RetryableRetrievalException rre) { + log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request."); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); } catch (ExpiredIteratorException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 1e827fc9..7c55d4e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -19,12 +19,15 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import java.time.Duration; + /** * */ @Data @KinesisClientInternalApi public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { + @NonNull private final String streamName; @NonNull @@ -34,12 +37,26 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { // private final long listShardsBackoffTimeInMillis; // private final int maxListShardsRetryAttempts; private final int maxRecords; + private final Duration kinesisRequestTimeout; + + public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) { + this.streamName = streamName; + this.kinesisClient = kinesisClient; + this.recordsFetcherFactory = recordsFetcherFactory; + this.maxRecords = maxRecords; + this.kinesisRequestTimeout = kinesisRequestTimeout; + } + + @Deprecated + public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) { + this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + } @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory)); + new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index be69618d..9ec97c5b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -1,24 +1,24 @@ /* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; import java.util.concurrent.ExecutorService; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.ShardInfo; @@ -31,7 +31,6 @@ import software.amazon.kinesis.retrieval.RetrievalFactory; /** * */ -@RequiredArgsConstructor @KinesisClientInternalApi public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory { @NonNull @@ -44,26 +43,41 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory @NonNull private final ExecutorService executorService; private final long idleMillisBetweenCalls; + private final Duration maxFutureWait; + + @Deprecated + public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, + long idleMillisBetweenCalls) { + this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls, + PollingConfig.DEFAULT_REQUEST_TIMEOUT); + } + + public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, + long idleMillisBetweenCalls, Duration maxFutureWait) { + this.streamName = streamName; + this.kinesisClient = kinesisClient; + this.recordsFetcherFactory = recordsFetcherFactory; + this.maxRecords = maxRecords; + this.executorService = executorService; + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.maxFutureWait = maxFutureWait; + } @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { - return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory)); + return new SynchronousGetRecordsRetrievalStrategy(new KinesisDataFetcher(kinesisClient, streamName, + shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); } @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(), - recordsFetcherFactory.maxByteSize(), - recordsFetcherFactory.maxRecordsCount(), - maxRecords, - createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), - executorService, - idleMillisBetweenCalls, - metricsFactory, - "Prefetching", - shardInfo.shardId()); + recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords, + createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls, + metricsFactory, "Prefetching", shardInfo.shardId()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java new file mode 100644 index 00000000..c9015131 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FutureUtilsTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.common; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class FutureUtilsTest { + + @Mock + private Future future; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testTimeoutExceptionCancelsFuture() throws Exception { + expectedException.expect(TimeoutException.class); + + when(future.get(anyLong(), any())).thenThrow(new TimeoutException("Timeout")); + + try { + FutureUtils.resolveOrCancelFuture(future, Duration.ofSeconds(1)); + } finally { + verify(future).cancel(eq(true)); + } + } +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java index be25a360..fd1a4a66 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java @@ -9,9 +9,11 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -22,11 +24,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -55,8 +61,13 @@ public class KinesisShardDetectorTest { private KinesisShardDetector shardDetector; + @Rule + public ExpectedException expectedExceptionRule = ExpectedException.none(); + @Mock private KinesisAsyncClient client; + @Mock + private CompletableFuture mockFuture; @Before public void setup() { @@ -140,6 +151,19 @@ public class KinesisShardDetectorTest { } } + @Test + public void testListShardsTimesOut() throws Exception { + expectedExceptionRule.expect(RuntimeException.class); + expectedExceptionRule.expectCause(isA(TimeoutException.class)); + + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Timeout")); + + when(client.listShards(any(ListShardsRequest.class))).thenReturn(mockFuture); + + shardDetector.listShards(); + + } + @Test public void testGetShard() { final String shardId = String.format(SHARD_ID, 1); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java new file mode 100644 index 00000000..27bb1cd3 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -0,0 +1,268 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.leases.dynamodb; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.exceptions.DependencyException; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBLeaseRefresherTest { + + private static final String TABLE_NAME = "test"; + private static final boolean CONSISTENT_READS = true; + + @Mock + private DynamoDbAsyncClient dynamoDbClient; + @Mock + private LeaseSerializer leaseSerializer; + @Mock + private TableCreatorCallback tableCreatorCallback; + @Mock + private CompletableFuture mockScanFuture; + @Mock + private CompletableFuture mockPutItemFuture; + @Mock + private CompletableFuture mockGetItemFuture; + @Mock + private CompletableFuture mockUpdateFuture; + @Mock + private CompletableFuture mockDeleteFuture; + @Mock + private CompletableFuture mockDescribeTableFuture; + @Mock + private CompletableFuture mockCreateTableFuture; + @Mock + private Lease lease; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private DynamoDBLeaseRefresher leaseRefresher; + + private Map serializedLease; + + @Before + public void setup() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback); + serializedLease = new HashMap<>(); + + } + + @Test + public void testListLeasesHandlesTimeout() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(mockScanFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(te); + when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture); + + verifyCancel(mockScanFuture, () -> leaseRefresher.listLeases()); + } + + @Test + public void testListLeasesSucceedsThenFails() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture); + + Map lastEvaluatedKey = new HashMap<>(); + lastEvaluatedKey.put("Test", AttributeValue.builder().s("test").build()); + + when(mockScanFuture.get(anyLong(), any(TimeUnit.class))) + .thenReturn(ScanResponse.builder().lastEvaluatedKey(lastEvaluatedKey).build()) + .thenThrow(te); + + verifyCancel(mockScanFuture, () -> leaseRefresher.listLeases()); + + verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class)); + verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class)); + + } + + @Test + public void testCreateLeaseIfNotExistsTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(mockPutItemFuture); + when(mockPutItemFuture.get(anyLong(), any())).thenThrow(te); + + when(leaseSerializer.toDynamoRecord(any())).thenReturn(serializedLease); + when(leaseSerializer.getDynamoNonexistantExpectation()).thenReturn(Collections.emptyMap()); + + verifyCancel(mockPutItemFuture, () -> leaseRefresher.createLeaseIfNotExists(lease)); + } + + @Test + public void testGetLeaseTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(mockGetItemFuture); + when(mockGetItemFuture.get(anyLong(), any())).thenThrow(te); + + when(leaseSerializer.getDynamoHashKey(anyString())).thenReturn(Collections.emptyMap()); + + verifyCancel(mockGetItemFuture, () -> leaseRefresher.getLease("test")); + } + + @Test + public void testRenewLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () ->leaseRefresher.renewLease(lease)); + } + + @Test + public void testTakeLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () -> leaseRefresher.takeLease(lease, "owner")); + } + + @Test + public void testEvictLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () -> leaseRefresher.evictLease(lease)); + } + + @Test + public void testUpdateLeaseTimesOut() throws Exception { + setupUpdateItemTest(); + verifyCancel(mockUpdateFuture, () -> leaseRefresher.updateLease(lease)); + } + + @Test + public void testDeleteAllLeasesTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture); + when(mockScanFuture.get(anyLong(), any())).thenReturn(ScanResponse.builder().items(Collections.emptyMap()).build()); + when(leaseSerializer.fromDynamoRecord(any())).thenReturn(lease); + when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap()); + + when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(mockDeleteFuture); + when(mockDeleteFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockDeleteFuture, () -> leaseRefresher.deleteAll()); + } + + @Test + public void testDeleteLeaseTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap()); + + when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(mockDeleteFuture); + when(mockDeleteFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockDeleteFuture, () -> leaseRefresher.deleteLease(lease)); + } + + @Test + public void testLeaseTableExistsTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockDescribeTableFuture, () -> leaseRefresher.leaseTableExists()); + } + + @Test + public void testCreateLeaseTableTimesOut() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + } + + @FunctionalInterface + private interface TestCaller { + void call() throws Exception; + } + + private void verifyCancel(Future future, TestCaller toExecute) throws Exception { + try { + toExecute.call(); + } finally { + verify(future).cancel(anyBoolean()); + } + } + + private void setupUpdateItemTest() throws Exception { + TimeoutException te = setRuleForDependencyTimeout(); + + when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap()); + when(leaseSerializer.getDynamoLeaseCounterExpectation(any(Lease.class))).thenReturn(Collections.emptyMap()); + when(leaseSerializer.getDynamoLeaseCounterUpdate(any(Lease.class))).thenReturn(Collections.emptyMap()); + when(leaseSerializer.getDynamoTakeLeaseUpdate(any(), anyString())).thenReturn(Collections.emptyMap()); + + when(dynamoDbClient.updateItem(any(UpdateItemRequest.class))).thenReturn(mockUpdateFuture); + when(mockUpdateFuture.get(anyLong(), any())).thenThrow(te); + } + + private TimeoutException setRuleForDependencyTimeout() { + TimeoutException te = new TimeoutException("Timeout"); + expectedException.expect(DependencyException.class); + expectedException.expectCause(equalTo(te)); + + return te; + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java index 8dfed30c..641d6ba0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java @@ -14,12 +14,14 @@ */ package software.amazon.kinesis.retrieval.polling; +import static org.hamcrest.CoreMatchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -35,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.junit.Before; @@ -66,6 +70,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -76,12 +81,12 @@ public class KinesisDataFetcherTest { private static final int MAX_RECORDS = 1; private static final String STREAM_NAME = "streamName"; private static final String SHARD_ID = "shardId-1"; - private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); - private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); - private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = - InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000)); + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.LATEST); + private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended + .newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp(new Date(1000)); private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private KinesisDataFetcher kinesisDataFetcher; @@ -90,13 +95,16 @@ public class KinesisDataFetcherTest { private KinesisAsyncClient kinesisClient; @Mock private CompletableFuture getRecordsResponseFuture; + @Mock + private CompletableFuture getShardIteratorResponseFuture; @Rule public ExpectedException expectedExceptionRule = ExpectedException.none(); @Before public void setup() { - kinesisDataFetcher = new KinesisDataFetcher(kinesisClient, STREAM_NAME, SHARD_ID, MAX_RECORDS, NULL_METRICS_FACTORY); + kinesisDataFetcher = new KinesisDataFetcher(kinesisClient, STREAM_NAME, SHARD_ID, MAX_RECORDS, + NULL_METRICS_FACTORY); } /** @@ -104,8 +112,7 @@ public class KinesisDataFetcherTest { */ @Test public final void testInitializeLatest() throws Exception { - testInitializeAndFetch(ShardIteratorType.LATEST.toString(), - ShardIteratorType.LATEST.toString(), + testInitializeAndFetch(ShardIteratorType.LATEST.toString(), ShardIteratorType.LATEST.toString(), INITIAL_POSITION_LATEST); } @@ -114,8 +121,7 @@ public class KinesisDataFetcherTest { */ @Test public final void testInitializeTimeZero() throws Exception { - testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(), - ShardIteratorType.TRIM_HORIZON.toString(), + testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(), ShardIteratorType.TRIM_HORIZON.toString(), INITIAL_POSITION_TRIM_HORIZON); } @@ -124,12 +130,10 @@ public class KinesisDataFetcherTest { */ @Test public final void testInitializeAtTimestamp() throws Exception { - testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(), - ShardIteratorType.AT_TIMESTAMP.toString(), + testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(), ShardIteratorType.AT_TIMESTAMP.toString(), INITIAL_POSITION_AT_TIMESTAMP); } - /** * Test initialize() when a flushpoint exists. */ @@ -149,8 +153,8 @@ public class KinesisDataFetcherTest { private CompletableFuture makeGetShardIteratorResonse(String shardIterator) throws InterruptedException, ExecutionException { - return CompletableFuture.completedFuture( - GetShardIteratorResponse.builder().shardIterator(shardIterator).build()); + return CompletableFuture + .completedFuture(GetShardIteratorResponse.builder().shardIterator(shardIterator).build()); } @Test @@ -161,12 +165,11 @@ public class KinesisDataFetcherTest { final String seqA = "123"; final String seqB = "456"; - ArgumentCaptor shardIteratorRequestCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + ArgumentCaptor shardIteratorRequestCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())) - .thenReturn(makeGetShardIteratorResonse(iteratorA)) - .thenReturn(makeGetShardIteratorResonse(iteratorA)) + .thenReturn(makeGetShardIteratorResonse(iteratorA)).thenReturn(makeGetShardIteratorResonse(iteratorA)) .thenReturn(makeGetShardIteratorResonse(iteratorB)); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA)); @@ -198,16 +201,18 @@ public class KinesisDataFetcherTest { @Test public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws InterruptedException, ExecutionException { - final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + final ArgumentCaptor requestCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final String iteratorHorizon = "TRIM_HORIZON"; final String iteratorLatest = "LATEST"; final String iteratorAtTimestamp = "AT_TIMESTAMP"; - final Map requestsMap = Arrays.stream( - new String[] {iteratorHorizon, iteratorLatest, iteratorAtTimestamp}) + final Map requestsMap = Arrays + .stream(new String[] { iteratorHorizon, iteratorLatest, iteratorAtTimestamp }) .map(this::makeGetShardIteratorRequest) .collect(Collectors.toMap(r -> ShardIteratorType.valueOf(r.shardIteratorTypeAsString()), r -> r)); GetShardIteratorRequest tsReq = requestsMap.get(ShardIteratorType.AT_TIMESTAMP); - requestsMap.put(ShardIteratorType.AT_TIMESTAMP, tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build()); + requestsMap.put(ShardIteratorType.AT_TIMESTAMP, + tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build()); when(kinesisClient.getShardIterator(requestCaptor.capture())) .thenReturn(makeGetShardIteratorResonse(iteratorHorizon)) @@ -238,14 +243,15 @@ public class KinesisDataFetcherTest { } @Test - public void testGetRecordsWithResourceNotFoundException() throws InterruptedException, ExecutionException { - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + public void testGetRecordsWithResourceNotFoundException() throws Exception { + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); // Set up arguments used by proxy final String nextIterator = "TestShardIterator"; - final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(ShardIteratorType.LATEST.name()); + final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest( + ShardIteratorType.LATEST.name()); final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(nextIterator); final CompletableFuture future = mock(CompletableFuture.class); @@ -254,13 +260,13 @@ public class KinesisDataFetcherTest { when(kinesisClient.getShardIterator(iteratorCaptor.capture())) .thenReturn(makeGetShardIteratorResonse(nextIterator)); when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future); - when(future.get()).thenThrow( + when(future.get(anyLong(), any(TimeUnit.class))).thenThrow( new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build())); // Create data fectcher and initialize it with latest type checkpoint kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); - final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = - new SynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); try { // Call records of dataFetcher which will throw an exception getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); @@ -283,7 +289,7 @@ public class KinesisDataFetcherTest { // Set up proxy mock methods when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture); when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture); - when(getRecordsResponseFuture.get()) + when(getRecordsResponseFuture.get(anyLong(), any(TimeUnit.class))) .thenThrow(new ExecutionException(SdkException.builder().message("Test Exception").build())); // Create data fectcher and initialize it with latest type checkpoint @@ -295,14 +301,15 @@ public class KinesisDataFetcherTest { getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); } - + @Test - public void testNonNullGetRecords() throws InterruptedException, ExecutionException { + public void testNonNullGetRecords() throws Exception { final String nextIterator = "TestIterator"; - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); - final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(ShardIteratorType.LATEST.name()); + final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest( + ShardIteratorType.LATEST.name()); final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(nextIterator); final CompletableFuture future = mock(CompletableFuture.class); @@ -310,7 +317,7 @@ public class KinesisDataFetcherTest { when(kinesisClient.getShardIterator(iteratorCaptor.capture())) .thenReturn(makeGetShardIteratorResonse(nextIterator)); when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future); - when(future.get()).thenThrow( + when(future.get(anyLong(), any(TimeUnit.class))).thenThrow( new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build())); kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); @@ -322,23 +329,23 @@ public class KinesisDataFetcherTest { } private CompletableFuture makeGetRecordsResponse(String nextIterator, List records) - throws InterruptedException, ExecutionException{ + throws InterruptedException, ExecutionException { return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator) - .records(CollectionUtils.isNullOrEmpty(records) ? Collections.emptyList() : records) - .build()); + .records(CollectionUtils.isNullOrEmpty(records) ? Collections.emptyList() : records).build()); } @Test public void testFetcherDoesNotAdvanceWithoutAccept() throws InterruptedException, ExecutionException { - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); - final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); + final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); final String initialIterator = "InitialIterator"; final String nextIterator1 = "NextIteratorOne"; final String nextIterator2 = "NextIteratorTwo"; final CompletableFuture nonAdvancingResult1 = makeGetRecordsResponse(initialIterator, null); final CompletableFuture nonAdvancingResult2 = makeGetRecordsResponse(nextIterator1, null); - final CompletableFuture finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2, null); + final CompletableFuture finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2, + null); final CompletableFuture advancingResult1 = makeGetRecordsResponse(nextIterator1, null); final CompletableFuture advancingResult2 = makeGetRecordsResponse(nextIterator2, null); final CompletableFuture finalAdvancingResult = makeGetRecordsResponse(null, null); @@ -360,8 +367,6 @@ public class KinesisDataFetcherTest { assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2); assertAdvanced(finalAdvancingResult.get(), nextIterator2, null); - - verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(initialIterator))); verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator1))); verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator2))); @@ -380,12 +385,13 @@ public class KinesisDataFetcherTest { verify(kinesisClient, never()).getRecords(any(GetRecordsRequest.class)); } - + @Test @Ignore - public void testRestartIterator() throws InterruptedException, ExecutionException { + public void testRestartIterator() throws Exception { GetRecordsResponse getRecordsResult = mock(GetRecordsResponse.class); - GetRecordsResponse restartGetRecordsResponse = makeGetRecordsResponse(null, null).get(); + GetRecordsResponse restartGetRecordsResponse = makeGetRecordsResponse(null, null).get(anyLong(), + any(TimeUnit.class)); Record record = mock(Record.class); final String nextShardIterator = "NextShardIterator"; final String sequenceNumber = "SequenceNumber"; @@ -400,13 +406,53 @@ public class KinesisDataFetcherTest { kinesisDataFetcher.restartIterator(); assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept()); } - - @Test (expected = IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testRestartIteratorNotInitialized() { kinesisDataFetcher.restartIterator(); } - private DataFetcherResult assertAdvanced(GetRecordsResponse expectedResult, String previousValue, String nextValue) { + @Test + public void testTimeoutExceptionIsRetryableForGetShardIterator() throws Exception { + expectedExceptionRule.expect(RetryableRetrievalException.class); + expectedExceptionRule.expectCause(isA(TimeoutException.class)); + expectedExceptionRule.expectMessage("Timeout"); + + // Set up proxy mock methods + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) + .thenReturn(getShardIteratorResponseFuture); + when(getShardIteratorResponseFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new TimeoutException("Timeout")); + + // Create data fectcher and initialize it with latest type checkpoint + kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + } + + @Test + public void testTimeoutExceptionIsRetryableForGetRecords() throws Exception { + expectedExceptionRule.expect(RetryableRetrievalException.class); + expectedExceptionRule.expectCause(isA(TimeoutException.class)); + expectedExceptionRule.expectMessage("Timeout"); + + CompletableFuture getShardIteratorFuture = CompletableFuture + .completedFuture(GetShardIteratorResponse.builder().shardIterator("test").build()); + + // Set up proxy mock methods + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture); + when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture); + when(getRecordsResponseFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Timeout")); + + // Create data fectcher and initialize it with latest type checkpoint + kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); + + // Call records of dataFetcher which will throw an exception + getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); + } + + private DataFetcherResult assertAdvanced(GetRecordsResponse expectedResult, String previousValue, + String nextValue) { DataFetcherResult acceptResult = kinesisDataFetcher.getRecords(); assertEquals(expectedResult, acceptResult.getResult()); @@ -436,18 +482,17 @@ public class KinesisDataFetcherTest { return noAcceptResult; } - private void testInitializeAndFetch(final String iteratorType, - final String seqNo, - final InitialPositionInStreamExtended initialPositionInStream) throws Exception { - final ArgumentCaptor iteratorCaptor = - ArgumentCaptor.forClass(GetShardIteratorRequest.class); + private void testInitializeAndFetch(final String iteratorType, final String seqNo, + final InitialPositionInStreamExtended initialPositionInStream) throws Exception { + final ArgumentCaptor iteratorCaptor = ArgumentCaptor + .forClass(GetShardIteratorRequest.class); final ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class); final String iterator = "foo"; final List expectedRecords = Collections.emptyList(); - GetShardIteratorRequest expectedIteratorRequest = - makeGetShardIteratorRequest(iteratorType); + GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(iteratorType); if (iteratorType.equals(ShardIteratorType.AT_TIMESTAMP.toString())) { - expectedIteratorRequest = expectedIteratorRequest.toBuilder().timestamp(initialPositionInStream.getTimestamp().toInstant()).build(); + expectedIteratorRequest = expectedIteratorRequest.toBuilder() + .timestamp(initialPositionInStream.getTimestamp().toInstant()).build(); } else if (iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString())) { expectedIteratorRequest = expectedIteratorRequest.toBuilder().startingSequenceNumber(seqNo).build(); } @@ -462,8 +507,8 @@ public class KinesisDataFetcherTest { Checkpointer checkpoint = mock(Checkpointer.class); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); - final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = - new SynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); kinesisDataFetcher.initialize(seqNo, initialPositionInStream); assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 1526cb22..3e86b068 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -34,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -93,7 +95,7 @@ public class PrefetchRecordsPublisherIntegrationTest { private InitialPositionInStreamExtended initialPosition; @Before - public void setup() throws InterruptedException, ExecutionException { + public void setup() throws Exception { records = new ArrayList<>(); dataFetcher = spy(new KinesisDataFetcherForTest(kinesisClient, streamName, shardId, MAX_RECORDS_PER_CALL)); getRecordsRetrievalStrategy = Mockito.spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); @@ -101,7 +103,7 @@ public class PrefetchRecordsPublisherIntegrationTest { CompletableFuture future = mock(CompletableFuture.class); when(extendedSequenceNumber.sequenceNumber()).thenReturn("LATEST"); - when(future.get()).thenReturn(GetShardIteratorResponse.builder().shardIterator("TestIterator").build()); + when(future.get(anyLong(), any(TimeUnit.class))).thenReturn(GetShardIteratorResponse.builder().shardIterator("TestIterator").build()); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(future); getRecordsCache = new PrefetchRecordsPublisher(MAX_SIZE, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 94373fe0..fe9aa663 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -38,11 +38,13 @@ import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcess import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -74,6 +76,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -254,6 +257,18 @@ public class PrefetchRecordsPublisherTest { verify(dataFetcher).restartIterator(); } + @Test + public void testRetryableRetrievalExceptionContinues() { + + GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).build(); + when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response); + + getRecordsCache.start(sequenceNumber, initialPosition); + + RecordsRetrieved records = getRecordsCache.getNextResult(); + assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); + } + @Test(timeout = 1000L) public void testNoDeadlockOnFullQueue() { // diff --git a/pom.xml b/pom.xml index df1571b0..741f9d92 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.1.2 + 2.1.3-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -31,7 +31,7 @@ - 2.4.0 + 2.5.10