From ce2604e58e92de9adf32a459d70ce2c09b4289a0 Mon Sep 17 00:00:00 2001 From: stair Date: Sun, 23 Apr 2023 13:15:09 -0400 Subject: [PATCH] Overrode `SingleStreamTracker#orphanedInitialPositionInStream()` to match that of the provided `StreamConfig`. --- .../amazon/kinesis/common/StreamConfig.java | 2 -- .../kinesis/processor/SingleStreamTracker.java | 14 ++++++++++++++ .../processor/SingleStreamTrackerTest.java | 18 ++++++++++++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 8ca75dec..836fe83b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -27,5 +27,3 @@ public class StreamConfig { private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; } - - diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java index 9b5f85c3..995401d7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java @@ -45,6 +45,14 @@ public class SingleStreamTracker implements StreamTracker { private final List streamConfigs; + /** + * Cached reference to {@link StreamConfig#initialPositionInStreamExtended()} + * to avoid unnecessary getter invocations. + */ + @EqualsAndHashCode.Exclude + @ToString.Exclude + private final InitialPositionInStreamExtended initialPositionInStream; + public SingleStreamTracker(String streamName) { this(StreamIdentifier.singleStreamInstance(streamName)); } @@ -72,6 +80,7 @@ public class SingleStreamTracker implements StreamTracker { public SingleStreamTracker(@NonNull StreamIdentifier streamIdentifier, @NonNull StreamConfig streamConfig) { this.streamIdentifier = streamIdentifier; this.streamConfigs = Collections.singletonList(streamConfig); + this.initialPositionInStream = streamConfig.initialPositionInStreamExtended(); } @Override @@ -84,6 +93,11 @@ public class SingleStreamTracker implements StreamTracker { return NO_LEASE_DELETION; } + @Override + public InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() { + return initialPositionInStream; + } + @Override public boolean isMultiStream() { return false; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java index 9ae19ba3..75f3d8e1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.processor; +import java.util.Date; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -48,6 +50,22 @@ public class SingleStreamTrackerTest { validate(tracker, expectedPosition); } + @Test + public void testConsistencyOfInitialPositionInStream() { + for (final InitialPositionInStream position : InitialPositionInStream.values()) { + final InitialPositionInStreamExtended expectedPosition; + if (position == InitialPositionInStream.AT_TIMESTAMP) { + expectedPosition = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date()); + } else { + expectedPosition = InitialPositionInStreamExtended.newInitialPosition(position); + } + + final StreamTracker tracker = new SingleStreamTracker(STREAM_NAME, expectedPosition); + + assertEquals(expectedPosition, tracker.orphanedStreamInitialPositionInStream()); + } + } + private static void validate(StreamTracker tracker) { validate(tracker, StreamTracker.DEFAULT_POSITION_IN_STREAM); }