From d9143ce5f5381a5f61f3c09f2379e9a0a68f05c5 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 14 Mar 2018 09:36:27 -0700 Subject: [PATCH] Builds + tests pass Everything builds, and the tests pass. Continuing migration --- .../lib/worker/InitialPositionInStreamExtended.java | 10 +++++----- .../AsynchronousGetRecordsRetrievalStrategy.java | 1 - .../amazon/kinesis/retrieval/KinesisDataFetcher.java | 6 +++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java index 6a9948c7..51209d30 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java @@ -20,7 +20,7 @@ import java.util.Date; * Class that houses the entities needed to specify the position in the stream from where a new application should * start. */ -class InitialPositionInStreamExtended { +public class InitialPositionInStreamExtended { private final InitialPositionInStream position; private final Date timestamp; @@ -44,7 +44,7 @@ class InitialPositionInStreamExtended { * * @return The initial position in stream. */ - protected InitialPositionInStream getInitialPositionInStream() { + public InitialPositionInStream getInitialPositionInStream() { return this.position; } @@ -54,11 +54,11 @@ class InitialPositionInStreamExtended { * * @return The timestamp from where we need to start the application. */ - protected Date getTimestamp() { + public Date getTimestamp() { return this.timestamp; } - protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) { + public static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) { switch (position) { case LATEST: return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null); @@ -69,7 +69,7 @@ class InitialPositionInStreamExtended { } } - protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) { + public static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) { if (timestamp == null) { throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategy.java index 05da601b..25937a77 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategy.java @@ -28,7 +28,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingScope; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java index dacf39f4..122cebdc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java @@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j; * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ @Slf4j -class KinesisDataFetcher { +public class KinesisDataFetcher { private String nextIterator; private IKinesisProxy kinesisProxy; private final String shardId; @@ -152,7 +152,7 @@ class KinesisDataFetcher { * @param sequenceNumber advance the iterator to the record at this sequence number. * @param initialPositionInStream The initialPositionInStream. */ - void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { + public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); } else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) { @@ -241,7 +241,7 @@ class KinesisDataFetcher { /** * @return the shardEndReached */ - protected boolean isShardEndReached() { + public boolean isShardEndReached() { return isShardEndReached; }