diff --git a/NOTICE.txt b/NOTICE.txt
index 79e11d38..650c34d7 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -1,3 +1,3 @@
AmazonKinesisClientLibrary
-Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
diff --git a/pom.xml b/pom.xml
index f6648d81..b2da769e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,7 +93,7 @@
com.amazonaws
DynamoDBLocal
- 1.10.5.1
+ 1.11.0.1
test
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java
index 65e00d22..d4442b82 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -31,5 +31,9 @@ public enum SentinelCheckpoint {
/**
* We've completely processed all records in this shard.
*/
- SHARD_END;
+ SHARD_END,
+ /**
+ * Start from the record at or after the specified server-side timestamp.
+ */
+ AT_TIMESTAMP
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java
index 241683b1..94f9b455 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -19,14 +19,18 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
* This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents).
*/
public enum InitialPositionInStream {
-
/**
* Start after the most recent data record (fetch new data).
*/
LATEST,
-
+
/**
* Start from the oldest available data record.
*/
- TRIM_HORIZON;
+ TRIM_HORIZON,
+
+ /**
+ * Start from the record at or after the specified server-side timestamp.
+ */
+ AT_TIMESTAMP
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java
new file mode 100644
index 00000000..6a9948c7
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import java.util.Date;
+
+/**
+ * Class that houses the entities needed to specify the position in the stream from where a new application should
+ * start.
+ */
+class InitialPositionInStreamExtended {
+
+ private final InitialPositionInStream position;
+ private final Date timestamp;
+
+ /**
+ * This is scoped as private to forbid callers from using it directly and to convey the intent to use the
+ * static methods instead.
+ *
+ * @param position One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The Amazon Kinesis Client Library will start
+ * fetching records from this position when the application starts up if there are no checkpoints.
+ * If there are checkpoints, we will process records from the checkpoint position.
+ * @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream.
+ */
+ private InitialPositionInStreamExtended(final InitialPositionInStream position, final Date timestamp) {
+ this.position = position;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Get the initial position in the stream where the application should start from.
+ *
+ * @return The initial position in stream.
+ */
+ protected InitialPositionInStream getInitialPositionInStream() {
+ return this.position;
+ }
+
+ /**
+ * Get the timestamp from where we need to start the application.
+ * Valid only for initial position of type AT_TIMESTAMP, returns null for other positions.
+ *
+ * @return The timestamp from where we need to start the application.
+ */
+ protected Date getTimestamp() {
+ return this.timestamp;
+ }
+
+ protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
+ switch (position) {
+ case LATEST:
+ return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null);
+ case TRIM_HORIZON:
+ return new InitialPositionInStreamExtended(InitialPositionInStream.TRIM_HORIZON, null);
+ default:
+ throw new IllegalArgumentException("Invalid InitialPosition: " + position);
+ }
+ }
+
+ protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
+ if (timestamp == null) {
+ throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP");
+ }
+ return new InitialPositionInStreamExtended(InitialPositionInStream.AT_TIMESTAMP, timestamp);
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java
index 617813a4..262b98c7 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ class InitializeTask implements ITask {
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
// Back off for this interval if we encounter a problem (exception)
private final long backoffTimeMillis;
+ private final StreamConfig streamConfig;
/**
* Constructor.
@@ -50,13 +51,15 @@ class InitializeTask implements ITask {
ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
- long backoffTimeMillis) {
+ long backoffTimeMillis,
+ StreamConfig streamConfig) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.dataFetcher = dataFetcher;
this.backoffTimeMillis = backoffTimeMillis;
+ this.streamConfig = streamConfig;
}
/*
@@ -74,7 +77,7 @@ class InitializeTask implements ITask {
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());
- dataFetcher.initialize(initialCheckpoint.getSequenceNumber());
+ dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index 8e07e501..0d45d359 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.Date;
import java.util.Set;
import com.amazonaws.ClientConfiguration;
@@ -185,6 +186,7 @@ public class KinesisClientLibConfiguration {
private int maxLeasesToStealAtOneTime;
private int initialLeaseTableReadCapacity;
private int initialLeaseTableWriteCapacity;
+ private InitialPositionInStreamExtended initialPositionInStreamExtended;
/**
* Constructor.
@@ -263,7 +265,6 @@ public class KinesisClientLibConfiguration {
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
- *
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@@ -330,6 +331,8 @@ public class KinesisClientLibConfiguration {
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
+ this.initialPositionInStreamExtended =
+ InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
}
// Check if value is positive, otherwise throw an exception
@@ -580,6 +583,22 @@ public class KinesisClientLibConfiguration {
return initialLeaseTableWriteCapacity;
}
+ /**
+ * Keeping it protected to forbid outside callers from depending on this internal object.
+ * @return The initialPositionInStreamExtended object.
+ */
+ protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
+ return initialPositionInStreamExtended;
+ }
+
+ /**
+ * @return The timestamp from where we need to start the application.
+ * Valid only for initial position of type AT_TIMESTAMP, returns null for other positions.
+ */
+ public Date getTimestampAtInitialPositionInStream() {
+ return initialPositionInStreamExtended.getTimestamp();
+ }
+
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param tableName name of the lease table in DynamoDB
@@ -600,13 +619,25 @@ public class KinesisClientLibConfiguration {
}
/**
- * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start
- * fetching records from this position when the application starts up if there are no checkpoints. If there
- * are checkpoints, we will process records from the checkpoint position.
+ * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
+ * will start fetching records from this position when the application starts up if there are no checkpoints.
+ * If there are checkpoints, we will process records from the checkpoint position.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withInitialPositionInStream(InitialPositionInStream initialPositionInStream) {
this.initialPositionInStream = initialPositionInStream;
+ this.initialPositionInStreamExtended =
+ InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
+ return this;
+ }
+
+ /**
+ * @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream.
+ * @return KinesisClientLibConfiguration
+ */
+ public KinesisClientLibConfiguration withTimestampAtInitialPositionInStream(Date timestamp) {
+ this.initialPositionInStream = InitialPositionInStream.AT_TIMESTAMP;
+ this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp);
return this;
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java
index 14d0448c..2ce3152a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import java.util.Date;
+
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
@@ -41,8 +43,7 @@ class KinesisDataFetcher {
/**
*
* @param kinesisProxy Kinesis proxy
- * @param shardId shardId (we'll fetch data for this shard)
- * @param checkpoint used to get current checkpoint from which to start fetching records
+ * @param shardInfo The shardInfo object.
*/
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) {
this.shardId = shardInfo.getShardId();
@@ -83,17 +84,18 @@ class KinesisDataFetcher {
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
- *
+ * @param initialPositionInStream The initialPositionInStream.
*/
- public void initialize(String initialCheckpoint) {
+ public void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint);
- advanceIteratorTo(initialCheckpoint);
+ advanceIteratorTo(initialCheckpoint, initialPositionInStream);
isInitialized = true;
}
-
- public void initialize(ExtendedSequenceNumber initialCheckpoint) {
+
+ public void initialize(ExtendedSequenceNumber initialCheckpoint,
+ InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
- advanceIteratorTo(initialCheckpoint.getSequenceNumber());
+ advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream);
isInitialized = true;
}
@@ -101,14 +103,17 @@ class KinesisDataFetcher {
* Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number.
*
* @param sequenceNumber advance the iterator to the record at this sequence number.
+ * @param initialPositionInStream The initialPositionInStream.
*/
- void advanceIteratorTo(String sequenceNumber) {
+ void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
- nextIterator = getIterator(ShardIteratorType.LATEST.toString(), null);
+ nextIterator = getIterator(ShardIteratorType.LATEST.toString());
} else if (sequenceNumber.equals(SentinelCheckpoint.TRIM_HORIZON.toString())) {
- nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString(), null);
+ nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString());
+ } else if (sequenceNumber.equals(SentinelCheckpoint.AT_TIMESTAMP.toString())) {
+ nextIterator = getIterator(initialPositionInStream.getTimestamp());
} else if (sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString())) {
nextIterator = null;
} else {
@@ -120,8 +125,8 @@ class KinesisDataFetcher {
}
/**
- * @param iteratorType
- * @param sequenceNumber
+ * @param iteratorType The iteratorType - either AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER.
+ * @param sequenceNumber The sequenceNumber.
*
* @return iterator or null if we catch a ResourceNotFound exception
*/
@@ -139,6 +144,40 @@ class KinesisDataFetcher {
return iterator;
}
+ /**
+ * @param iteratorType The iteratorType - either TRIM_HORIZON or LATEST.
+ * @return iterator or null if we catch a ResourceNotFound exception
+ */
+ private String getIterator(String iteratorType) {
+ String iterator = null;
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Calling getIterator for " + shardId + " and iterator type " + iteratorType);
+ }
+ iterator = kinesisProxy.getIterator(shardId, iteratorType);
+ } catch (ResourceNotFoundException e) {
+ LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, e);
+ }
+ return iterator;
+ }
+
+ /**
+ * @param timestamp The timestamp.
+ * @return iterator or null if we catch a ResourceNotFound exception
+ */
+ private String getIterator(Date timestamp) {
+ String iterator = null;
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Calling getIterator for " + shardId + " and timestamp " + timestamp);
+ }
+ iterator = kinesisProxy.getIterator(shardId, timestamp);
+ } catch (ResourceNotFoundException e) {
+ LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, e);
+ }
+ return iterator;
+ }
+
/**
* @return the shardEndReached
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java
index 47ee7a5d..db0970d5 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -259,8 +259,8 @@ class ProcessTask implements ITask {
* Advance the iterator to after the greatest processed sequence number (remembered by
* recordProcessorCheckpointer).
*/
- dataFetcher.advanceIteratorTo(
- recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber());
+ dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue()
+ .getSequenceNumber(), streamConfig.getInitialPositionInStream());
// Try a second time - if we fail this time, expose the failure.
try {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
index 8445e684..10dacc04 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -252,7 +252,8 @@ class ShardConsumer {
checkpoint,
recordProcessorCheckpointer,
dataFetcher,
- taskBackoffTimeMillis);
+ taskBackoffTimeMillis,
+ streamConfig);
break;
case PROCESSING:
nextTask =
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
index f0db8cda..ddfb8459 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@ class ShardSyncTask implements ITask {
private final IKinesisProxy kinesisProxy;
private final ILeaseManager leaseManager;
- private InitialPositionInStream initialPosition;
+ private InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;
@@ -41,13 +41,13 @@ class ShardSyncTask implements ITask {
/**
* @param kinesisProxy Used to fetch information about the stream (e.g. shard list)
* @param leaseManager Used to fetch and create leases
- * @param initialPosition One of LATEST or TRIM_HORIZON. Amazon Kinesis Client Library will start processing records
- * from this point in the stream (when an application starts up for the first time) except for shards that
- * already have a checkpoint (and their descendant shards).
+ * @param initialPositionInStream One of LATEST, TRIM_HORIZON or AT_TIMESTAMP. Amazon Kinesis Client Library will
+ * start processing records from this point in the stream (when an application starts up for the first time)
+ * except for shards that already have a checkpoint (and their descendant shards).
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
- InitialPositionInStream initialPositionInStream,
+ InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
long shardSyncTaskIdleTimeMillis) {
this.kinesisProxy = kinesisProxy;
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
index 7fffe123..c1bfae76 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@ class ShardSyncTaskManager {
private final ILeaseManager leaseManager;
private final IMetricsFactory metricsFactory;
private final ExecutorService executorService;
- private final InitialPositionInStream initialPositionInStream;
+ private final InitialPositionInStreamExtended initialPositionInStream;
private boolean cleanupLeasesUponShardCompletion;
private final long shardSyncIdleTimeMillis;
@@ -61,7 +61,7 @@ class ShardSyncTaskManager {
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager leaseManager,
- final InitialPositionInStream initialPositionInStream,
+ final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
index 97298d48..52944200 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -59,7 +59,7 @@ class ShardSyncer {
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
- InitialPositionInStream initialPositionInStream,
+ InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
@@ -82,7 +82,7 @@ class ShardSyncer {
*/
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
- InitialPositionInStream initialPositionInStream,
+ InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
@@ -106,7 +106,7 @@ class ShardSyncer {
// CHECKSTYLE:OFF CyclomaticComplexity
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager leaseManager,
- InitialPositionInStream initialPosition,
+ InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List shards = getShardList(kinesisProxy);
@@ -327,15 +327,15 @@ class ShardSyncer {
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
- * @param shardIds Set of all shardIds in Kinesis (we'll create new leases based on this set)
+ * @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
- * @param initialPosition One of LATEST or TRIM_HORIZON. We'll start fetching records from that location in the
- * shard (when an application starts up for the first time - and there are no checkpoints).
+ * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
+ * location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List determineNewLeasesToCreate(List shards,
List currentLeases,
- InitialPositionInStream initialPosition) {
+ InitialPositionInStreamExtended initialPosition) {
Map shardIdToNewLeaseMap = new HashMap();
Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
@@ -364,7 +364,32 @@ class ShardSyncer {
shardIdToShardMapOfAllKinesisShards,
shardIdToNewLeaseMap,
memoizationContext);
- if (isDescendant) {
+
+ /**
+ * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
+ * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
+ * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
+ * timestamp at or after the specified initial position timestamp.
+ *
+ * Shard structure (each level depicts a stream segment):
+ * 0 1 2 3 4 5 - shards till epoch 102
+ * \ / \ / | |
+ * 6 7 4 5 - shards from epoch 103 - 205
+ * \ / | /\
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ *
+ * Current leases: empty set
+ *
+ * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
+ * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
+ * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
+ * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
+ * would then be deleted since they won't have records with server-side timestamp at/after 206. And
+ * after that we will begin processing the descendant shards with epoch at/after 206 and we will
+ * return the records that meet the timestamp requirement for these shards.
+ */
+ if (isDescendant && !initialPosition.getInitialPositionInStream()
+ .equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(convertToCheckpoint(initialPosition));
@@ -388,8 +413,10 @@ class ShardSyncer {
* Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
*
- * @param shardIds Ancestors of these shards will be considered for addition into the new lease map
- * @param shardIdsOfCurrentLeases
+ * @param shardId The shardId to check.
+ * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
+ * location in the shard (when an application starts up for the first time - and there are no checkpoints).
+ * @param shardIdsOfCurrentLeases The shardIds for the current leases.
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map.
* @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation
@@ -397,7 +424,7 @@ class ShardSyncer {
*/
// CHECKSTYLE:OFF CyclomaticComplexity
static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
- InitialPositionInStream initialPosition,
+ InitialPositionInStreamExtended initialPosition,
Set shardIdsOfCurrentLeases,
Map shardIdToShardMapOfAllKinesisShards,
Map shardIdToLeaseMapOfNewShards,
@@ -449,7 +476,9 @@ class ShardSyncer {
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
- if (descendantParentShardIds.contains(parentShardId)) {
+ if (descendantParentShardIds.contains(parentShardId)
+ && !initialPosition.getInitialPositionInStream()
+ .equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
@@ -457,8 +486,13 @@ class ShardSyncer {
}
}
} else {
- // This shard should be included, if the customer wants to process all records in the stream.
- if (initialPosition.equals(InitialPositionInStream.TRIM_HORIZON)) {
+ // This shard should be included, if the customer wants to process all records in the stream or
+ // if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do
+ // for TRIM_HORIZON. However we will only return back records with server-side timestamp at or
+ // after the specified initial position timestamp.
+ if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
+ || initialPosition.getInitialPositionInStream()
+ .equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
}
}
@@ -737,13 +771,15 @@ class ShardSyncer {
return openShards;
}
- private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStream position) {
+ private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
- if (position.equals(InitialPositionInStream.TRIM_HORIZON)) {
+ if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
- } else if (position.equals(InitialPositionInStream.LATEST)) {
+ } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
checkpoint = ExtendedSequenceNumber.LATEST;
+ } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
+ checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
return checkpoint;
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
index ecf4873e..3ce05203 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@ class ShutdownTask implements ITask {
private final ShutdownReason reason;
private final IKinesisProxy kinesisProxy;
private final ILeaseManager leaseManager;
- private final InitialPositionInStream initialPositionInStream;
+ private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
@@ -56,7 +56,7 @@ class ShutdownTask implements ITask {
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
- InitialPositionInStream initialPositionInStream,
+ InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
ILeaseManager leaseManager,
long backoffTimeMillis) {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java
index 2b7120fd..b5c283fb 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ class StreamConfig {
private final int maxRecords;
private final long idleTimeInMilliseconds;
private final boolean callProcessRecordsEvenForEmptyRecordList;
- private InitialPositionInStream initialPositionInStream;
+ private InitialPositionInStreamExtended initialPositionInStream;
private final boolean validateSequenceNumberBeforeCheckpointing;
/**
@@ -42,7 +42,7 @@ class StreamConfig {
long idleTimeInMilliseconds,
boolean callProcessRecordsEvenForEmptyRecordList,
boolean validateSequenceNumberBeforeCheckpointing,
- InitialPositionInStream initialPositionInStream) {
+ InitialPositionInStreamExtended initialPositionInStream) {
this.streamProxy = proxy;
this.maxRecords = maxRecords;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
@@ -82,7 +82,7 @@ class StreamConfig {
/**
* @return the initialPositionInStream
*/
- InitialPositionInStream getInitialPositionInStream() {
+ InitialPositionInStreamExtended getInitialPositionInStream() {
return initialPositionInStream;
}
@@ -92,5 +92,4 @@ class StreamConfig {
boolean shouldValidateSequenceNumberBeforeCheckpointing() {
return validateSequenceNumberBeforeCheckpointing;
}
-
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 6aec2346..50861c29 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -64,7 +64,7 @@ public class Worker implements Runnable {
private final String applicationName;
private final IRecordProcessorFactory recordProcessorFactory;
private final StreamConfig streamConfig;
- private final InitialPositionInStream initialPosition;
+ private final InitialPositionInStreamExtended initialPosition;
private final ICheckpoint checkpointTracker;
private final long idleTimeInMilliseconds;
// Backoff time when polling to check if application has finished processing
@@ -212,8 +212,8 @@ public class Worker implements Runnable {
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
- config.getInitialPositionInStream()),
- config.getInitialPositionInStream(),
+ config.getInitialPositionInStreamExtended()),
+ config.getInitialPositionInStreamExtended(),
config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
@@ -258,9 +258,9 @@ public class Worker implements Runnable {
* @param applicationName Name of the Kinesis application
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param streamConfig Stream configuration
- * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching data
- * from this location in the stream when an application starts up for the first time and there are no
- * checkpoints. If there are checkpoints, we start from the checkpoint position.
+ * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start
+ * fetching data from this location in the stream when an application starts up for the first time and
+ * there are no checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in
@@ -277,7 +277,7 @@ public class Worker implements Runnable {
Worker(String applicationName,
IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig,
- InitialPositionInStream initialPositionInStream,
+ InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion,
@@ -946,8 +946,8 @@ public class Worker implements Runnable {
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
- config.getInitialPositionInStream()),
- config.getInitialPositionInStream(),
+ config.getInitialPositionInStreamExtended()),
+ config.getInitialPositionInStreamExtended(),
config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java
index 8dbb97fa..df7f951d 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
+import java.util.Date;
import java.util.List;
import java.util.Set;
@@ -72,7 +73,16 @@ public interface IKinesisProxy {
/**
* Fetch a shard iterator from the specified position in the shard.
- *
+ * This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which
+ * requires the starting sequence number.
+ *
+ * NOTE: Currently this method continues to fetch iterators for ShardIteratorTypes TRIM_HORIZON, LATEST,
+ * AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER.
+ * But this behavior will change in the next release, after which this method will only serve
+ * AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER ShardIteratorTypes.
+ * We recommend users who call this method directly to use the appropriate getIterator method based on the
+ * ShardIteratorType.
+ *
* @param shardId Shard id
* @param iteratorEnum one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
* @param sequenceNumber the sequence number - must be null unless iteratorEnum is AT_SEQUENCE_NUMBER or
@@ -84,6 +94,31 @@ public interface IKinesisProxy {
String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
throws ResourceNotFoundException, InvalidArgumentException;
+ /**
+ * Fetch a shard iterator from the specified position in the shard.
+ * This is to fetch a shard iterator for ShardIteratorType LATEST or TRIM_HORIZON which doesn't require a starting
+ * sequence number.
+ *
+ * @param shardId Shard id
+ * @param iteratorEnum Either TRIM_HORIZON or LATEST.
+ * @return shard iterator which can be used to read data from Kinesis.
+ * @throws ResourceNotFoundException The Kinesis stream or shard was not found
+ * @throws InvalidArgumentException Invalid input parameters
+ */
+ String getIterator(String shardId, String iteratorEnum) throws ResourceNotFoundException, InvalidArgumentException;
+
+ /**
+ * Fetch a shard iterator from the specified position in the shard.
+ * This is to fetch a shard iterator for ShardIteratorType AT_TIMESTAMP which requires the timestamp field.
+ *
+ * @param shardId Shard id
+ * @param timestamp The timestamp.
+ * @return shard iterator which can be used to read data from Kinesis.
+ * @throws ResourceNotFoundException The Kinesis stream or shard was not found
+ * @throws InvalidArgumentException Invalid input parameters
+ */
+ String getIterator(String shardId, Date timestamp) throws ResourceNotFoundException, InvalidArgumentException;
+
/**
* @param sequenceNumberForOrdering (optional) used for record ordering
* @param explicitHashKey optionally supplied transformation of partitionkey
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java
index 0cf36da7..ad929c21 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -40,6 +41,7 @@ import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
/**
@@ -263,12 +265,50 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/
@Override
public String getIterator(String shardId, String iteratorType, String sequenceNumber) {
+ if (!iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()) || !iteratorType.equals(
+ ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString())) {
+ LOG.info("This method should only be used for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER "
+ + "ShardIteratorTypes. For methods to use with other ShardIteratorTypes, see IKinesisProxy.java");
+ }
final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(iteratorType);
getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+ getShardIteratorRequest.setTimestamp(null);
+ final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest);
+ return response.getShardIterator();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getIterator(String shardId, String iteratorType) {
+ final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+ getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
+ getShardIteratorRequest.setStreamName(streamName);
+ getShardIteratorRequest.setShardId(shardId);
+ getShardIteratorRequest.setShardIteratorType(iteratorType);
+ getShardIteratorRequest.setStartingSequenceNumber(null);
+ getShardIteratorRequest.setTimestamp(null);
+ final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest);
+ return response.getShardIterator();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getIterator(String shardId, Date timestamp) {
+ final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+ getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials());
+ getShardIteratorRequest.setStreamName(streamName);
+ getShardIteratorRequest.setShardId(shardId);
+ getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP);
+ getShardIteratorRequest.setStartingSequenceNumber(null);
+ getShardIteratorRequest.setTimestamp(timestamp);
final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest);
return response.getShardIterator();
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java
index 7263351e..d27fc6a1 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer;
+import java.util.Date;
import java.util.List;
import java.util.Set;
@@ -128,6 +129,40 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getIterator(String shardId, String iteratorEnum)
+ throws ResourceNotFoundException, InvalidArgumentException {
+ long startTime = System.currentTimeMillis();
+ boolean success = false;
+ try {
+ String response = other.getIterator(shardId, iteratorEnum);
+ success = true;
+ return response;
+ } finally {
+ MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getIterator(String shardId, Date timestamp)
+ throws ResourceNotFoundException, InvalidArgumentException {
+ long startTime = System.currentTimeMillis();
+ boolean success = false;
+ try {
+ String response = other.getIterator(shardId, timestamp);
+ success = true;
+ return response;
+ } finally {
+ MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED);
+ }
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java
index 0202a17a..1ed7ed67 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -36,9 +36,10 @@ public class ExtendedSequenceNumber implements Comparable expectedRecords = new ArrayList();
GetRecordsResult response = new GetRecordsResult();
response.setRecords(expectedRecords);
-
- when(kinesis.getIterator(SHARD_ID, iteratorType, null)).thenReturn(iterator);
+ when(kinesis.getIterator(SHARD_ID, initialPositionInStream.getTimestamp())).thenReturn(iterator);
when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqNo)).thenReturn(iterator);
+ when(kinesis.getIterator(SHARD_ID, iteratorType)).thenReturn(iterator);
when(kinesis.get(iterator, MAX_RECORDS)).thenReturn(response);
ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
-
- fetcher.initialize(seqNo);
+ fetcher.initialize(seqNo, initialPositionInStream);
List actualRecords = fetcher.getRecords(MAX_RECORDS).getRecords();
Assert.assertEquals(expectedRecords, actualRecords);
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java
index 5385d05e..6576e47f 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java
@@ -66,7 +66,8 @@ public class ProcessTaskTest {
private final boolean callProcessRecordsForEmptyRecordList = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
- private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private @Mock KinesisDataFetcher mockDataFetcher;
private @Mock IRecordProcessor mockRecordProcessor;
@@ -84,7 +85,8 @@ public class ProcessTaskTest {
// Set up process task
final StreamConfig config =
new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue, initialPositionInStream);
+ skipCheckpointValidationValue,
+ INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null);
processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis);
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java
index ce222f9e..aae93f29 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java
@@ -86,9 +86,9 @@ public class SequenceNumberValidatorTest {
IKinesisProxy proxy,
boolean validateWithGetIterator) {
- String[] nonNumericStrings =
- { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
- SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString() };
+ String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
+ SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(),
+ SentinelCheckpoint.AT_TIMESTAMP.toString() };
for (String nonNumericString : nonNumericStrings) {
try {
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
index 27f8f13c..d8d39377 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java
@@ -18,6 +18,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -33,6 +34,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ExecutionException;
@@ -77,7 +79,8 @@ public class ShardConsumerTest {
private final boolean cleanupLeasesOfCompletedShards = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
- private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied.
@@ -102,8 +105,7 @@ public class ShardConsumerTest {
1,
10,
callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@@ -153,8 +155,7 @@ public class ShardConsumerTest {
1,
10,
callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@@ -198,8 +199,7 @@ public class ShardConsumerTest {
1,
10,
callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@@ -287,8 +287,7 @@ public class ShardConsumerTest {
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null);
ShardConsumer consumer =
@@ -334,12 +333,103 @@ public class ShardConsumerTest {
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
- String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString(), null);
+ String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString());
List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
file.delete();
}
+ /**
+ * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()}
+ * that starts from initial position of type AT_TIMESTAMP.
+ */
+ @Test
+ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception {
+ int numRecs = 7;
+ BigInteger startSeqNum = BigInteger.ONE;
+ Date timestamp = new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP + 3);
+ InitialPositionInStreamExtended atTimestamp =
+ InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp);
+ String streamShardId = "kinesis-0-0";
+ String testConcurrencyToken = "testToken";
+ File file =
+ KinesisLocalFileDataCreator.generateTempDataFile(1,
+ "kinesis-0-",
+ numRecs,
+ startSeqNum,
+ "unitTestSCT002");
+
+ IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
+
+ final int maxRecords = 2;
+ final int idleTimeMS = 0; // keep unit tests fast
+ ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
+ checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
+ @SuppressWarnings("unchecked")
+ ILeaseManager leaseManager = mock(ILeaseManager.class);
+ when(leaseManager.getLease(anyString())).thenReturn(null);
+
+ TestStreamlet processor = new TestStreamlet();
+
+ StreamConfig streamConfig =
+ new StreamConfig(fileBasedProxy,
+ maxRecords,
+ idleTimeMS,
+ callProcessRecordsForEmptyRecordList,
+ skipCheckpointValidationValue,
+ atTimestamp);
+
+ ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null);
+ ShardConsumer consumer =
+ new ShardConsumer(shardInfo,
+ streamConfig,
+ checkpoint,
+ processor,
+ leaseManager,
+ parentShardPollIntervalMillis,
+ cleanupLeasesOfCompletedShards,
+ executorService,
+ metricsFactory,
+ taskBackoffTimeMillis);
+
+ assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
+ consumer.consumeShard(); // check on parent shards
+ Thread.sleep(50L);
+ consumer.consumeShard(); // start initialization
+ assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
+ consumer.consumeShard(); // initialize
+ Thread.sleep(50L);
+
+ // We expect to process all records in numRecs calls
+ for (int i = 0; i < numRecs;) {
+ boolean newTaskSubmitted = consumer.consumeShard();
+ if (newTaskSubmitted) {
+ LOG.debug("New processing task was submitted, call # " + i);
+ assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING)));
+ // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
+ i += maxRecords;
+ }
+ Thread.sleep(50L);
+ }
+
+ assertThat(processor.getShutdownReason(), nullValue());
+ consumer.beginShutdown();
+ Thread.sleep(50L);
+ assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN)));
+ consumer.beginShutdown();
+ assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE)));
+ assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
+
+ executorService.shutdown();
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+
+ String iterator = fileBasedProxy.getIterator(streamShardId, timestamp);
+ List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
+ verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
+ assertEquals(4, processor.getProcessedRecords().size());
+ file.delete();
+ }
+
//@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List expectedRecords,
List actualRecords) {
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
index 6843efbd..307596e3 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java
@@ -120,8 +120,11 @@ public class ShardSyncTaskIntegrationTest {
}
leaseManager.deleteAll();
Set shardIds = kinesisProxy.getAllShardIds();
- ShardSyncTask syncTask =
- new ShardSyncTask(kinesisProxy, leaseManager, InitialPositionInStream.LATEST, false, 0L);
+ ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy,
+ leaseManager,
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
+ false,
+ 0L);
syncTask.call();
List leases = leaseManager.listLeases();
Set leaseKeys = new HashSet();
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
index f02943b4..b8f6ae56 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java
@@ -18,6 +18,7 @@ import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -59,9 +60,14 @@ import junit.framework.Assert;
// CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES
public class ShardSyncerTest {
private static final Log LOG = LogFactory.getLog(ShardSyncer.class);
- private final InitialPositionInStream latestPosition = InitialPositionInStream.LATEST;
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
+ InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L));
private final boolean cleanupLeasesOfCompletedShards = true;
- AmazonDynamoDB ddbClient = DynamoDBEmbedded.create();
+ AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient);
private static final int EXPONENT = 128;
/**
@@ -111,8 +117,7 @@ public class ShardSyncerTest {
List shards = new ArrayList();
List leases = new ArrayList();
- Assert.assertTrue(
- ShardSyncer.determineNewLeasesToCreate(shards, leases, InitialPositionInStream.LATEST).isEmpty());
+ Assert.assertTrue(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
}
/**
@@ -131,7 +136,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(2, newLeases.size());
Set expectedLeaseShardIds = new HashSet();
expectedLeaseShardIds.add(shardId0);
@@ -154,7 +159,7 @@ public class ShardSyncerTest {
public final void testBootstrapShardLeasesAtTrimHorizon()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
- testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.TRIM_HORIZON);
+ testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_TRIM_HORIZON);
}
/**
@@ -170,7 +175,7 @@ public class ShardSyncerTest {
public final void testBootstrapShardLeasesAtLatest()
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
- testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.LATEST);
+ testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
}
/**
@@ -189,9 +194,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
- leaseManager,
- InitialPositionInStream.LATEST,
+ ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
@@ -223,9 +226,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
- leaseManager,
- InitialPositionInStream.TRIM_HORIZON,
+ ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards);
List newLeases = leaseManager.listLeases();
Set expectedLeaseShardIds = new HashSet();
@@ -240,6 +241,37 @@ public class ShardSyncerTest {
dataFile.delete();
}
+ /**
+ * @throws KinesisClientLibIOException
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws IOException
+ */
+ @Test
+ public final void testCheckAndCreateLeasesForNewShardsAtTimestamp()
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException,
+ ProvisionedThroughputException, IOException {
+ List shards = constructShardListForGraphA();
+ File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 1, "testBootstrap1");
+ dataFile.deleteOnExit();
+ IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
+
+ ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP,
+ cleanupLeasesOfCompletedShards);
+ List newLeases = leaseManager.listLeases();
+ Set expectedLeaseShardIds = new HashSet();
+ for (int i = 0; i < 11; i++) {
+ expectedLeaseShardIds.add("shardId-" + i);
+ }
+ Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size());
+ for (KinesisClientLease lease1 : newLeases) {
+ Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
+ Assert.assertEquals(ExtendedSequenceNumber.AT_TIMESTAMP, lease1.getCheckpoint());
+ }
+ dataFile.delete();
+ }
+
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
@@ -259,9 +291,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
- leaseManager,
- InitialPositionInStream.TRIM_HORIZON,
+ ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards);
dataFile.delete();
}
@@ -275,9 +305,10 @@ public class ShardSyncerTest {
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard()
- throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
- IOException {
- testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(null, Integer.MAX_VALUE);
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException,
+ ProvisionedThroughputException, IOException {
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null,
+ Integer.MAX_VALUE, INITIAL_POSITION_TRIM_HORIZON);
}
/**
@@ -295,8 +326,8 @@ public class ShardSyncerTest {
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
- testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
- ExceptionThrowingLeaseManagerMethods.DELETELEASE, c);
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
@@ -317,8 +348,8 @@ public class ShardSyncerTest {
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 10;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
- testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
- ExceptionThrowingLeaseManagerMethods.LISTLEASES, c);
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
@@ -339,8 +370,8 @@ public class ShardSyncerTest {
// From the Shard Graph, the max count of calling could be 10
int maxCallingCount = 5;
for (int c = 1; c <= maxCallingCount; c = c + 2) {
- testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
- ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c);
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
leaseManager.deleteAll();
}
@@ -352,7 +383,7 @@ public class ShardSyncerTest {
// 2). exceptionTime is a very big or negative value.
private void retryCheckAndCreateLeaseForNewShards(IKinesisProxy kinesisProxy,
ExceptionThrowingLeaseManagerMethods exceptionMethod,
- int exceptionTime)
+ int exceptionTime, InitialPositionInStreamExtended position)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
if (exceptionMethod != null) {
ExceptionThrowingLeaseManager exceptionThrowingLeaseManager =
@@ -364,7 +395,7 @@ public class ShardSyncerTest {
try {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
exceptionThrowingLeaseManager,
- InitialPositionInStream.TRIM_HORIZON,
+ position,
cleanupLeasesOfCompletedShards);
return;
} catch (LeasingException e) {
@@ -376,28 +407,116 @@ public class ShardSyncerTest {
} else {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
- InitialPositionInStream.TRIM_HORIZON,
+ position,
cleanupLeasesOfCompletedShards);
}
}
+ /**
+ * @throws KinesisClientLibIOException
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws IOException
+ */
+ @Test
+ public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShard()
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException,
+ ProvisionedThroughputException, IOException {
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null,
+ Integer.MAX_VALUE, INITIAL_POSITION_AT_TIMESTAMP);
+ }
+
+ /**
+ * @throws KinesisClientLibIOException
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws IOException
+ */
+ @Test
+ public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithDeleteLeaseExceptions()
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
+ IOException {
+ // Define the max calling count for lease manager methods.
+ // From the Shard Graph, the max count of calling could be 10
+ int maxCallingCount = 10;
+ for (int c = 1; c <= maxCallingCount; c = c + 2) {
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods.DELETELEASE,
+ c, INITIAL_POSITION_AT_TIMESTAMP);
+ // Need to clean up lease manager every time after calling ShardSyncer
+ leaseManager.deleteAll();
+ }
+ }
+
+ /**
+ * @throws KinesisClientLibIOException
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws IOException
+ */
+ @Test
+ public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithListLeasesExceptions()
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
+ IOException {
+ // Define the max calling count for lease manager methods.
+ // From the Shard Graph, the max count of calling could be 10
+ int maxCallingCount = 10;
+ for (int c = 1; c <= maxCallingCount; c = c + 2) {
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods.LISTLEASES,
+ c, INITIAL_POSITION_AT_TIMESTAMP);
+ // Need to clean up lease manager every time after calling ShardSyncer
+ leaseManager.deleteAll();
+ }
+ }
+
+ /**
+ * @throws KinesisClientLibIOException
+ * @throws DependencyException
+ * @throws InvalidStateException
+ * @throws ProvisionedThroughputException
+ * @throws IOException
+ */
+ @Test
+ public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithCreateLeaseExceptions()
+ throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
+ IOException {
+ // Define the max calling count for lease manager methods.
+ // From the Shard Graph, the max count of calling could be 10
+ int maxCallingCount = 5;
+ for (int c = 1; c <= maxCallingCount; c = c + 2) {
+ testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS,
+ c, INITIAL_POSITION_AT_TIMESTAMP);
+ // Need to clean up lease manager every time after calling ShardSyncer
+ leaseManager.deleteAll();
+ }
+ }
+
// Real implementation of testing CheckAndCreateLeasesForNewShards with different leaseManager types.
- private void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(
- ExceptionThrowingLeaseManagerMethods exceptionMethod, int exceptionTime)
+ private void testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
+ ExceptionThrowingLeaseManagerMethods exceptionMethod,
+ int exceptionTime,
+ InitialPositionInStreamExtended position)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
+ ExtendedSequenceNumber extendedSequenceNumber =
+ new ExtendedSequenceNumber(position.getInitialPositionInStream().toString());
List shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
- retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime);
+ retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position);
List newLeases = leaseManager.listLeases();
Map expectedShardIdToCheckpointMap =
new HashMap();
for (int i = 0; i < 11; i++) {
- expectedShardIdToCheckpointMap.put("shardId-" + i, ExtendedSequenceNumber.TRIM_HORIZON);
+ expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber);
}
Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size());
for (KinesisClientLease lease1 : newLeases) {
@@ -415,7 +534,7 @@ public class ShardSyncerTest {
leaseManager.updateLease(childShardLease);
expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290"));
- retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime);
+ retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position);
newLeases = leaseManager.listLeases();
Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size());
@@ -449,11 +568,11 @@ public class ShardSyncerTest {
garbageLease.setCheckpoint(new ExtendedSequenceNumber("999"));
leaseManager.createLeaseIfNotExists(garbageLease);
Assert.assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey());
- testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.LATEST);
+ testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
Assert.assertNull(leaseManager.getLease(garbageShardId));
}
- private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream initialPosition)
+ private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
List shards = new ArrayList();
@@ -463,7 +582,7 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
- File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 10, "testBootstrap1");
+ File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
@@ -475,7 +594,8 @@ public class ShardSyncerTest {
expectedLeaseShardIds.add(shardId1);
for (KinesisClientLease lease1 : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
- Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.toString()), lease1.getCheckpoint());
+ Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()),
+ lease1.getCheckpoint());
}
dataFile.delete();
}
@@ -495,11 +615,11 @@ public class ShardSyncerTest {
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
- Set initialPositions = new HashSet();
- initialPositions.add(InitialPositionInStream.LATEST);
- initialPositions.add(InitialPositionInStream.TRIM_HORIZON);
+ Set initialPositions = new HashSet();
+ initialPositions.add(INITIAL_POSITION_LATEST);
+ initialPositions.add(INITIAL_POSITION_TRIM_HORIZON);
- for (InitialPositionInStream initialPosition : initialPositions) {
+ for (InitialPositionInStreamExtended initialPosition : initialPositions) {
List newLeases =
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition);
Assert.assertEquals(2, newLeases.size());
@@ -508,7 +628,8 @@ public class ShardSyncerTest {
expectedLeaseShardIds.add(shardId1);
for (KinesisClientLease lease : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey()));
- Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.toString()), lease.getCheckpoint());
+ Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()),
+ lease.getCheckpoint());
}
}
}
@@ -532,7 +653,7 @@ public class ShardSyncerTest {
ShardObjectHelper.newSequenceNumberRange("405", null)));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(1, newLeases.size());
Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey());
}
@@ -557,7 +678,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Map expectedShardIdCheckpointMap =
new HashMap();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@@ -595,7 +716,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Map expectedShardIdCheckpointMap =
new HashMap();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@@ -631,7 +752,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map expectedShardIdCheckpointMap =
new HashMap();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@@ -671,7 +792,7 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-7"));
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map expectedShardIdCheckpointMap =
new HashMap();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@@ -700,7 +821,7 @@ public class ShardSyncerTest {
List shards = constructShardListForGraphB();
List currentLeases = new ArrayList();
List newLeases =
- ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON);
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map expectedShardIdCheckpointMap =
new HashMap();
for (int i = 0; i < 11; i++) {
@@ -716,6 +837,110 @@ public class ShardSyncerTest {
}
}
+ /**
+ * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP)
+ * Shard structure (each level depicts a stream segment):
+ * 0 1 2 3 4 5- shards till epoch 102
+ * \ / \ / | |
+ * 6 7 4 5- shards from epoch 103 - 205
+ * \ / | /\
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ * Current leases: (3, 4, 5)
+ */
+ @Test
+ public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() {
+ List shards = constructShardListForGraphA();
+ List currentLeases = new ArrayList();
+
+
+ currentLeases.add(newLease("shardId-3"));
+ currentLeases.add(newLease("shardId-4"));
+ currentLeases.add(newLease("shardId-5"));
+
+ List newLeases =
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
+ Map expectedShardIdCheckpointMap = new HashMap();
+ expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP);
+
+ Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size());
+ for (KinesisClientLease lease : newLeases) {
+ Assert.assertTrue("Unexpected lease: " + lease,
+ expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey()));
+ Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint());
+ }
+ }
+
+ /**
+ * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP)
+ * Shard structure (each level depicts a stream segment):
+ * 0 1 2 3 4 5- shards till epoch 102
+ * \ / \ / | |
+ * 6 7 4 5- shards from epoch 103 - 205
+ * \ / | /\
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ * Current leases: (4, 5, 7)
+ */
+ @Test
+ public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() {
+ List shards = constructShardListForGraphA();
+ List currentLeases = new ArrayList();
+
+ currentLeases.add(newLease("shardId-4"));
+ currentLeases.add(newLease("shardId-5"));
+ currentLeases.add(newLease("shardId-7"));
+
+ List newLeases =
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
+ Map expectedShardIdCheckpointMap = new HashMap();
+ expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP);
+ expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP);
+
+ Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size());
+ for (KinesisClientLease lease : newLeases) {
+ Assert.assertTrue("Unexpected lease: " + lease,
+ expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey()));
+ Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint());
+ }
+ }
+
+ /**
+ * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP)
+ * For shard graph B (see the construct method doc for structure).
+ * Current leases: empty set
+ */
+ @Test
+ public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() {
+ List shards = constructShardListForGraphB();
+ List currentLeases = new ArrayList();
+ List newLeases =
+ ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
+ Map expectedShardIdCheckpointMap =
+ new HashMap();
+ for (int i = 0; i < shards.size(); i++) {
+ String expectedShardId = "shardId-" + i;
+ expectedShardIdCheckpointMap.put(expectedShardId, ExtendedSequenceNumber.AT_TIMESTAMP);
+ }
+
+ Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size());
+ for (KinesisClientLease lease : newLeases) {
+ Assert.assertTrue("Unexpected lease: " + lease,
+ expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey()));
+ Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint());
+ }
+ }
+
+
/*
* Helper method to construct a shard list for graph A. Graph A is defined below.
* Shard structure (y-axis is epochs):
@@ -808,8 +1033,7 @@ public class ShardSyncerTest {
@Test
public final void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() {
Map memoizationContext = new HashMap<>();
- Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null,
- latestPosition,
+ Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST,
null,
null,
null,
@@ -824,8 +1048,7 @@ public class ShardSyncerTest {
String shardId = "shardId-trimmed";
Map kinesisShards = new HashMap();
Map memoizationContext = new HashMap<>();
- Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
- latestPosition,
+ Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
null,
kinesisShards,
null,
@@ -844,8 +1067,7 @@ public class ShardSyncerTest {
shardIdsOfCurrentLeases.add(shardId);
Map newLeaseMap = new HashMap();
Map memoizationContext = new HashMap<>();
- Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
- latestPosition,
+ Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@@ -872,8 +1094,7 @@ public class ShardSyncerTest {
kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null));
Map memoizationContext = new HashMap<>();
- Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
- latestPosition,
+ Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
@@ -902,8 +1123,7 @@ public class ShardSyncerTest {
kinesisShards.put(shardId, shard);
Map memoizationContext = new HashMap<>();
- Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
- latestPosition,
+ Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
shardIdsOfCurrentLeases,
kinesisShards,
newLeaseMap,
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
index 6b77f818..a2302ad0 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java
@@ -42,6 +42,9 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
*/
public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+
Set defaultParentShardIds = new HashSet<>();
String defaultConcurrencyToken = "testToken4398";
String defaultShardId = "shardId-0000397840";
@@ -88,16 +91,15 @@ public class ShutdownTaskTest {
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
- ShutdownTask task =
- new ShutdownTask(defaultShardInfo,
- defaultRecordProcessor,
- checkpointer,
- ShutdownReason.TERMINATE,
- kinesisProxy,
- InitialPositionInStream.TRIM_HORIZON,
- cleanupLeasesOfCompletedShards ,
- leaseManager,
- TASK_BACKOFF_TIME_MILLIS);
+ ShutdownTask task = new ShutdownTask(defaultShardInfo,
+ defaultRecordProcessor,
+ checkpointer,
+ ShutdownReason.TERMINATE,
+ kinesisProxy,
+ INITIAL_POSITION_TRIM_HORIZON,
+ cleanupLeasesOfCompletedShards,
+ leaseManager,
+ TASK_BACKOFF_TIME_MILLIS);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@@ -114,16 +116,15 @@ public class ShutdownTaskTest {
when(kinesisProxy.getShardList()).thenReturn(null);
ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
- ShutdownTask task =
- new ShutdownTask(defaultShardInfo,
- defaultRecordProcessor,
- checkpointer,
- ShutdownReason.TERMINATE,
- kinesisProxy,
- InitialPositionInStream.TRIM_HORIZON,
- cleanupLeasesOfCompletedShards ,
- leaseManager,
- TASK_BACKOFF_TIME_MILLIS);
+ ShutdownTask task = new ShutdownTask(defaultShardInfo,
+ defaultRecordProcessor,
+ checkpointer,
+ ShutdownReason.TERMINATE,
+ kinesisProxy,
+ INITIAL_POSITION_TRIM_HORIZON,
+ cleanupLeasesOfCompletedShards,
+ leaseManager,
+ TASK_BACKOFF_TIME_MILLIS);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
index f4cd9307..f0b42671 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
@@ -29,6 +29,7 @@ import java.lang.Thread.State;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -80,7 +81,6 @@ import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
/**
* Unit tests of Worker.
@@ -101,7 +101,10 @@ public class WorkerTest {
private final boolean cleanupLeasesUponShardCompletion = true;
// We don't want any of these tests to run checkpoint validation
private final boolean skipCheckpointValidationValue = false;
- private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
+ private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
@@ -167,9 +170,7 @@ public class WorkerTest {
new StreamConfig(proxy,
maxRecords,
idleTimeInMilliseconds,
- callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
final String testConcurrencyToken = "testToken";
final String anotherConcurrencyToken = "anotherTestToken";
final String dummyKinesisShardId = "kinesis-0-0";
@@ -182,9 +183,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
- streamletFactory,
- streamConfig,
- InitialPositionInStream.LATEST,
+ streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@@ -219,9 +218,7 @@ public class WorkerTest {
new StreamConfig(proxy,
maxRecords,
idleTimeInMilliseconds,
- callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
final String concurrencyToken = "testToken";
final String anotherConcurrencyToken = "anotherTestToken";
final String dummyKinesisShardId = "kinesis-0-0";
@@ -235,9 +232,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
- streamletFactory,
- streamConfig,
- InitialPositionInStream.LATEST,
+ streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@@ -283,9 +278,7 @@ public class WorkerTest {
new StreamConfig(proxy,
maxRecords,
idleTimeInMilliseconds,
- callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
@SuppressWarnings("unchecked")
ILeaseManager leaseManager = mock(ILeaseManager.class);
@@ -295,8 +288,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
recordProcessorFactory,
- streamConfig,
- InitialPositionInStream.TRIM_HORIZON,
+ streamConfig, INITIAL_POSITION_TRIM_HORIZON,
shardPollInterval,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@@ -621,6 +613,7 @@ public class WorkerTest {
/**
* Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow.
+ *
* @return Executor service that will be owned by the worker.
*/
private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() {
@@ -665,7 +658,7 @@ public class WorkerTest {
List initialLeases = new ArrayList();
for (Shard shard : shardList) {
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
- lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
+ lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP);
initialLeases.add(lease);
}
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard);
@@ -719,7 +712,7 @@ public class WorkerTest {
final long epsilonMillis = 1000L;
final long idleTimeInMilliseconds = 2L;
- AmazonDynamoDB ddbClient = DynamoDBEmbedded.create();
+ AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager leaseManager = new KinesisClientLeaseManager("foo", ddbClient);
leaseManager.createLeaseTableIfNotExists(1L, 1L);
for (KinesisClientLease initialLease : initialLeases) {
@@ -733,19 +726,17 @@ public class WorkerTest {
epsilonMillis,
metricsFactory);
- StreamConfig streamConfig =
- new StreamConfig(kinesisProxy,
- maxRecords,
- idleTimeInMilliseconds,
- callProcessRecordsForEmptyRecordList,
- skipCheckpointValidationValue,
- initialPositionInStream);
+ final Date timestamp = new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP);
+ StreamConfig streamConfig = new StreamConfig(kinesisProxy,
+ maxRecords,
+ idleTimeInMilliseconds,
+ callProcessRecordsForEmptyRecordList,
+ skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
Worker worker =
new Worker(stageName,
recordProcessorFactory,
- streamConfig,
- InitialPositionInStream.TRIM_HORIZON,
+ streamConfig, INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@@ -843,7 +834,8 @@ public class WorkerTest {
findShardIdsAndStreamLetsOfShardsWithOnlyOneProcessor(recordProcessorFactory);
for (Shard shard : shardList) {
String shardId = shard.getShardId();
- String iterator = fileBasedProxy.getIterator(shardId, ShardIteratorType.TRIM_HORIZON.toString(), null);
+ String iterator =
+ fileBasedProxy.getIterator(shardId, new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP));
List expectedRecords = fileBasedProxy.get(iterator, numRecs).getRecords();
if (shardIdsAndStreamLetsOfShardsWithOnlyOneProcessor.containsKey(shardId)) {
verifyAllRecordsWereConsumedExactlyOnce(expectedRecords,
@@ -859,7 +851,8 @@ public class WorkerTest {
Map> shardStreamletsRecords) {
for (Shard shard : shardList) {
String shardId = shard.getShardId();
- String iterator = fileBasedProxy.getIterator(shardId, ShardIteratorType.TRIM_HORIZON.toString(), null);
+ String iterator =
+ fileBasedProxy.getIterator(shardId, new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP));
List expectedRecords = fileBasedProxy.get(iterator, numRecs).getRecords();
verifyAllRecordsWereConsumedAtLeastOnce(expectedRecords, shardStreamletsRecords.get(shardId));
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java
index a346b5c6..db70b5de 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java
@@ -25,6 +25,7 @@ import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -65,7 +66,9 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
/** Partition key associated with data record. */
PARTITION_KEY(2),
/** Data. */
- DATA(3);
+ DATA(3),
+ /** Approximate arrival timestamp. */
+ APPROXIMATE_ARRIVAL_TIMESTAMP(4);
private final int position;
@@ -149,7 +152,7 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
String[] strArr = str.split(",");
if (strArr.length != NUM_FIELDS_IN_FILE) {
throw new InvalidArgumentException("Unexpected input in file."
- + "Expected format (shardId, sequenceNumber, partitionKey, dataRecord)");
+ + "Expected format (shardId, sequenceNumber, partitionKey, dataRecord, timestamp)");
}
String shardId = strArr[LocalFileFields.SHARD_ID.getPosition()];
Record record = new Record();
@@ -157,6 +160,9 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
record.setPartitionKey(strArr[LocalFileFields.PARTITION_KEY.getPosition()]);
ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(strArr[LocalFileFields.DATA.getPosition()]));
record.setData(byteBuffer);
+ Date timestamp =
+ new Date(Long.parseLong(strArr[LocalFileFields.APPROXIMATE_ARRIVAL_TIMESTAMP.getPosition()]));
+ record.setApproximateArrivalTimestamp(timestamp);
List shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
shardRecords = new ArrayList();
@@ -221,11 +227,8 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
return new IteratorInfo(splits[0], splits[1]);
}
- /*
- * (non-Javadoc)
- *
- * @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#getIterator(java.lang.String,
- * java.lang.String, java.lang.String)
+ /**
+ * {@inheritDoc}
*/
@Override
public String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
@@ -262,6 +265,77 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getIterator(String shardId, String iteratorEnum)
+ throws ResourceNotFoundException, InvalidArgumentException {
+ /*
+ * If we don't have records in this shard, any iterator will return the empty list. Using a
+ * sequence number of 1 on an empty shard will give this behavior.
+ */
+ List shardRecords = shardedDataRecords.get(shardId);
+ if (shardRecords == null) {
+ throw new ResourceNotFoundException(shardId + " does not exist");
+ }
+ if (shardRecords.isEmpty()) {
+ return serializeIterator(shardId, "1");
+ }
+
+ final String serializedIterator;
+ if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) {
+ /*
+ * If we do have records, LATEST should return an iterator that can be used to read the
+ * last record. Our iterators are inclusive for convenience.
+ */
+ Record last = shardRecords.get(shardRecords.size() - 1);
+ serializedIterator = serializeIterator(shardId, last.getSequenceNumber());
+ } else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) {
+ serializedIterator = serializeIterator(shardId, shardRecords.get(0).getSequenceNumber());
+ } else {
+ throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum);
+ }
+ return serializedIterator;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getIterator(String shardId, Date timestamp)
+ throws ResourceNotFoundException, InvalidArgumentException {
+ /*
+ * If we don't have records in this shard, any iterator will return the empty list. Using a
+ * sequence number of 1 on an empty shard will give this behavior.
+ */
+ List shardRecords = shardedDataRecords.get(shardId);
+ if (shardRecords == null) {
+ throw new ResourceNotFoundException(shardId + " does not exist");
+ }
+ if (shardRecords.isEmpty()) {
+ return serializeIterator(shardId, "1");
+ }
+
+ final String serializedIterator;
+ if (timestamp != null) {
+ String seqNumAtTimestamp = findSequenceNumberAtTimestamp(shardRecords, timestamp);
+ serializedIterator = serializeIterator(shardId, seqNumAtTimestamp);
+ } else {
+ throw new IllegalArgumentException("Timestamp must be specified for AT_TIMESTAMP iterator");
+ }
+ return serializedIterator;
+ }
+
+ private String findSequenceNumberAtTimestamp(final List shardRecords, final Date timestamp) {
+ for (Record rec : shardRecords) {
+ if (rec.getApproximateArrivalTimestamp().getTime() >= timestamp.getTime()) {
+ return rec.getSequenceNumber();
+ }
+ }
+ return null;
+ }
+
/*
* (non-Javadoc)
*
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java
index 795f2db9..e5e4419a 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java
@@ -51,6 +51,17 @@ public class KinesisLocalFileDataCreator {
private static final int PARTITION_KEY_LENGTH = 10;
private static final int DATA_LENGTH = 40;
+ /**
+ * Starting timestamp - also referenced in KinesisLocalFileProxyTest.
+ */
+ public static final long STARTING_TIMESTAMP = 1462345678910L;
+
+ /**
+ * This is used to allow few records to have the same timestamps (to mimic real life scenarios).
+ * Records 5n-1 and 5n will have the same timestamp (n > 0).
+ */
+ private static final int DIVISOR = 5;
+
private KinesisLocalFileDataCreator() {
}
@@ -96,6 +107,7 @@ public class KinesisLocalFileDataCreator {
fileWriter.write(serializedShardList);
fileWriter.newLine();
BigInteger sequenceNumberIncrement = new BigInteger("0");
+ long timestamp = STARTING_TIMESTAMP;
for (int i = 0; i < numRecordsPerShard; i++) {
for (Shard shard : shardList) {
BigInteger sequenceNumber =
@@ -112,7 +124,12 @@ public class KinesisLocalFileDataCreator {
String partitionKey =
PARTITION_KEY_PREFIX + shard.getShardId() + generateRandomString(PARTITION_KEY_LENGTH);
String data = generateRandomString(DATA_LENGTH);
- String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data;
+
+ // Allow few records to have the same timestamps (to mimic real life scenarios).
+ timestamp = (i % DIVISOR == 0) ? timestamp : timestamp + 1;
+ String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data + ","
+ + timestamp;
+
fileWriter.write(line);
fileWriter.newLine();
sequenceNumberIncrement = sequenceNumberIncrement.add(BigInteger.ONE);