Merge pull request #94 from pfifer/time-based-iterators

Added Support for Time based iterators
This commit is contained in:
Justin Pfifer 2016-08-11 12:46:19 -07:00 committed by GitHub
commit 33e37ef10a
32 changed files with 1041 additions and 249 deletions

View file

@ -1,3 +1,3 @@
AmazonKinesisClientLibrary
Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.

View file

@ -93,7 +93,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>DynamoDBLocal</artifactId>
<version>1.10.5.1</version>
<version>1.11.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -31,5 +31,9 @@ public enum SentinelCheckpoint {
/**
* We've completely processed all records in this shard.
*/
SHARD_END;
SHARD_END,
/**
* Start from the record at or after the specified server-side timestamp.
*/
AT_TIMESTAMP
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -19,14 +19,18 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
* This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents).
*/
public enum InitialPositionInStream {
/**
* Start after the most recent data record (fetch new data).
*/
LATEST,
/**
* Start from the oldest available data record.
*/
TRIM_HORIZON;
TRIM_HORIZON,
/**
* Start from the record at or after the specified server-side timestamp.
*/
AT_TIMESTAMP
}

View file

@ -0,0 +1,78 @@
/*
* Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Date;
/**
* Class that houses the entities needed to specify the position in the stream from where a new application should
* start.
*/
class InitialPositionInStreamExtended {
private final InitialPositionInStream position;
private final Date timestamp;
/**
* This is scoped as private to forbid callers from using it directly and to convey the intent to use the
* static methods instead.
*
* @param position One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The Amazon Kinesis Client Library will start
* fetching records from this position when the application starts up if there are no checkpoints.
* If there are checkpoints, we will process records from the checkpoint position.
* @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream.
*/
private InitialPositionInStreamExtended(final InitialPositionInStream position, final Date timestamp) {
this.position = position;
this.timestamp = timestamp;
}
/**
* Get the initial position in the stream where the application should start from.
*
* @return The initial position in stream.
*/
protected InitialPositionInStream getInitialPositionInStream() {
return this.position;
}
/**
* Get the timestamp from where we need to start the application.
* Valid only for initial position of type AT_TIMESTAMP, returns null for other positions.
*
* @return The timestamp from where we need to start the application.
*/
protected Date getTimestamp() {
return this.timestamp;
}
protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
switch (position) {
case LATEST:
return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null);
case TRIM_HORIZON:
return new InitialPositionInStreamExtended(InitialPositionInStream.TRIM_HORIZON, null);
default:
throw new IllegalArgumentException("Invalid InitialPosition: " + position);
}
}
protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
if (timestamp == null) {
throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP");
}
return new InitialPositionInStreamExtended(InitialPositionInStream.AT_TIMESTAMP, timestamp);
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -41,6 +41,7 @@ class InitializeTask implements ITask {
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
// Back off for this interval if we encounter a problem (exception)
private final long backoffTimeMillis;
private final StreamConfig streamConfig;
/**
* Constructor.
@ -50,13 +51,15 @@ class InitializeTask implements ITask {
ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
long backoffTimeMillis) {
long backoffTimeMillis,
StreamConfig streamConfig) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.dataFetcher = dataFetcher;
this.backoffTimeMillis = backoffTimeMillis;
this.streamConfig = streamConfig;
}
/*
@ -74,7 +77,7 @@ class InitializeTask implements ITask {
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());
dataFetcher.initialize(initialCheckpoint.getSequenceNumber());
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);

View file

@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Date;
import java.util.Set;
import com.amazonaws.ClientConfiguration;
@ -185,6 +186,7 @@ public class KinesisClientLibConfiguration {
private int maxLeasesToStealAtOneTime;
private int initialLeaseTableReadCapacity;
private int initialLeaseTableWriteCapacity;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
/**
* Constructor.
@ -263,7 +265,6 @@ public class KinesisClientLibConfiguration {
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@ -330,6 +331,8 @@ public class KinesisClientLibConfiguration {
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
}
// Check if value is positive, otherwise throw an exception
@ -580,6 +583,22 @@ public class KinesisClientLibConfiguration {
return initialLeaseTableWriteCapacity;
}
/**
* Keeping it protected to forbid outside callers from depending on this internal object.
* @return The initialPositionInStreamExtended object.
*/
protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
return initialPositionInStreamExtended;
}
/**
* @return The timestamp from where we need to start the application.
* Valid only for initial position of type AT_TIMESTAMP, returns null for other positions.
*/
public Date getTimestampAtInitialPositionInStream() {
return initialPositionInStreamExtended.getTimestamp();
}
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param tableName name of the lease table in DynamoDB
@ -600,13 +619,25 @@ public class KinesisClientLibConfiguration {
}
/**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start
* fetching records from this position when the application starts up if there are no checkpoints. If there
* are checkpoints, we will process records from the checkpoint position.
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
* will start fetching records from this position when the application starts up if there are no checkpoints.
* If there are checkpoints, we will process records from the checkpoint position.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withInitialPositionInStream(InitialPositionInStream initialPositionInStream) {
this.initialPositionInStream = initialPositionInStream;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
return this;
}
/**
* @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withTimestampAtInitialPositionInStream(Date timestamp) {
this.initialPositionInStream = InitialPositionInStream.AT_TIMESTAMP;
this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp);
return this;
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -25,6 +25,8 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import java.util.Date;
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
@ -41,8 +43,7 @@ class KinesisDataFetcher {
/**
*
* @param kinesisProxy Kinesis proxy
* @param shardId shardId (we'll fetch data for this shard)
* @param checkpoint used to get current checkpoint from which to start fetching records
* @param shardInfo The shardInfo object.
*/
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) {
this.shardId = shardInfo.getShardId();
@ -83,17 +84,18 @@ class KinesisDataFetcher {
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
*
* @param initialPositionInStream The initialPositionInStream.
*/
public void initialize(String initialCheckpoint) {
public void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint);
advanceIteratorTo(initialCheckpoint);
advanceIteratorTo(initialCheckpoint, initialPositionInStream);
isInitialized = true;
}
public void initialize(ExtendedSequenceNumber initialCheckpoint) {
public void initialize(ExtendedSequenceNumber initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
advanceIteratorTo(initialCheckpoint.getSequenceNumber());
advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream);
isInitialized = true;
}
@ -101,14 +103,17 @@ class KinesisDataFetcher {
* Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number.
*
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
void advanceIteratorTo(String sequenceNumber) {
void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
nextIterator = getIterator(ShardIteratorType.LATEST.toString(), null);
nextIterator = getIterator(ShardIteratorType.LATEST.toString());
} else if (sequenceNumber.equals(SentinelCheckpoint.TRIM_HORIZON.toString())) {
nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString(), null);
nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString());
} else if (sequenceNumber.equals(SentinelCheckpoint.AT_TIMESTAMP.toString())) {
nextIterator = getIterator(initialPositionInStream.getTimestamp());
} else if (sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString())) {
nextIterator = null;
} else {
@ -120,8 +125,8 @@ class KinesisDataFetcher {
}
/**
* @param iteratorType
* @param sequenceNumber
* @param iteratorType The iteratorType - either AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER.
* @param sequenceNumber The sequenceNumber.
*
* @return iterator or null if we catch a ResourceNotFound exception
*/
@ -139,6 +144,40 @@ class KinesisDataFetcher {
return iterator;
}
/**
* @param iteratorType The iteratorType - either TRIM_HORIZON or LATEST.
* @return iterator or null if we catch a ResourceNotFound exception
*/
private String getIterator(String iteratorType) {
String iterator = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Calling getIterator for " + shardId + " and iterator type " + iteratorType);
}
iterator = kinesisProxy.getIterator(shardId, iteratorType);
} catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, e);
}
return iterator;
}
/**
* @param timestamp The timestamp.
* @return iterator or null if we catch a ResourceNotFound exception
*/
private String getIterator(Date timestamp) {
String iterator = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Calling getIterator for " + shardId + " and timestamp " + timestamp);
}
iterator = kinesisProxy.getIterator(shardId, timestamp);
} catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, e);
}
return iterator;
}
/**
* @return the shardEndReached
*/

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -259,8 +259,8 @@ class ProcessTask implements ITask {
* Advance the iterator to after the greatest processed sequence number (remembered by
* recordProcessorCheckpointer).
*/
dataFetcher.advanceIteratorTo(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber());
dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue()
.getSequenceNumber(), streamConfig.getInitialPositionInStream());
// Try a second time - if we fail this time, expose the failure.
try {

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -252,7 +252,8 @@ class ShardConsumer {
checkpoint,
recordProcessorCheckpointer,
dataFetcher,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
streamConfig);
break;
case PROCESSING:
nextTask =

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -33,7 +33,7 @@ class ShardSyncTask implements ITask {
private final IKinesisProxy kinesisProxy;
private final ILeaseManager<KinesisClientLease> leaseManager;
private InitialPositionInStream initialPosition;
private InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
@ -41,13 +41,13 @@ class ShardSyncTask implements ITask {
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
* @param leaseManager Used to fetch and create leases
* @param initialPosition One of LATEST or TRIM_HORIZON. Amazon Kinesis Client Library will start processing records
* from this point in the stream (when an application starts up for the first time) except for shards that
* already have a checkpoint (and their descendant shards).
* @param initialPositionInStream One of LATEST, TRIM_HORIZON or AT_TIMESTAMP. Amazon Kinesis Client Library will
* start processing records from this point in the stream (when an application starts up for the first time)
* except for shards that already have a checkpoint (and their descendant shards).
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStream initialPositionInStream,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
long shardSyncTaskIdleTimeMillis) {
this.kinesisProxy = kinesisProxy;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -42,7 +42,7 @@ class ShardSyncTaskManager {
private final ILeaseManager<KinesisClientLease> leaseManager;
private final IMetricsFactory metricsFactory;
private final ExecutorService executorService;
private final InitialPositionInStream initialPositionInStream;
private final InitialPositionInStreamExtended initialPositionInStream;
private boolean cleanupLeasesUponShardCompletion;
private final long shardSyncIdleTimeMillis;
@ -61,7 +61,7 @@ class ShardSyncTaskManager {
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStream initialPositionInStream,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -59,7 +59,7 @@ class ShardSyncer {
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStream initialPositionInStream,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
@ -82,7 +82,7 @@ class ShardSyncer {
*/
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStream initialPositionInStream,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
@ -106,7 +106,7 @@ class ShardSyncer {
// CHECKSTYLE:OFF CyclomaticComplexity
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStream initialPosition,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy);
@ -327,15 +327,15 @@ class ShardSyncer {
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* @param shardIds Set of all shardIds in Kinesis (we'll create new leases based on this set)
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST or TRIM_HORIZON. We'll start fetching records from that location in the
* shard (when an application starts up for the first time - and there are no checkpoints).
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStream initialPosition) {
InitialPositionInStreamExtended initialPosition) {
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
@ -364,7 +364,32 @@ class ShardSyncer {
shardIdToShardMapOfAllKinesisShards,
shardIdToNewLeaseMap,
memoizationContext);
if (isDescendant) {
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(convertToCheckpoint(initialPosition));
@ -388,8 +413,10 @@ class ShardSyncer {
* Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
*
* @param shardIds Ancestors of these shards will be considered for addition into the new lease map
* @param shardIdsOfCurrentLeases
* @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param shardIdsOfCurrentLeases The shardIds for the current leases.
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map.
* @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation
@ -397,7 +424,7 @@ class ShardSyncer {
*/
// CHECKSTYLE:OFF CyclomaticComplexity
static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStream initialPosition,
InitialPositionInStreamExtended initialPosition,
Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards,
@ -449,7 +476,9 @@ class ShardSyncer {
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
if (descendantParentShardIds.contains(parentShardId)) {
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
@ -457,8 +486,13 @@ class ShardSyncer {
}
}
} else {
// This shard should be included, if the customer wants to process all records in the stream.
if (initialPosition.equals(InitialPositionInStream.TRIM_HORIZON)) {
// This shard should be included, if the customer wants to process all records in the stream or
// if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do
// for TRIM_HORIZON. However we will only return back records with server-side timestamp at or
// after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
}
}
@ -737,13 +771,15 @@ class ShardSyncer {
return openShards;
}
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStream position) {
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.equals(InitialPositionInStream.TRIM_HORIZON)) {
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.equals(InitialPositionInStream.LATEST)) {
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
checkpoint = ExtendedSequenceNumber.LATEST;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
return checkpoint;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -42,7 +42,7 @@ class ShutdownTask implements ITask {
private final ShutdownReason reason;
private final IKinesisProxy kinesisProxy;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final InitialPositionInStream initialPositionInStream;
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
@ -56,7 +56,7 @@ class ShutdownTask implements ITask {
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
InitialPositionInStream initialPositionInStream,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis) {

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -25,7 +25,7 @@ class StreamConfig {
private final int maxRecords;
private final long idleTimeInMilliseconds;
private final boolean callProcessRecordsEvenForEmptyRecordList;
private InitialPositionInStream initialPositionInStream;
private InitialPositionInStreamExtended initialPositionInStream;
private final boolean validateSequenceNumberBeforeCheckpointing;
/**
@ -42,7 +42,7 @@ class StreamConfig {
long idleTimeInMilliseconds,
boolean callProcessRecordsEvenForEmptyRecordList,
boolean validateSequenceNumberBeforeCheckpointing,
InitialPositionInStream initialPositionInStream) {
InitialPositionInStreamExtended initialPositionInStream) {
this.streamProxy = proxy;
this.maxRecords = maxRecords;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
@ -82,7 +82,7 @@ class StreamConfig {
/**
* @return the initialPositionInStream
*/
InitialPositionInStream getInitialPositionInStream() {
InitialPositionInStreamExtended getInitialPositionInStream() {
return initialPositionInStream;
}
@ -92,5 +92,4 @@ class StreamConfig {
boolean shouldValidateSequenceNumberBeforeCheckpointing() {
return validateSequenceNumberBeforeCheckpointing;
}
}

View file

@ -64,7 +64,7 @@ public class Worker implements Runnable {
private final String applicationName;
private final IRecordProcessorFactory recordProcessorFactory;
private final StreamConfig streamConfig;
private final InitialPositionInStream initialPosition;
private final InitialPositionInStreamExtended initialPosition;
private final ICheckpoint checkpointTracker;
private final long idleTimeInMilliseconds;
// Backoff time when polling to check if application has finished processing
@ -212,8 +212,8 @@ public class Worker implements Runnable {
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStream()),
config.getInitialPositionInStream(),
config.getInitialPositionInStreamExtended()),
config.getInitialPositionInStreamExtended(),
config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
@ -258,9 +258,9 @@ public class Worker implements Runnable {
* @param applicationName Name of the Kinesis application
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param streamConfig Stream configuration
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching data
* from this location in the stream when an application starts up for the first time and there are no
* checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start
* fetching data from this location in the stream when an application starts up for the first time and
* there are no checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in
@ -277,7 +277,7 @@ public class Worker implements Runnable {
Worker(String applicationName,
IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig,
InitialPositionInStream initialPositionInStream,
InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion,
@ -946,8 +946,8 @@ public class Worker implements Runnable {
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStream()),
config.getInitialPositionInStream(),
config.getInitialPositionInStreamExtended()),
config.getInitialPositionInStreamExtended(),
config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import java.util.Set;
@ -72,7 +73,16 @@ public interface IKinesisProxy {
/**
* Fetch a shard iterator from the specified position in the shard.
*
* This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which
* requires the starting sequence number.
*
* NOTE: Currently this method continues to fetch iterators for ShardIteratorTypes TRIM_HORIZON, LATEST,
* AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER.
* But this behavior will change in the next release, after which this method will only serve
* AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER ShardIteratorTypes.
* We recommend users who call this method directly to use the appropriate getIterator method based on the
* ShardIteratorType.
*
* @param shardId Shard id
* @param iteratorEnum one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
* @param sequenceNumber the sequence number - must be null unless iteratorEnum is AT_SEQUENCE_NUMBER or
@ -84,6 +94,31 @@ public interface IKinesisProxy {
String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
throws ResourceNotFoundException, InvalidArgumentException;
/**
* Fetch a shard iterator from the specified position in the shard.
* This is to fetch a shard iterator for ShardIteratorType LATEST or TRIM_HORIZON which doesn't require a starting
* sequence number.
*
* @param shardId Shard id
* @param iteratorEnum Either TRIM_HORIZON or LATEST.
* @return shard iterator which can be used to read data from Kinesis.
* @throws ResourceNotFoundException The Kinesis stream or shard was not found
* @throws InvalidArgumentException Invalid input parameters
*/
String getIterator(String shardId, String iteratorEnum) throws ResourceNotFoundException, InvalidArgumentException;
/**
* Fetch a shard iterator from the specified position in the shard.
* This is to fetch a shard iterator for ShardIteratorType AT_TIMESTAMP which requires the timestamp field.
*
* @param shardId Shard id
* @param timestamp The timestamp.
* @return shard iterator which can be used to read data from Kinesis.
* @throws ResourceNotFoundException The Kinesis stream or shard was not found
* @throws InvalidArgumentException Invalid input parameters
*/
String getIterator(String shardId, Date timestamp) throws ResourceNotFoundException, InvalidArgumentException;
/**
* @param sequenceNumberForOrdering (optional) used for record ordering
* @param explicitHashKey optionally supplied transformation of partitionkey

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -40,6 +41,7 @@ import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
/**
@ -263,12 +265,50 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/
@Override
public String getIterator(String shardId, String iteratorType, String sequenceNumber) {
if (!iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()) || !iteratorType.equals(
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString())) {
LOG.info("This method should only be used for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER "
+ "ShardIteratorTypes. For methods to use with other ShardIteratorTypes, see IKinesisProxy.java");
}
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(iteratorType);
getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
getShardIteratorRequest.setTimestamp(null);
final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest);
return response.getShardIterator();
}
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, String iteratorType) {
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(iteratorType);
getShardIteratorRequest.setStartingSequenceNumber(null);
getShardIteratorRequest.setTimestamp(null);
final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest);
return response.getShardIterator();
}
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, Date timestamp) {
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP);
getShardIteratorRequest.setStartingSequenceNumber(null);
getShardIteratorRequest.setTimestamp(timestamp);
final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest);
return response.getShardIterator();
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import java.util.Set;
@ -128,6 +129,40 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
}
}
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, String iteratorEnum)
throws ResourceNotFoundException, InvalidArgumentException {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
String response = other.getIterator(shardId, iteratorEnum);
success = true;
return response;
} finally {
MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED);
}
}
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, Date timestamp)
throws ResourceNotFoundException, InvalidArgumentException {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
String response = other.getIterator(shardId, timestamp);
success = true;
return response;
} finally {
MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED);
}
}
/**
* {@inheritDoc}
*/

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -36,9 +36,10 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
private final String sequenceNumber;
private final long subSequenceNumber;
// Define TRIM_HORIZON and LATEST to be less than all sequence numbers
// Define TRIM_HORIZON, LATEST, and AT_TIMESTAMP to be less than all sequence numbers
private static final BigInteger TRIM_HORIZON_BIG_INTEGER_VALUE = BigInteger.valueOf(-2);
private static final BigInteger LATEST_BIG_INTEGER_VALUE = BigInteger.valueOf(-1);
private static final BigInteger AT_TIMESTAMP_BIG_INTEGER_VALUE = BigInteger.valueOf(-3);
/**
* Special value for LATEST.
@ -58,6 +59,12 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
public static final ExtendedSequenceNumber TRIM_HORIZON =
new ExtendedSequenceNumber(SentinelCheckpoint.TRIM_HORIZON.toString());
/**
* Special value for AT_TIMESTAMP.
*/
public static final ExtendedSequenceNumber AT_TIMESTAMP =
new ExtendedSequenceNumber(SentinelCheckpoint.AT_TIMESTAMP.toString());
/**
* Construct an ExtendedSequenceNumber. The sub-sequence number defaults to
* 0.
@ -87,7 +94,7 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
* Compares this with another ExtendedSequenceNumber using these rules.
*
* SHARD_END is considered greatest
* TRIM_HORIZON and LATEST are considered less than sequence numbers
* TRIM_HORIZON, LATEST and AT_TIMESTAMP are considered less than sequence numbers
* sequence numbers are given their big integer value
*
* @param extendedSequenceNumber The ExtendedSequenceNumber to compare against
@ -183,8 +190,8 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
/**
* Sequence numbers are converted, sentinels are given a value of -1. Note this method is only used after special
* logic associated with SHARD_END and the case of comparing two sentinel values has already passed, so we map
* sentinel values LATEST and TRIM_HORIZON to negative numbers so that they are considered less than sequence
* numbers.
* sentinel values LATEST, TRIM_HORIZON and AT_TIMESTAMP to negative numbers so that they are considered less than
* sequence numbers.
*
* @param sequenceNumber The string to convert to big integer value
* @return a BigInteger value representation of the sequenceNumber
@ -196,9 +203,11 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
return LATEST_BIG_INTEGER_VALUE;
} else if (SentinelCheckpoint.TRIM_HORIZON.toString().equals(sequenceNumber)) {
return TRIM_HORIZON_BIG_INTEGER_VALUE;
} else if (SentinelCheckpoint.AT_TIMESTAMP.toString().equals(sequenceNumber)) {
return AT_TIMESTAMP_BIG_INTEGER_VALUE;
} else {
throw new IllegalArgumentException("Expected a string of digits, TRIM_HORIZON, or LATEST but received "
+ sequenceNumber);
throw new IllegalArgumentException("Expected a string of digits, TRIM_HORIZON, LATEST or AT_TIMESTAMP but "
+ "received " + sequenceNumber);
}
}

View file

@ -30,7 +30,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;

View file

@ -15,6 +15,10 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import junit.framework.Assert;
import org.junit.Test;
@ -31,6 +35,8 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorF
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.collect.ImmutableSet;
import java.util.Date;
public class KinesisClientLibConfigurationTest {
private static final long INVALID_LONG = 0L;
private static final int INVALID_INT = 0;
@ -58,7 +64,7 @@ public class KinesisClientLibConfigurationTest {
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
null,
InitialPositionInStream.LATEST,
null,
null,
null,
@ -95,7 +101,7 @@ public class KinesisClientLibConfigurationTest {
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
null,
InitialPositionInStream.LATEST,
null,
null,
null,
@ -128,7 +134,7 @@ public class KinesisClientLibConfigurationTest {
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
null,
InitialPositionInStream.LATEST,
null,
null,
null,
@ -346,4 +352,50 @@ public class KinesisClientLibConfigurationTest {
// Operation dimension should always be there.
assertEquals(config.getMetricsEnabledDimensions(), ImmutableSet.of("Operation", "WorkerIdentifier"));
}
@Test
public void testKCLConfigurationWithInvalidInitialPositionInStream() {
KinesisClientLibConfiguration config;
try {
config = new KinesisClientLibConfiguration("TestApplication",
"TestStream",
null,
"TestWorker").withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP);
fail("Should have thrown");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
try {
config = new KinesisClientLibConfiguration("TestApplication",
"TestStream",
null, "TestWorker").withTimestampAtInitialPositionInStream(null);
fail("Should have thrown");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
try {
Date timestamp = new Date(1000L);
config = new KinesisClientLibConfiguration("TestApplication",
"TestStream", null, "TestWorker").withTimestampAtInitialPositionInStream(timestamp);
assertEquals(config.getInitialPositionInStreamExtended().getInitialPositionInStream(),
InitialPositionInStream.AT_TIMESTAMP);
assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), timestamp);
} catch (Exception e) {
fail("Should not have thrown");
}
try {
config = new KinesisClientLibConfiguration("TestApplication",
"TestStream",
null,
"TestWorker").withInitialPositionInStream(InitialPositionInStream.LATEST);
assertEquals(config.getInitialPositionInStreamExtended().getInitialPositionInStream(),
InitialPositionInStream.LATEST);
assertNull(config.getInitialPositionInStreamExtended().getTimestamp());
} catch (Exception e) {
fail("Should not have thrown");
}
}
}

View file

@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
@ -46,9 +47,14 @@ public class KinesisDataFetcherTest {
private static final int MAX_RECORDS = 1;
private static final String SHARD_ID = "shardId-1";
private static final String AFTER_SEQUENCE_NUMBER = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, null, null);
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
/**
* @throws java.lang.Exception
@ -63,7 +69,9 @@ public class KinesisDataFetcherTest {
*/
@Test
public final void testInitializeLatest() throws Exception {
testInitializeAndFetch(ShardIteratorType.LATEST.toString(), ShardIteratorType.LATEST.toString());
testInitializeAndFetch(ShardIteratorType.LATEST.toString(),
ShardIteratorType.LATEST.toString(),
INITIAL_POSITION_LATEST);
}
/**
@ -71,15 +79,28 @@ public class KinesisDataFetcherTest {
*/
@Test
public final void testInitializeTimeZero() throws Exception {
testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(), ShardIteratorType.TRIM_HORIZON.toString());
testInitializeAndFetch(ShardIteratorType.TRIM_HORIZON.toString(),
ShardIteratorType.TRIM_HORIZON.toString(),
INITIAL_POSITION_TRIM_HORIZON);
}
/**
* Test initialize() with the AT_TIMESTAMP iterator instruction
*/
@Test
public final void testInitializeAtTimestamp() throws Exception {
testInitializeAndFetch(ShardIteratorType.AT_TIMESTAMP.toString(),
ShardIteratorType.AT_TIMESTAMP.toString(),
INITIAL_POSITION_AT_TIMESTAMP);
}
/**
* Test initialize() when a flushpoint exists.
*/
@Test
public final void testInitializeFlushpoint() throws Exception {
testInitializeAndFetch("foo", "123");
testInitializeAndFetch("foo", "123", INITIAL_POSITION_LATEST);
}
/**
@ -87,7 +108,7 @@ public class KinesisDataFetcherTest {
*/
@Test(expected = IllegalArgumentException.class)
public final void testInitializeInvalid() throws Exception {
testInitializeAndFetch("foo", null);
testInitializeAndFetch("foo", null, INITIAL_POSITION_LATEST);
}
@Test
@ -114,31 +135,36 @@ public class KinesisDataFetcherTest {
when(kinesis.get(iteratorB, MAX_RECORDS)).thenReturn(outputB);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA));
fetcher.initialize(seqA);
fetcher.initialize(seqA, null);
fetcher.advanceIteratorTo(seqA);
fetcher.advanceIteratorTo(seqA, null);
Assert.assertEquals(recordsA, fetcher.getRecords(MAX_RECORDS).getRecords());
fetcher.advanceIteratorTo(seqB);
fetcher.advanceIteratorTo(seqB, null);
Assert.assertEquals(recordsB, fetcher.getRecords(MAX_RECORDS).getRecords());
}
@Test
public void testadvanceIteratorToTrimHorizonAndLatest() {
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() {
IKinesisProxy kinesis = mock(IKinesisProxy.class);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
String iteratorHorizon = "horizon";
when(kinesis.getIterator(SHARD_ID,
ShardIteratorType.TRIM_HORIZON.toString(), null)).thenReturn(iteratorHorizon);
fetcher.advanceIteratorTo(ShardIteratorType.TRIM_HORIZON.toString());
when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon);
fetcher.advanceIteratorTo(ShardIteratorType.TRIM_HORIZON.toString(), INITIAL_POSITION_TRIM_HORIZON);
Assert.assertEquals(iteratorHorizon, fetcher.getNextIterator());
String iteratorLatest = "latest";
when(kinesis.getIterator(SHARD_ID, ShardIteratorType.LATEST.toString(), null)).thenReturn(iteratorLatest);
fetcher.advanceIteratorTo(ShardIteratorType.LATEST.toString());
when(kinesis.getIterator(SHARD_ID, ShardIteratorType.LATEST.toString())).thenReturn(iteratorLatest);
fetcher.advanceIteratorTo(ShardIteratorType.LATEST.toString(), INITIAL_POSITION_LATEST);
Assert.assertEquals(iteratorLatest, fetcher.getNextIterator());
Date timestamp = new Date(1000L);
String iteratorAtTimestamp = "AT_TIMESTAMP";
when(kinesis.getIterator(SHARD_ID, timestamp)).thenReturn(iteratorAtTimestamp);
fetcher.advanceIteratorTo(ShardIteratorType.AT_TIMESTAMP.toString(), INITIAL_POSITION_AT_TIMESTAMP);
Assert.assertEquals(iteratorAtTimestamp, fetcher.getNextIterator());
}
@Test
@ -149,12 +175,12 @@ public class KinesisDataFetcherTest {
// Set up proxy mock methods
KinesisProxy mockProxy = mock(KinesisProxy.class);
doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString(), null);
doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString());
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
// Create data fectcher and initialize it with latest type checkpoint
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString());
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
// Call getRecords of dataFetcher which will throw an exception
dataFetcher.getRecords(maxRecords);
@ -162,24 +188,25 @@ public class KinesisDataFetcherTest {
Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached());
}
private void testInitializeAndFetch(String iteratorType, String seqNo) throws Exception {
private void testInitializeAndFetch(String iteratorType,
String seqNo,
InitialPositionInStreamExtended initialPositionInStream) throws Exception {
IKinesisProxy kinesis = mock(IKinesisProxy.class);
String iterator = "foo";
List<Record> expectedRecords = new ArrayList<Record>();
GetRecordsResult response = new GetRecordsResult();
response.setRecords(expectedRecords);
when(kinesis.getIterator(SHARD_ID, iteratorType, null)).thenReturn(iterator);
when(kinesis.getIterator(SHARD_ID, initialPositionInStream.getTimestamp())).thenReturn(iterator);
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqNo)).thenReturn(iterator);
when(kinesis.getIterator(SHARD_ID, iteratorType)).thenReturn(iterator);
when(kinesis.get(iterator, MAX_RECORDS)).thenReturn(response);
ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
fetcher.initialize(seqNo);
fetcher.initialize(seqNo, initialPositionInStream);
List<Record> actualRecords = fetcher.getRecords(MAX_RECORDS).getRecords();
Assert.assertEquals(expectedRecords, actualRecords);

View file

@ -66,7 +66,8 @@ public class ProcessTaskTest {
private final boolean callProcessRecordsForEmptyRecordList = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private @Mock KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor;
@ -84,7 +85,8 @@ public class ProcessTaskTest {
// Set up process task
final StreamConfig config =
new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, initialPositionInStream);
skipCheckpointValidationValue,
INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null);
processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis);

View file

@ -86,9 +86,9 @@ public class SequenceNumberValidatorTest {
IKinesisProxy proxy,
boolean validateWithGetIterator) {
String[] nonNumericStrings =
{ null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString() };
String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.AT_TIMESTAMP.toString() };
for (String nonNumericString : nonNumericStrings) {
try {

View file

@ -18,6 +18,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -33,6 +34,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ExecutionException;
@ -77,7 +79,8 @@ public class ShardConsumerTest {
private final boolean cleanupLeasesOfCompletedShards = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied.
@ -102,8 +105,7 @@ public class ShardConsumerTest {
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@ -153,8 +155,7 @@ public class ShardConsumerTest {
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@ -198,8 +199,7 @@ public class ShardConsumerTest {
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@ -287,8 +287,7 @@ public class ShardConsumerTest {
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null);
ShardConsumer consumer =
@ -334,12 +333,103 @@ public class ShardConsumerTest {
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString(), null);
String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
file.delete();
}
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()}
* that starts from initial position of type AT_TIMESTAMP.
*/
@Test
public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception {
int numRecs = 7;
BigInteger startSeqNum = BigInteger.ONE;
Date timestamp = new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP + 3);
InitialPositionInStreamExtended atTimestamp =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp);
String streamShardId = "kinesis-0-0";
String testConcurrencyToken = "testToken";
File file =
KinesisLocalFileDataCreator.generateTempDataFile(1,
"kinesis-0-",
numRecs,
startSeqNum,
"unitTestSCT002");
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
final int maxRecords = 2;
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
@SuppressWarnings("unchecked")
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
new StreamConfig(fileBasedProxy,
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
atTimestamp);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
Thread.sleep(50L);
}
assertThat(processor.getShutdownReason(), nullValue());
consumer.beginShutdown();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
String iterator = fileBasedProxy.getIterator(streamShardId, timestamp);
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
assertEquals(4, processor.getProcessedRecords().size());
file.delete();
}
//@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List<Record> expectedRecords,
List<Record> actualRecords) {

View file

@ -120,8 +120,11 @@ public class ShardSyncTaskIntegrationTest {
}
leaseManager.deleteAll();
Set<String> shardIds = kinesisProxy.getAllShardIds();
ShardSyncTask syncTask =
new ShardSyncTask(kinesisProxy, leaseManager, InitialPositionInStream.LATEST, false, 0L);
ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy,
leaseManager,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false,
0L);
syncTask.call();
List<KinesisClientLease> leases = leaseManager.listLeases();
Set<String> leaseKeys = new HashSet<String>();

View file

@ -18,6 +18,7 @@ import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -59,9 +60,14 @@ import junit.framework.Assert;
// CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES
public class ShardSyncerTest {
private static final Log LOG = LogFactory.getLog(ShardSyncer.class);
private final InitialPositionInStream latestPosition = InitialPositionInStream.LATEST;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L));
private final boolean cleanupLeasesOfCompletedShards = true;
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create();
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient);
private static final int EXPONENT = 128;
/**
@ -111,8 +117,7 @@ public class ShardSyncerTest {
List<Shard> shards = new ArrayList<Shard>();
List<KinesisClientLease> leases = new ArrayList<KinesisClientLease>();
Assert.assertTrue(
ShardSyncer.determineNewLeasesToCreate(shards, leases, InitialPositionInStream.LATEST).isEmpty());
Assert.assertTrue(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
}
/**
@ -131,7 +136,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -154,7 +159,7 @@ public class ShardSyncerTest {
public final void testBootstrapShardLeasesAtTrimHorizon()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.TRIM_HORIZON);
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_TRIM_HORIZON);
}
/**
@ -170,7 +175,7 @@ public class ShardSyncerTest {
public final void testBootstrapShardLeasesAtLatest()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.LATEST);
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
}
/**
@ -189,9 +194,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
InitialPositionInStream.LATEST,
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
@ -223,9 +226,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
InitialPositionInStream.TRIM_HORIZON,
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
@ -240,6 +241,37 @@ public class ShardSyncerTest {
dataFile.delete();
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws IOException
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTimestamp()
throws KinesisClientLibIOException, DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException {
List<Shard> shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 1, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
cleanupLeasesOfCompletedShards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
for (int i = 0; i < 11; i++) {
expectedLeaseShardIds.add("shardId-" + i);
}
Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size());
for (KinesisClientLease lease1 : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
Assert.assertEquals(ExtendedSequenceNumber.AT_TIMESTAMP, lease1.getCheckpoint());
}
dataFile.delete();
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
@ -259,9 +291,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
InitialPositionInStream.TRIM_HORIZON,
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards);
dataFile.delete();
}
@ -275,9 +305,10 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(null, Integer.MAX_VALUE);
throws KinesisClientLibIOException, DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null,
Integer.MAX_VALUE, INITIAL_POSITION_TRIM_HORIZON);
}
/**
@ -295,8 +326,8 @@ public class ShardSyncerTest {
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.DELETELEASE, c);
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
@ -317,8 +348,8 @@ public class ShardSyncerTest {
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.LISTLEASES, c);
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
@ -339,8 +370,8 @@ public class ShardSyncerTest {
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 5;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c);
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
@ -352,7 +383,7 @@ public class ShardSyncerTest {
// 2). exceptionTime is a very big or negative value.
private void retryCheckAndCreateLeaseForNewShards(IKinesisProxy kinesisProxy,
ExceptionThrowingLeaseManagerMethods exceptionMethod,
int exceptionTime)
int exceptionTime, InitialPositionInStreamExtended position)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
if (exceptionMethod != null) {
ExceptionThrowingLeaseManager exceptionThrowingLeaseManager =
@ -364,7 +395,7 @@ public class ShardSyncerTest {
try {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
exceptionThrowingLeaseManager,
InitialPositionInStream.TRIM_HORIZON,
position,
cleanupLeasesOfCompletedShards);
return;
} catch (LeasingException e) {
@ -376,28 +407,116 @@ public class ShardSyncerTest {
} else {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
InitialPositionInStream.TRIM_HORIZON,
position,
cleanupLeasesOfCompletedShards);
}
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws IOException
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShard()
throws KinesisClientLibIOException, DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null,
Integer.MAX_VALUE, INITIAL_POSITION_AT_TIMESTAMP);
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws IOException
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithDeleteLeaseExceptions()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
// Define the max calling count for lease manager methods.
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.DELETELEASE,
c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws IOException
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithListLeasesExceptions()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
// Define the max calling count for lease manager methods.
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.LISTLEASES,
c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws IOException
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithCreateLeaseExceptions()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
// Define the max calling count for lease manager methods.
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 5;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS,
c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
}
// Real implementation of testing CheckAndCreateLeasesForNewShards with different leaseManager types.
private void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods exceptionMethod, int exceptionTime)
private void testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods exceptionMethod,
int exceptionTime,
InitialPositionInStreamExtended position)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
ExtendedSequenceNumber extendedSequenceNumber =
new ExtendedSequenceNumber(position.getInitialPositionInStream().toString());
List<Shard> shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime);
retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Map<String, ExtendedSequenceNumber> expectedShardIdToCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < 11; i++) {
expectedShardIdToCheckpointMap.put("shardId-" + i, ExtendedSequenceNumber.TRIM_HORIZON);
expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber);
}
Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size());
for (KinesisClientLease lease1 : newLeases) {
@ -415,7 +534,7 @@ public class ShardSyncerTest {
leaseManager.updateLease(childShardLease);
expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290"));
retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime);
retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position);
newLeases = leaseManager.listLeases();
Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size());
@ -449,11 +568,11 @@ public class ShardSyncerTest {
garbageLease.setCheckpoint(new ExtendedSequenceNumber("999"));
leaseManager.createLeaseIfNotExists(garbageLease);
Assert.assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey());
testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.LATEST);
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
Assert.assertNull(leaseManager.getLease(garbageShardId));
}
private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream initialPosition)
private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
List<Shard> shards = new ArrayList<Shard>();
@ -463,7 +582,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 10, "testBootstrap1");
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
@ -475,7 +594,8 @@ public class ShardSyncerTest {
expectedLeaseShardIds.add(shardId1);
for (KinesisClientLease lease1 : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.toString()), lease1.getCheckpoint());
Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()),
lease1.getCheckpoint());
}
dataFile.delete();
}
@ -495,11 +615,11 @@ public class ShardSyncerTest {
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
Set<InitialPositionInStream> initialPositions = new HashSet<InitialPositionInStream>();
initialPositions.add(InitialPositionInStream.LATEST);
initialPositions.add(InitialPositionInStream.TRIM_HORIZON);
Set<InitialPositionInStreamExtended> initialPositions = new HashSet<InitialPositionInStreamExtended>();
initialPositions.add(INITIAL_POSITION_LATEST);
initialPositions.add(INITIAL_POSITION_TRIM_HORIZON);
for (InitialPositionInStream initialPosition : initialPositions) {
for (InitialPositionInStreamExtended initialPosition : initialPositions) {
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition);
Assert.assertEquals(2, newLeases.size());
@ -508,7 +628,8 @@ public class ShardSyncerTest {
expectedLeaseShardIds.add(shardId1);
for (KinesisClientLease lease : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey()));
Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.toString()), lease.getCheckpoint());
Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()),
lease.getCheckpoint());
}
}
}
@ -532,7 +653,7 @@ public class ShardSyncerTest {
ShardObjectHelper.newSequenceNumberRange("405", null)));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(1, newLeases.size());
Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey());
}
@ -557,7 +678,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -595,7 +716,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -631,7 +752,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -671,7 +792,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -700,7 +821,7 @@ public class ShardSyncerTest {
List<Shard> shards = constructShardListForGraphB();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON);
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < 11; i++) {
@ -716,6 +837,110 @@ public class ShardSyncerTest {
}
}
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP)
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
*/
@Test
public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() {
List<Shard> shards = constructShardListForGraphA();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
currentLeases.add(newLease("shardId-3"));
currentLeases.add(newLease("shardId-4"));
currentLeases.add(newLease("shardId-5"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP);
Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size());
for (KinesisClientLease lease : newLeases) {
Assert.assertTrue("Unexpected lease: " + lease,
expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey()));
Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint());
}
}
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP)
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (4, 5, 7)
*/
@Test
public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() {
List<Shard> shards = constructShardListForGraphA();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
currentLeases.add(newLease("shardId-4"));
currentLeases.add(newLease("shardId-5"));
currentLeases.add(newLease("shardId-7"));
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP);
Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size());
for (KinesisClientLease lease : newLeases) {
Assert.assertTrue("Unexpected lease: " + lease,
expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey()));
Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint());
}
}
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP)
* For shard graph B (see the construct method doc for structure).
* Current leases: empty set
*/
@Test
public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() {
List<Shard> shards = constructShardListForGraphB();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
List<KinesisClientLease> newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < shards.size(); i++) {
String expectedShardId = "shardId-" + i;
expectedShardIdCheckpointMap.put(expectedShardId, ExtendedSequenceNumber.AT_TIMESTAMP);
}
Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size());
for (KinesisClientLease lease : newLeases) {
Assert.assertTrue("Unexpected lease: " + lease,
expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey()));
Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint());
}
}
/*
* Helper method to construct a shard list for graph A. Graph A is defined below.
* Shard structure (y-axis is epochs):
@ -808,8 +1033,7 @@ public class ShardSyncerTest {
@Test
public final void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() {
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null,
latestPosition,
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST,
null,
null,
null,
@ -824,8 +1048,7 @@ public class ShardSyncerTest {
String shardId = "shardId-trimmed";
Map<String, Shard> kinesisShards = new HashMap<String, Shard>();
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
latestPosition,
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
null,
kinesisShards,
null,
@ -844,8 +1067,7 @@ public class ShardSyncerTest {
shardIdsOfCurrentLeases.add(shardId);
Map<String, KinesisClientLease> newLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
latestPosition,
Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@ -872,8 +1094,7 @@ public class ShardSyncerTest {
kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null));
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
latestPosition,
Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@ -902,8 +1123,7 @@ public class ShardSyncerTest {
kinesisShards.put(shardId, shard);
Map<String, Boolean> memoizationContext = new HashMap<>();
Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
latestPosition,
Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,

View file

@ -42,6 +42,9 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
*/
public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
Set<String> defaultParentShardIds = new HashSet<>();
String defaultConcurrencyToken = "testToken4398";
String defaultShardId = "shardId-0000397840";
@ -88,16 +91,15 @@ public class ShutdownTaskTest {
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
ShutdownTask task =
new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
InitialPositionInStream.TRIM_HORIZON,
cleanupLeasesOfCompletedShards ,
leaseManager,
TASK_BACKOFF_TIME_MILLIS);
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -114,16 +116,15 @@ public class ShutdownTaskTest {
when(kinesisProxy.getShardList()).thenReturn(null);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
ShutdownTask task =
new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
InitialPositionInStream.TRIM_HORIZON,
cleanupLeasesOfCompletedShards ,
leaseManager,
TASK_BACKOFF_TIME_MILLIS);
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);

View file

@ -29,6 +29,7 @@ import java.lang.Thread.State;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -80,7 +81,6 @@ import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
/**
* Unit tests of Worker.
@ -101,7 +101,10 @@ public class WorkerTest {
private final boolean cleanupLeasesUponShardCompletion = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
@ -167,9 +170,7 @@ public class WorkerTest {
new StreamConfig(proxy,
maxRecords,
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
final String testConcurrencyToken = "testToken";
final String anotherConcurrencyToken = "anotherTestToken";
final String dummyKinesisShardId = "kinesis-0-0";
@ -182,9 +183,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory,
streamConfig,
InitialPositionInStream.LATEST,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -219,9 +218,7 @@ public class WorkerTest {
new StreamConfig(proxy,
maxRecords,
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
final String concurrencyToken = "testToken";
final String anotherConcurrencyToken = "anotherTestToken";
final String dummyKinesisShardId = "kinesis-0-0";
@ -235,9 +232,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory,
streamConfig,
InitialPositionInStream.LATEST,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -283,9 +278,7 @@ public class WorkerTest {
new StreamConfig(proxy,
maxRecords,
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
@SuppressWarnings("unchecked")
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
@ -295,8 +288,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
recordProcessorFactory,
streamConfig,
InitialPositionInStream.TRIM_HORIZON,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
shardPollInterval,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -621,6 +613,7 @@ public class WorkerTest {
/**
* Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow.
*
* @return Executor service that will be owned by the worker.
*/
private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() {
@ -665,7 +658,7 @@ public class WorkerTest {
List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
for (Shard shard : shardList) {
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP);
initialLeases.add(lease);
}
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard);
@ -719,7 +712,7 @@ public class WorkerTest {
final long epsilonMillis = 1000L;
final long idleTimeInMilliseconds = 2L;
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create();
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("foo", ddbClient);
leaseManager.createLeaseTableIfNotExists(1L, 1L);
for (KinesisClientLease initialLease : initialLeases) {
@ -733,19 +726,17 @@ public class WorkerTest {
epsilonMillis,
metricsFactory);
StreamConfig streamConfig =
new StreamConfig(kinesisProxy,
maxRecords,
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue,
initialPositionInStream);
final Date timestamp = new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP);
StreamConfig streamConfig = new StreamConfig(kinesisProxy,
maxRecords,
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
Worker worker =
new Worker(stageName,
recordProcessorFactory,
streamConfig,
InitialPositionInStream.TRIM_HORIZON,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -843,7 +834,8 @@ public class WorkerTest {
findShardIdsAndStreamLetsOfShardsWithOnlyOneProcessor(recordProcessorFactory);
for (Shard shard : shardList) {
String shardId = shard.getShardId();
String iterator = fileBasedProxy.getIterator(shardId, ShardIteratorType.TRIM_HORIZON.toString(), null);
String iterator =
fileBasedProxy.getIterator(shardId, new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP));
List<Record> expectedRecords = fileBasedProxy.get(iterator, numRecs).getRecords();
if (shardIdsAndStreamLetsOfShardsWithOnlyOneProcessor.containsKey(shardId)) {
verifyAllRecordsWereConsumedExactlyOnce(expectedRecords,
@ -859,7 +851,8 @@ public class WorkerTest {
Map<String, List<Record>> shardStreamletsRecords) {
for (Shard shard : shardList) {
String shardId = shard.getShardId();
String iterator = fileBasedProxy.getIterator(shardId, ShardIteratorType.TRIM_HORIZON.toString(), null);
String iterator =
fileBasedProxy.getIterator(shardId, new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP));
List<Record> expectedRecords = fileBasedProxy.get(iterator, numRecs).getRecords();
verifyAllRecordsWereConsumedAtLeastOnce(expectedRecords, shardStreamletsRecords.get(shardId));
}

View file

@ -25,6 +25,7 @@ import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -65,7 +66,9 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
/** Partition key associated with data record. */
PARTITION_KEY(2),
/** Data. */
DATA(3);
DATA(3),
/** Approximate arrival timestamp. */
APPROXIMATE_ARRIVAL_TIMESTAMP(4);
private final int position;
@ -149,7 +152,7 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
String[] strArr = str.split(",");
if (strArr.length != NUM_FIELDS_IN_FILE) {
throw new InvalidArgumentException("Unexpected input in file."
+ "Expected format (shardId, sequenceNumber, partitionKey, dataRecord)");
+ "Expected format (shardId, sequenceNumber, partitionKey, dataRecord, timestamp)");
}
String shardId = strArr[LocalFileFields.SHARD_ID.getPosition()];
Record record = new Record();
@ -157,6 +160,9 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
record.setPartitionKey(strArr[LocalFileFields.PARTITION_KEY.getPosition()]);
ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(strArr[LocalFileFields.DATA.getPosition()]));
record.setData(byteBuffer);
Date timestamp =
new Date(Long.parseLong(strArr[LocalFileFields.APPROXIMATE_ARRIVAL_TIMESTAMP.getPosition()]));
record.setApproximateArrivalTimestamp(timestamp);
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
shardRecords = new ArrayList<Record>();
@ -221,11 +227,8 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
return new IteratorInfo(splits[0], splits[1]);
}
/*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#getIterator(java.lang.String,
* java.lang.String, java.lang.String)
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
@ -262,6 +265,77 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
}
}
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, String iteratorEnum)
throws ResourceNotFoundException, InvalidArgumentException {
/*
* If we don't have records in this shard, any iterator will return the empty list. Using a
* sequence number of 1 on an empty shard will give this behavior.
*/
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
throw new ResourceNotFoundException(shardId + " does not exist");
}
if (shardRecords.isEmpty()) {
return serializeIterator(shardId, "1");
}
final String serializedIterator;
if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) {
/*
* If we do have records, LATEST should return an iterator that can be used to read the
* last record. Our iterators are inclusive for convenience.
*/
Record last = shardRecords.get(shardRecords.size() - 1);
serializedIterator = serializeIterator(shardId, last.getSequenceNumber());
} else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) {
serializedIterator = serializeIterator(shardId, shardRecords.get(0).getSequenceNumber());
} else {
throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum);
}
return serializedIterator;
}
/**
* {@inheritDoc}
*/
@Override
public String getIterator(String shardId, Date timestamp)
throws ResourceNotFoundException, InvalidArgumentException {
/*
* If we don't have records in this shard, any iterator will return the empty list. Using a
* sequence number of 1 on an empty shard will give this behavior.
*/
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
throw new ResourceNotFoundException(shardId + " does not exist");
}
if (shardRecords.isEmpty()) {
return serializeIterator(shardId, "1");
}
final String serializedIterator;
if (timestamp != null) {
String seqNumAtTimestamp = findSequenceNumberAtTimestamp(shardRecords, timestamp);
serializedIterator = serializeIterator(shardId, seqNumAtTimestamp);
} else {
throw new IllegalArgumentException("Timestamp must be specified for AT_TIMESTAMP iterator");
}
return serializedIterator;
}
private String findSequenceNumberAtTimestamp(final List<Record> shardRecords, final Date timestamp) {
for (Record rec : shardRecords) {
if (rec.getApproximateArrivalTimestamp().getTime() >= timestamp.getTime()) {
return rec.getSequenceNumber();
}
}
return null;
}
/*
* (non-Javadoc)
*

View file

@ -51,6 +51,17 @@ public class KinesisLocalFileDataCreator {
private static final int PARTITION_KEY_LENGTH = 10;
private static final int DATA_LENGTH = 40;
/**
* Starting timestamp - also referenced in KinesisLocalFileProxyTest.
*/
public static final long STARTING_TIMESTAMP = 1462345678910L;
/**
* This is used to allow few records to have the same timestamps (to mimic real life scenarios).
* Records 5n-1 and 5n will have the same timestamp (n > 0).
*/
private static final int DIVISOR = 5;
private KinesisLocalFileDataCreator() {
}
@ -96,6 +107,7 @@ public class KinesisLocalFileDataCreator {
fileWriter.write(serializedShardList);
fileWriter.newLine();
BigInteger sequenceNumberIncrement = new BigInteger("0");
long timestamp = STARTING_TIMESTAMP;
for (int i = 0; i < numRecordsPerShard; i++) {
for (Shard shard : shardList) {
BigInteger sequenceNumber =
@ -112,7 +124,12 @@ public class KinesisLocalFileDataCreator {
String partitionKey =
PARTITION_KEY_PREFIX + shard.getShardId() + generateRandomString(PARTITION_KEY_LENGTH);
String data = generateRandomString(DATA_LENGTH);
String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data;
// Allow few records to have the same timestamps (to mimic real life scenarios).
timestamp = (i % DIVISOR == 0) ? timestamp : timestamp + 1;
String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data + ","
+ timestamp;
fileWriter.write(line);
fileWriter.newLine();
sequenceNumberIncrement = sequenceNumberIncrement.add(BigInteger.ONE);