Added Timeouts for DynamoDB and Kinesis Calls (#518)

* Advance version to 2.1.3-SNAPSHOT

* Added timeouts for Kinesis and DynamoDB calls

Added a timeout to prevent an issue where the Kinesis or DynamoDB call
never completes.

For Kinesis call to GetRecords the timeout defaults to 30 seconds, and can be configured
on the PollingConfig.

For DynamoDB and Kinesis (when calling ListShards) the timeout
defaults to 60 seconds and can be configured on LeaseManagementConfig.
This commit is contained in:
Justin Pfifer 2019-03-18 10:06:59 -07:00 committed by Sahil Palvia
parent 2f3907d19f
commit 6a70e3db31
19 changed files with 770 additions and 144 deletions

View file

@ -19,7 +19,7 @@
<parent>
<artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId>
<version>2.1.2</version>
<version>2.1.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View file

@ -20,7 +20,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.1.2</version>
<version>2.1.3-SNAPSHOT</version>
</parent>
<artifactId>amazon-kinesis-client</artifactId>

View file

@ -0,0 +1,35 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.common;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureUtils {
public static <T> T resolveOrCancelFuture(Future<T> future, Duration timeout)
throws ExecutionException, InterruptedException, TimeoutException {
try {
return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
future.cancel(true);
throw te;
}
}
}

View file

@ -1,16 +1,16 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases;
@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -31,7 +33,6 @@ import org.apache.commons.lang3.StringUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Synchronized;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
@ -44,17 +45,18 @@ import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
/**
*
*/
@RequiredArgsConstructor
@Slf4j
@Accessors(fluent = true)
@KinesisClientInternalApi
public class KinesisShardDetector implements ShardDetector {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
@ -64,12 +66,35 @@ public class KinesisShardDetector implements ShardDetector {
private final long listShardsCacheAllowedAgeInSeconds;
private final int maxCacheMissesBeforeReload;
private final int cacheMissWarningModulus;
private final Duration kinesisRequestTimeout;
private volatile Map<String, Shard> cachedShardMap = null;
private volatile Instant lastCacheUpdateTime;
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus) {
this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus, Duration kinesisRequestTimeout) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.kinesisRequestTimeout = kinesisRequestTimeout;
}
@Override
public Shard shard(@NonNull final String shardId) {
if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
@ -132,10 +157,10 @@ public class KinesisShardDetector implements ShardDetector {
result = listShards(nextToken);
if (result == null) {
/*
* If listShards ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/
/*
* If listShards ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/
return null;
} else {
shards.addAll(result.shards());
@ -167,7 +192,7 @@ public class KinesisShardDetector implements ShardDetector {
try {
try {
result = kinesisClient.listShards(request.build()).get();
result = FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request.build()), kinesisRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -188,6 +213,8 @@ public class KinesisShardDetector implements ShardDetector {
log.debug("Stream {} : Sleep was interrupted ", streamName, ie);
}
lastException = e;
} catch (TimeoutException te) {
throw new RuntimeException(te);
}
remainingRetries--;
if (remainingRetries <= 0 && result == null) {

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.leases;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -41,6 +42,9 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
@Data
@Accessors(fluent = true)
public class LeaseManagementConfig {
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1);
/**
* Name of the table to use in DynamoDB
*
@ -159,6 +163,8 @@ public class LeaseManagementConfig {
public long epsilonMillis = 25L;
private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
/**
* The initial position for getting records from Kinesis streams.
*
@ -261,7 +267,7 @@ public class LeaseManagementConfig {
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback());
tableCreatorCallback(), dynamoDbRequestTimeout());
}
return leaseManagementFactory;
}

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import lombok.Data;
@ -26,6 +27,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
@ -37,6 +39,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
@Data
@KinesisClientInternalApi
public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
@ -71,6 +74,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final long initialLeaseTableReadCapacity;
private final long initialLeaseTableWriteCapacity;
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
/**
* Constructor.
@ -166,7 +170,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
/**
@ -198,6 +203,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
@ -208,8 +214,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback) {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
@ -235,6 +291,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
}
@Override
@ -267,13 +324,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
tableCreatorCallback);
tableCreatorCallback, dynamoDbRequestTimeout);
}
@Override
public ShardDetector createShardDetector() {
return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus);
cacheMissWarningModulus, dynamoDbRequestTimeout);
}
}

View file

@ -14,11 +14,13 @@
*/
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@ -45,7 +47,9 @@ import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -60,12 +64,15 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseRefresher implements LeaseRefresher {
protected final String table;
protected final DynamoDbAsyncClient dynamoDBClient;
protected final LeaseSerializer serializer;
protected final boolean consistentReads;
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private boolean newTableCreated = false;
/**
@ -95,14 +102,31 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param consistentReads
* @param tableCreatorCallback
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback) {
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
/**
* Constructor.
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
this.consistentReads = consistentReads;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
}
/**
@ -132,7 +156,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.createTable(request).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.createTable(request), dynamoDbRequestTimeout);
newTableCreated = true;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
@ -144,7 +168,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return newTableCreated;
} catch (LimitExceededException e) {
throw new ProvisionedThroughputException("Capacity exceeded when creating table " + table, e);
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw new DependencyException(e);
}
return newTableCreated;
@ -167,7 +191,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
DescribeTableResponse result;
try {
try {
result = dynamoDBClient.describeTable(request).get();
result = FutureUtils.resolveOrCancelFuture(dynamoDBClient.describeTable(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -177,7 +201,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (ResourceNotFoundException e) {
log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", table);
return null;
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw new DependencyException(e);
}
@ -269,7 +293,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
ScanResponse scanResult = dynamoDBClient.scan(scanRequest).get();
ScanResponse scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout);
List<Lease> result = new ArrayList<>();
while (scanResult != null) {
@ -287,7 +311,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
// Make another request, picking up where we left off.
scanRequest = scanRequest.toBuilder().exclusiveStartKey(lastEvaluatedKey).build();
log.debug("lastEvaluatedKey was {}, continuing scan.", lastEvaluatedKey);
scanResult = dynamoDBClient.scan(scanRequest).get();
scanResult = FutureUtils.resolveOrCancelFuture(dynamoDBClient.scan(scanRequest), dynamoDbRequestTimeout);
}
}
log.debug("Listed {} leases from table {}", result.size(), table);
@ -302,7 +326,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e);
} catch (ProvisionedThroughputExceededException e) {
throw new ProvisionedThroughputException(e);
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw new DependencyException(e);
}
}
@ -323,7 +347,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.putItem(request).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.putItem(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -333,7 +357,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (ConditionalCheckFailedException e) {
log.debug("Did not create lease {} because it already existed", lease);
return false;
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("create", lease.leaseKey(), e);
}
return true;
@ -352,7 +376,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final AWSExceptionManager exceptionManager = createExceptionManager();
try {
try {
GetItemResponse result = dynamoDBClient.getItem(request).get();
GetItemResponse result = FutureUtils.resolveOrCancelFuture(dynamoDBClient.getItem(request), dynamoDbRequestTimeout);
Map<String, AttributeValue> dynamoRecord = result.item();
if (CollectionUtils.isNullOrEmpty(dynamoRecord)) {
@ -369,7 +393,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
// TODO: check behavior
throw new DependencyException(e);
}
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("get", leaseKey, e);
}
}
@ -391,7 +415,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.updateItem(request).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -414,7 +438,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
log.info("Detected spurious renewal failure for lease with key {}, but recovered", lease.leaseKey());
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw new DependencyException(e);
}
@ -444,7 +468,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.updateItem(request).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -455,7 +479,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
log.debug("Lease renewal failed for lease with key {} because the lease counter was not {}",
lease.leaseKey(), lease.leaseCounter());
return false;
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("take", lease.leaseKey(), e);
}
@ -487,7 +511,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.updateItem(request).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -498,7 +522,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
log.debug("Lease eviction failed for lease with key {} because the lease owner was not {}",
lease.leaseKey(), lease.leaseOwner());
return false;
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("evict", lease.leaseKey(), e);
}
@ -522,14 +546,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.deleteItem(deleteRequest).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.deleteItem(deleteRequest), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: check the behavior
throw new DependencyException(e);
}
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e);
}
}
@ -549,14 +573,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final AWSExceptionManager exceptionManager = createExceptionManager();
try {
try {
dynamoDBClient.deleteItem(deleteRequest).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.deleteItem(deleteRequest), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check if this is the correct behavior
throw new DependencyException(e);
}
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("delete", lease.leaseKey(), e);
}
}
@ -580,7 +604,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
try {
try {
dynamoDBClient.updateItem(request).get();
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -590,7 +614,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
log.debug("Lease update failed for lease with key {} because the lease counter was not {}",
lease.leaseKey(), lease.leaseCounter());
return false;
} catch (DynamoDbException e) {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
}

View file

@ -14,8 +14,10 @@
*/
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
@ -25,7 +27,6 @@ import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -35,6 +36,8 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -44,14 +47,16 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
@RequiredArgsConstructor
@Slf4j
@KinesisClientInternalApi
public class KinesisDataFetcher {
private static final String METRICS_PREFIX = "KinesisDataFetcher";
private static final String OPERATION = "ProcessTask";
@ -64,6 +69,21 @@ public class KinesisDataFetcher {
private final int maxRecords;
@NonNull
private final MetricsFactory metricsFactory;
private final Duration maxFutureWait;
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
this(kinesisClient, streamName, shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.shardId = shardId;
this.maxRecords = maxRecords;
this.metricsFactory = metricsFactory;
this.maxFutureWait = maxFutureWait;
}
/** Note: This method has package level access for testing purposes.
* @return nextIterator
@ -191,7 +211,8 @@ public class KinesisDataFetcher {
try {
try {
final GetShardIteratorResponse result = kinesisClient.getShardIterator(request).get();
final GetShardIteratorResponse result = FutureUtils
.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
nextIterator = result.shardIterator();
success = true;
} catch (ExecutionException e) {
@ -199,6 +220,8 @@ public class KinesisDataFetcher {
} catch (InterruptedException e) {
// TODO: Check behavior
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RetryableRetrievalException(e.getMessage(), e);
}
} catch (ResourceNotFoundException e) {
log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e);
@ -244,7 +267,8 @@ public class KinesisDataFetcher {
boolean success = false;
long startTime = System.currentTimeMillis();
try {
final GetRecordsResponse response = kinesisClient.getRecords(request).get();
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait);
success = true;
return response;
} catch (ExecutionException e) {
@ -253,6 +277,8 @@ public class KinesisDataFetcher {
// TODO: Check behavior
log.debug("Interrupt called on metod, shutdown initiated");
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RetryableRetrievalException(e.getMessage(), e);
} finally {
MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getRecords"),
success, startTime, MetricsLevel.DETAILED);

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.Optional;
import lombok.Data;
@ -32,6 +33,8 @@ import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
@Getter
public class PollingConfig implements RetrievalSpecificConfig {
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30);
/**
* Name of the Kinesis stream.
*
@ -94,9 +97,14 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
/**
* The maximum time to wait for a future request from Kinesis to complete
*/
private Duration kinesisRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
@Override
public RetrievalFactory retrievalFactory() {
return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,
maxRecords());
maxRecords(), kinesisRequestTimeout);
}
}

View file

@ -51,6 +51,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@ -317,6 +318,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
drainQueueForRequests();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request.");
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
} catch (ExpiredIteratorException e) {

View file

@ -19,12 +19,15 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.time.Duration;
/**
*
*/
@Data
@KinesisClientInternalApi
public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
@NonNull
private final String streamName;
@NonNull
@ -34,12 +37,26 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
// private final long listShardsBackoffTimeInMillis;
// private final int maxListShardsRetryAttempts;
private final int maxRecords;
private final Duration kinesisRequestTimeout;
public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) {
this.streamName = streamName;
this.kinesisClient = kinesisClient;
this.recordsFetcherFactory = recordsFetcherFactory;
this.maxRecords = maxRecords;
this.kinesisRequestTimeout = kinesisRequestTimeout;
}
@Deprecated
public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) {
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory));
new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
}
@Override

View file

@ -1,24 +1,24 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.ShardInfo;
@ -31,7 +31,6 @@ import software.amazon.kinesis.retrieval.RetrievalFactory;
/**
*
*/
@RequiredArgsConstructor
@KinesisClientInternalApi
public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory {
@NonNull
@ -44,26 +43,41 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
@NonNull
private final ExecutorService executorService;
private final long idleMillisBetweenCalls;
private final Duration maxFutureWait;
@Deprecated
public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
long idleMillisBetweenCalls) {
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls,
PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
long idleMillisBetweenCalls, Duration maxFutureWait) {
this.streamName = streamName;
this.kinesisClient = kinesisClient;
this.recordsFetcherFactory = recordsFetcherFactory;
this.maxRecords = maxRecords;
this.executorService = executorService;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
this.maxFutureWait = maxFutureWait;
}
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory));
return new SynchronousGetRecordsRetrievalStrategy(new KinesisDataFetcher(kinesisClient, streamName,
shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait));
}
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
recordsFetcherFactory.maxByteSize(),
recordsFetcherFactory.maxRecordsCount(),
maxRecords,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
executorService,
idleMillisBetweenCalls,
metricsFactory,
"Prefetching",
shardInfo.shardId());
recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
metricsFactory, "Prefetching", shardInfo.shardId());
}
}

View file

@ -0,0 +1,55 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.common;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.time.Duration;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class FutureUtilsTest {
@Mock
private Future<String> future;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testTimeoutExceptionCancelsFuture() throws Exception {
expectedException.expect(TimeoutException.class);
when(future.get(anyLong(), any())).thenThrow(new TimeoutException("Timeout"));
try {
FutureUtils.resolveOrCancelFuture(future, Duration.ofSeconds(1));
} finally {
verify(future).cancel(eq(true));
}
}
}

View file

@ -9,9 +9,11 @@
package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -22,11 +24,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@ -55,8 +61,13 @@ public class KinesisShardDetectorTest {
private KinesisShardDetector shardDetector;
@Rule
public ExpectedException expectedExceptionRule = ExpectedException.none();
@Mock
private KinesisAsyncClient client;
@Mock
private CompletableFuture<ListShardsResponse> mockFuture;
@Before
public void setup() {
@ -140,6 +151,19 @@ public class KinesisShardDetectorTest {
}
}
@Test
public void testListShardsTimesOut() throws Exception {
expectedExceptionRule.expect(RuntimeException.class);
expectedExceptionRule.expectCause(isA(TimeoutException.class));
when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Timeout"));
when(client.listShards(any(ListShardsRequest.class))).thenReturn(mockFuture);
shardDetector.listShards();
}
@Test
public void testGetShard() {
final String shardId = String.format(SHARD_ID, 1);

View file

@ -0,0 +1,268 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases.dynamodb;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRefresherTest {
private static final String TABLE_NAME = "test";
private static final boolean CONSISTENT_READS = true;
@Mock
private DynamoDbAsyncClient dynamoDbClient;
@Mock
private LeaseSerializer leaseSerializer;
@Mock
private TableCreatorCallback tableCreatorCallback;
@Mock
private CompletableFuture<ScanResponse> mockScanFuture;
@Mock
private CompletableFuture<PutItemResponse> mockPutItemFuture;
@Mock
private CompletableFuture<GetItemResponse> mockGetItemFuture;
@Mock
private CompletableFuture<UpdateItemResponse> mockUpdateFuture;
@Mock
private CompletableFuture<DeleteItemResponse> mockDeleteFuture;
@Mock
private CompletableFuture<DescribeTableResponse> mockDescribeTableFuture;
@Mock
private CompletableFuture<CreateTableResponse> mockCreateTableFuture;
@Mock
private Lease lease;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private DynamoDBLeaseRefresher leaseRefresher;
private Map<String, AttributeValue> serializedLease;
@Before
public void setup() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback);
serializedLease = new HashMap<>();
}
@Test
public void testListLeasesHandlesTimeout() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(mockScanFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(te);
when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture);
verifyCancel(mockScanFuture, () -> leaseRefresher.listLeases());
}
@Test
public void testListLeasesSucceedsThenFails() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture);
Map<String, AttributeValue> lastEvaluatedKey = new HashMap<>();
lastEvaluatedKey.put("Test", AttributeValue.builder().s("test").build());
when(mockScanFuture.get(anyLong(), any(TimeUnit.class)))
.thenReturn(ScanResponse.builder().lastEvaluatedKey(lastEvaluatedKey).build())
.thenThrow(te);
verifyCancel(mockScanFuture, () -> leaseRefresher.listLeases());
verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class));
verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class));
}
@Test
public void testCreateLeaseIfNotExistsTimesOut() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(mockPutItemFuture);
when(mockPutItemFuture.get(anyLong(), any())).thenThrow(te);
when(leaseSerializer.toDynamoRecord(any())).thenReturn(serializedLease);
when(leaseSerializer.getDynamoNonexistantExpectation()).thenReturn(Collections.emptyMap());
verifyCancel(mockPutItemFuture, () -> leaseRefresher.createLeaseIfNotExists(lease));
}
@Test
public void testGetLeaseTimesOut() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(mockGetItemFuture);
when(mockGetItemFuture.get(anyLong(), any())).thenThrow(te);
when(leaseSerializer.getDynamoHashKey(anyString())).thenReturn(Collections.emptyMap());
verifyCancel(mockGetItemFuture, () -> leaseRefresher.getLease("test"));
}
@Test
public void testRenewLeaseTimesOut() throws Exception {
setupUpdateItemTest();
verifyCancel(mockUpdateFuture, () ->leaseRefresher.renewLease(lease));
}
@Test
public void testTakeLeaseTimesOut() throws Exception {
setupUpdateItemTest();
verifyCancel(mockUpdateFuture, () -> leaseRefresher.takeLease(lease, "owner"));
}
@Test
public void testEvictLeaseTimesOut() throws Exception {
setupUpdateItemTest();
verifyCancel(mockUpdateFuture, () -> leaseRefresher.evictLease(lease));
}
@Test
public void testUpdateLeaseTimesOut() throws Exception {
setupUpdateItemTest();
verifyCancel(mockUpdateFuture, () -> leaseRefresher.updateLease(lease));
}
@Test
public void testDeleteAllLeasesTimesOut() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.scan(any(ScanRequest.class))).thenReturn(mockScanFuture);
when(mockScanFuture.get(anyLong(), any())).thenReturn(ScanResponse.builder().items(Collections.emptyMap()).build());
when(leaseSerializer.fromDynamoRecord(any())).thenReturn(lease);
when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap());
when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(mockDeleteFuture);
when(mockDeleteFuture.get(anyLong(), any())).thenThrow(te);
verifyCancel(mockDeleteFuture, () -> leaseRefresher.deleteAll());
}
@Test
public void testDeleteLeaseTimesOut() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap());
when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(mockDeleteFuture);
when(mockDeleteFuture.get(anyLong(), any())).thenThrow(te);
verifyCancel(mockDeleteFuture, () -> leaseRefresher.deleteLease(lease));
}
@Test
public void testLeaseTableExistsTimesOut() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(anyLong(), any())).thenThrow(te);
verifyCancel(mockDescribeTableFuture, () -> leaseRefresher.leaseTableExists());
}
@Test
public void testCreateLeaseTableTimesOut() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(anyLong(), any()))
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
}
@FunctionalInterface
private interface TestCaller {
void call() throws Exception;
}
private void verifyCancel(Future<?> future, TestCaller toExecute) throws Exception {
try {
toExecute.call();
} finally {
verify(future).cancel(anyBoolean());
}
}
private void setupUpdateItemTest() throws Exception {
TimeoutException te = setRuleForDependencyTimeout();
when(leaseSerializer.getDynamoHashKey(any(Lease.class))).thenReturn(Collections.emptyMap());
when(leaseSerializer.getDynamoLeaseCounterExpectation(any(Lease.class))).thenReturn(Collections.emptyMap());
when(leaseSerializer.getDynamoLeaseCounterUpdate(any(Lease.class))).thenReturn(Collections.emptyMap());
when(leaseSerializer.getDynamoTakeLeaseUpdate(any(), anyString())).thenReturn(Collections.emptyMap());
when(dynamoDbClient.updateItem(any(UpdateItemRequest.class))).thenReturn(mockUpdateFuture);
when(mockUpdateFuture.get(anyLong(), any())).thenThrow(te);
}
private TimeoutException setRuleForDependencyTimeout() {
TimeoutException te = new TimeoutException("Timeout");
expectedException.expect(DependencyException.class);
expectedException.expectCause(equalTo(te));
return te;
}
}

View file

@ -14,12 +14,14 @@
*/
package software.amazon.kinesis.retrieval.polling;
import static org.hamcrest.CoreMatchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -35,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.junit.Before;
@ -66,6 +70,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@ -76,12 +81,12 @@ public class KinesisDataFetcherTest {
private static final int MAX_RECORDS = 1;
private static final String STREAM_NAME = "streamName";
private static final String SHARD_ID = "shardId-1";
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp(new Date(1000));
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private KinesisDataFetcher kinesisDataFetcher;
@ -90,13 +95,16 @@ public class KinesisDataFetcherTest {
private KinesisAsyncClient kinesisClient;
@Mock
private CompletableFuture<GetRecordsResponse> getRecordsResponseFuture;
@Mock
private CompletableFuture<GetShardIteratorResponse> getShardIteratorResponseFuture;
@Rule
public ExpectedException expectedExceptionRule = ExpectedException.none();
@Before
public void setup() {
kinesisDataFetcher = new KinesisDataFetcher(kinesisClient, STREAM_NAME, SHARD_ID, MAX_RECORDS, NULL_METRICS_FACTORY);
kinesisDataFetcher = new KinesisDataFetcher(kinesisClient, STREAM_NAME, SHARD_ID, MAX_RECORDS,
NULL_METRICS_FACTORY);
}
/**
@ -104,8 +112,7 @@ public class KinesisDataFetcherTest {
*/
@Test
public final void testInitializeLatest() throws Exception {
testInitializeAndFetch(ShardIteratorType.LATEST.toString(),
ShardIteratorType.LATEST.toString(),
testInitializeAndFetch(ShardIteratorType.LATEST.toString(), ShardIteratorType.LATEST.toString(),
INITIAL_POSITION_LATEST);
}
@ -114,8 +121,7 @@ public class KinesisDataFetcherTest {
*/
@Test
public final void testInitializeTimeZero() throws Exception {
testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(),
ShardIteratorType.TRIM_HORIZON.toString(),
testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(), ShardIteratorType.TRIM_HORIZON.toString(),
INITIAL_POSITION_TRIM_HORIZON);
}
@ -124,12 +130,10 @@ public class KinesisDataFetcherTest {
*/
@Test
public final void testInitializeAtTimestamp() throws Exception {
testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(),
ShardIteratorType.AT_TIMESTAMP.toString(),
testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(), ShardIteratorType.AT_TIMESTAMP.toString(),
INITIAL_POSITION_AT_TIMESTAMP);
}
/**
* Test initialize() when a flushpoint exists.
*/
@ -149,8 +153,8 @@ public class KinesisDataFetcherTest {
private CompletableFuture<GetShardIteratorResponse> makeGetShardIteratorResonse(String shardIterator)
throws InterruptedException, ExecutionException {
return CompletableFuture.completedFuture(
GetShardIteratorResponse.builder().shardIterator(shardIterator).build());
return CompletableFuture
.completedFuture(GetShardIteratorResponse.builder().shardIterator(shardIterator).build());
}
@Test
@ -161,12 +165,11 @@ public class KinesisDataFetcherTest {
final String seqA = "123";
final String seqB = "456";
ArgumentCaptor<GetShardIteratorRequest> shardIteratorRequestCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
ArgumentCaptor<GetShardIteratorRequest> shardIteratorRequestCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iteratorA))
.thenReturn(makeGetShardIteratorResonse(iteratorA))
.thenReturn(makeGetShardIteratorResonse(iteratorA)).thenReturn(makeGetShardIteratorResonse(iteratorA))
.thenReturn(makeGetShardIteratorResonse(iteratorB));
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA));
@ -198,16 +201,18 @@ public class KinesisDataFetcherTest {
@Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws InterruptedException, ExecutionException {
final ArgumentCaptor<GetShardIteratorRequest> requestCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> requestCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final String iteratorHorizon = "TRIM_HORIZON";
final String iteratorLatest = "LATEST";
final String iteratorAtTimestamp = "AT_TIMESTAMP";
final Map<ShardIteratorType, GetShardIteratorRequest> requestsMap = Arrays.stream(
new String[] {iteratorHorizon, iteratorLatest, iteratorAtTimestamp})
final Map<ShardIteratorType, GetShardIteratorRequest> requestsMap = Arrays
.stream(new String[] { iteratorHorizon, iteratorLatest, iteratorAtTimestamp })
.map(this::makeGetShardIteratorRequest)
.collect(Collectors.toMap(r -> ShardIteratorType.valueOf(r.shardIteratorTypeAsString()), r -> r));
GetShardIteratorRequest tsReq = requestsMap.get(ShardIteratorType.AT_TIMESTAMP);
requestsMap.put(ShardIteratorType.AT_TIMESTAMP, tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build());
requestsMap.put(ShardIteratorType.AT_TIMESTAMP,
tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build());
when(kinesisClient.getShardIterator(requestCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iteratorHorizon))
@ -238,14 +243,15 @@ public class KinesisDataFetcherTest {
}
@Test
public void testGetRecordsWithResourceNotFoundException() throws InterruptedException, ExecutionException {
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
public void testGetRecordsWithResourceNotFoundException() throws Exception {
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor<GetRecordsRequest> recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
// Set up arguments used by proxy
final String nextIterator = "TestShardIterator";
final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(ShardIteratorType.LATEST.name());
final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(
ShardIteratorType.LATEST.name());
final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(nextIterator);
final CompletableFuture<GetRecordsResponse> future = mock(CompletableFuture.class);
@ -254,13 +260,13 @@ public class KinesisDataFetcherTest {
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get()).thenThrow(
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
// Create data fectcher and initialize it with latest type checkpoint
kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy =
new SynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher);
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(
kinesisDataFetcher);
try {
// Call records of dataFetcher which will throw an exception
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);
@ -283,7 +289,7 @@ public class KinesisDataFetcherTest {
// Set up proxy mock methods
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture);
when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture);
when(getRecordsResponseFuture.get())
when(getRecordsResponseFuture.get(anyLong(), any(TimeUnit.class)))
.thenThrow(new ExecutionException(SdkException.builder().message("Test Exception").build()));
// Create data fectcher and initialize it with latest type checkpoint
@ -295,14 +301,15 @@ public class KinesisDataFetcherTest {
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);
}
@Test
public void testNonNullGetRecords() throws InterruptedException, ExecutionException {
public void testNonNullGetRecords() throws Exception {
final String nextIterator = "TestIterator";
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor<GetRecordsRequest> recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(ShardIteratorType.LATEST.name());
final GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(
ShardIteratorType.LATEST.name());
final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(nextIterator);
final CompletableFuture<GetRecordsResponse> future = mock(CompletableFuture.class);
@ -310,7 +317,7 @@ public class KinesisDataFetcherTest {
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get()).thenThrow(
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
@ -322,23 +329,23 @@ public class KinesisDataFetcherTest {
}
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records)
throws InterruptedException, ExecutionException{
throws InterruptedException, ExecutionException {
return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator)
.records(CollectionUtils.isNullOrEmpty(records) ? Collections.emptyList() : records)
.build());
.records(CollectionUtils.isNullOrEmpty(records) ? Collections.emptyList() : records).build());
}
@Test
public void testFetcherDoesNotAdvanceWithoutAccept() throws InterruptedException, ExecutionException {
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor <GetRecordsRequest> recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor<GetRecordsRequest> recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
final String initialIterator = "InitialIterator";
final String nextIterator1 = "NextIteratorOne";
final String nextIterator2 = "NextIteratorTwo";
final CompletableFuture<GetRecordsResponse> nonAdvancingResult1 = makeGetRecordsResponse(initialIterator, null);
final CompletableFuture<GetRecordsResponse> nonAdvancingResult2 = makeGetRecordsResponse(nextIterator1, null);
final CompletableFuture<GetRecordsResponse> finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2, null);
final CompletableFuture<GetRecordsResponse> finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2,
null);
final CompletableFuture<GetRecordsResponse> advancingResult1 = makeGetRecordsResponse(nextIterator1, null);
final CompletableFuture<GetRecordsResponse> advancingResult2 = makeGetRecordsResponse(nextIterator2, null);
final CompletableFuture<GetRecordsResponse> finalAdvancingResult = makeGetRecordsResponse(null, null);
@ -360,8 +367,6 @@ public class KinesisDataFetcherTest {
assertNoAdvance(finalNonAdvancingResult.get(), nextIterator2);
assertAdvanced(finalAdvancingResult.get(), nextIterator2, null);
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(initialIterator)));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator1)));
verify(kinesisClient, times(2)).getRecords(eq(makeGetRecordsRequest(nextIterator2)));
@ -380,12 +385,13 @@ public class KinesisDataFetcherTest {
verify(kinesisClient, never()).getRecords(any(GetRecordsRequest.class));
}
@Test
@Ignore
public void testRestartIterator() throws InterruptedException, ExecutionException {
public void testRestartIterator() throws Exception {
GetRecordsResponse getRecordsResult = mock(GetRecordsResponse.class);
GetRecordsResponse restartGetRecordsResponse = makeGetRecordsResponse(null, null).get();
GetRecordsResponse restartGetRecordsResponse = makeGetRecordsResponse(null, null).get(anyLong(),
any(TimeUnit.class));
Record record = mock(Record.class);
final String nextShardIterator = "NextShardIterator";
final String sequenceNumber = "SequenceNumber";
@ -400,13 +406,53 @@ public class KinesisDataFetcherTest {
kinesisDataFetcher.restartIterator();
assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept());
}
@Test (expected = IllegalStateException.class)
@Test(expected = IllegalStateException.class)
public void testRestartIteratorNotInitialized() {
kinesisDataFetcher.restartIterator();
}
private DataFetcherResult assertAdvanced(GetRecordsResponse expectedResult, String previousValue, String nextValue) {
@Test
public void testTimeoutExceptionIsRetryableForGetShardIterator() throws Exception {
expectedExceptionRule.expect(RetryableRetrievalException.class);
expectedExceptionRule.expectCause(isA(TimeoutException.class));
expectedExceptionRule.expectMessage("Timeout");
// Set up proxy mock methods
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(getShardIteratorResponseFuture);
when(getShardIteratorResponseFuture.get(anyLong(), any(TimeUnit.class)))
.thenThrow(new TimeoutException("Timeout"));
// Create data fectcher and initialize it with latest type checkpoint
kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
}
@Test
public void testTimeoutExceptionIsRetryableForGetRecords() throws Exception {
expectedExceptionRule.expect(RetryableRetrievalException.class);
expectedExceptionRule.expectCause(isA(TimeoutException.class));
expectedExceptionRule.expectMessage("Timeout");
CompletableFuture<GetShardIteratorResponse> getShardIteratorFuture = CompletableFuture
.completedFuture(GetShardIteratorResponse.builder().shardIterator("test").build());
// Set up proxy mock methods
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture);
when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture);
when(getRecordsResponseFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Timeout"));
// Create data fectcher and initialize it with latest type checkpoint
kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(
kinesisDataFetcher);
// Call records of dataFetcher which will throw an exception
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);
}
private DataFetcherResult assertAdvanced(GetRecordsResponse expectedResult, String previousValue,
String nextValue) {
DataFetcherResult acceptResult = kinesisDataFetcher.getRecords();
assertEquals(expectedResult, acceptResult.getResult());
@ -436,18 +482,17 @@ public class KinesisDataFetcherTest {
return noAcceptResult;
}
private void testInitializeAndFetch(final String iteratorType,
final String seqNo,
final InitialPositionInStreamExtended initialPositionInStream) throws Exception {
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
private void testInitializeAndFetch(final String iteratorType, final String seqNo,
final InitialPositionInStreamExtended initialPositionInStream) throws Exception {
final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final ArgumentCaptor<GetRecordsRequest> recordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
final String iterator = "foo";
final List<Record> expectedRecords = Collections.emptyList();
GetShardIteratorRequest expectedIteratorRequest =
makeGetShardIteratorRequest(iteratorType);
GetShardIteratorRequest expectedIteratorRequest = makeGetShardIteratorRequest(iteratorType);
if (iteratorType.equals(ShardIteratorType.AT_TIMESTAMP.toString())) {
expectedIteratorRequest = expectedIteratorRequest.toBuilder().timestamp(initialPositionInStream.getTimestamp().toInstant()).build();
expectedIteratorRequest = expectedIteratorRequest.toBuilder()
.timestamp(initialPositionInStream.getTimestamp().toInstant()).build();
} else if (iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString())) {
expectedIteratorRequest = expectedIteratorRequest.toBuilder().startingSequenceNumber(seqNo).build();
}
@ -462,8 +507,8 @@ public class KinesisDataFetcherTest {
Checkpointer checkpoint = mock(Checkpointer.class);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy =
new SynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher);
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(
kinesisDataFetcher);
kinesisDataFetcher.initialize(seqNo, initialPositionInStream);
assertEquals(expectedRecords, getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).records());

View file

@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -34,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
@ -93,7 +95,7 @@ public class PrefetchRecordsPublisherIntegrationTest {
private InitialPositionInStreamExtended initialPosition;
@Before
public void setup() throws InterruptedException, ExecutionException {
public void setup() throws Exception {
records = new ArrayList<>();
dataFetcher = spy(new KinesisDataFetcherForTest(kinesisClient, streamName, shardId, MAX_RECORDS_PER_CALL));
getRecordsRetrievalStrategy = Mockito.spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
@ -101,7 +103,7 @@ public class PrefetchRecordsPublisherIntegrationTest {
CompletableFuture<GetShardIteratorResponse> future = mock(CompletableFuture.class);
when(extendedSequenceNumber.sequenceNumber()).thenReturn("LATEST");
when(future.get()).thenReturn(GetShardIteratorResponse.builder().shardIterator("TestIterator").build());
when(future.get(anyLong(), any(TimeUnit.class))).thenReturn(GetShardIteratorResponse.builder().shardIterator("TestIterator").build());
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(future);
getRecordsCache = new PrefetchRecordsPublisher(MAX_SIZE,

View file

@ -38,11 +38,13 @@ import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcess
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -74,6 +76,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@ -254,6 +257,18 @@ public class PrefetchRecordsPublisherTest {
verify(dataFetcher).restartIterator();
}
@Test
public void testRetryableRetrievalExceptionContinues() {
GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
RecordsRetrieved records = getRecordsCache.getNextResult();
assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest()));
}
@Test(timeout = 1000L)
public void testNoDeadlockOnFullQueue() {
//

View file

@ -20,7 +20,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name>
<version>2.1.2</version>
<version>2.1.3-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>
@ -31,7 +31,7 @@
</scm>
<properties>
<awssdk.version>2.4.0</awssdk.version>
<awssdk.version>2.5.10</awssdk.version>
</properties>
<licenses>