Builds + tests pass
Everything builds, and the tests pass. Continuing migration
This commit is contained in:
parent
5be8723688
commit
d9143ce5f5
3 changed files with 8 additions and 9 deletions
|
|
@ -20,7 +20,7 @@ import java.util.Date;
|
||||||
* Class that houses the entities needed to specify the position in the stream from where a new application should
|
* Class that houses the entities needed to specify the position in the stream from where a new application should
|
||||||
* start.
|
* start.
|
||||||
*/
|
*/
|
||||||
class InitialPositionInStreamExtended {
|
public class InitialPositionInStreamExtended {
|
||||||
|
|
||||||
private final InitialPositionInStream position;
|
private final InitialPositionInStream position;
|
||||||
private final Date timestamp;
|
private final Date timestamp;
|
||||||
|
|
@ -44,7 +44,7 @@ class InitialPositionInStreamExtended {
|
||||||
*
|
*
|
||||||
* @return The initial position in stream.
|
* @return The initial position in stream.
|
||||||
*/
|
*/
|
||||||
protected InitialPositionInStream getInitialPositionInStream() {
|
public InitialPositionInStream getInitialPositionInStream() {
|
||||||
return this.position;
|
return this.position;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,11 +54,11 @@ class InitialPositionInStreamExtended {
|
||||||
*
|
*
|
||||||
* @return The timestamp from where we need to start the application.
|
* @return The timestamp from where we need to start the application.
|
||||||
*/
|
*/
|
||||||
protected Date getTimestamp() {
|
public Date getTimestamp() {
|
||||||
return this.timestamp;
|
return this.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
|
public static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
|
||||||
switch (position) {
|
switch (position) {
|
||||||
case LATEST:
|
case LATEST:
|
||||||
return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null);
|
return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null);
|
||||||
|
|
@ -69,7 +69,7 @@ class InitialPositionInStreamExtended {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
|
public static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
|
||||||
if (timestamp == null) {
|
if (timestamp == null) {
|
||||||
throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP");
|
throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,6 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
|
|
||||||
import software.amazon.kinesis.metrics.MetricsHelper;
|
import software.amazon.kinesis.metrics.MetricsHelper;
|
||||||
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingScope;
|
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingScope;
|
||||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
|
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
class KinesisDataFetcher {
|
public class KinesisDataFetcher {
|
||||||
private String nextIterator;
|
private String nextIterator;
|
||||||
private IKinesisProxy kinesisProxy;
|
private IKinesisProxy kinesisProxy;
|
||||||
private final String shardId;
|
private final String shardId;
|
||||||
|
|
@ -152,7 +152,7 @@ class KinesisDataFetcher {
|
||||||
* @param sequenceNumber advance the iterator to the record at this sequence number.
|
* @param sequenceNumber advance the iterator to the record at this sequence number.
|
||||||
* @param initialPositionInStream The initialPositionInStream.
|
* @param initialPositionInStream The initialPositionInStream.
|
||||||
*/
|
*/
|
||||||
void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
|
public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
|
||||||
if (sequenceNumber == null) {
|
if (sequenceNumber == null) {
|
||||||
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
|
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
|
||||||
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
|
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
|
||||||
|
|
@ -241,7 +241,7 @@ class KinesisDataFetcher {
|
||||||
/**
|
/**
|
||||||
* @return the shardEndReached
|
* @return the shardEndReached
|
||||||
*/
|
*/
|
||||||
protected boolean isShardEndReached() {
|
public boolean isShardEndReached() {
|
||||||
return isShardEndReached;
|
return isShardEndReached;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue