From 771bc914ebc976a23cb9fc8bb31bda3ade1ffe92 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 31 Mar 2020 15:40:43 -0700 Subject: [PATCH] Added unit test cases and addressed review comments --- .../InitialPositionInStreamExtended.java | 3 +- .../amazon/kinesis/common/StreamConfig.java | 1 - .../amazon/kinesis/coordinator/Scheduler.java | 15 ++- .../kinesis/coordinator/SchedulerTest.java | 117 ++++++++++++++++++ 4 files changed, 129 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java index 437abf28..b3bedd88 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.common; +import lombok.EqualsAndHashCode; import lombok.ToString; import java.util.Date; @@ -22,7 +23,7 @@ import java.util.Date; * Class that houses the entities needed to specify the position in the stream from where a new application should * start. */ -@ToString +@ToString @EqualsAndHashCode public class InitialPositionInStreamExtended { private final InitialPositionInStream position; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 3cf0eeb2..999182b6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -6,7 +6,6 @@ import lombok.experimental.Accessors; @Value @Accessors(fluent = true) public class StreamConfig { - // TODO: Consider having streamIdentifier as the unique identifier of this class. StreamIdentifier streamIdentifier; InitialPositionInStreamExtended initialPositionInStreamExtended; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 945700a7..3f496f64 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -98,7 +98,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; @Slf4j public class Scheduler implements Runnable { - private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 10000L; + private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; private SchedulerLog slog = new SchedulerLog(); @@ -418,11 +418,12 @@ public class Scheduler implements Runnable { * Sync all streams method. * @return streams that are being synced by this worker */ - private Set checkAndSyncStreamShardsAndLeases() + @VisibleForTesting + Set checkAndSyncStreamShardsAndLeases() throws DependencyException, ProvisionedThroughputException, InvalidStateException { final Set streamsSynced = new HashSet<>(); - if (isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS)) { + if (shouldSyncStreamsNow()) { final Map newStreamConfigMap = new HashMap<>(); // Making an immutable copy newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() @@ -462,7 +463,12 @@ public class Scheduler implements Runnable { return streamsSynced; } - private Set syncStreamsFromLeaseTableOnAppInit() + @VisibleForTesting + boolean shouldSyncStreamsNow() { + return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); + } + + private void syncStreamsFromLeaseTableOnAppInit() throws DependencyException, ProvisionedThroughputException, InvalidStateException { if (!leasesSyncedOnAppInit && isMultiStreamMode) { final Set streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream() @@ -475,7 +481,6 @@ public class Scheduler implements Runnable { } leasesSyncedOnAppInit = true; } - return Collections.emptySet(); } // When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 425af67f..ff6eab1c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -40,14 +40,20 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import io.reactivex.plugins.RxJavaPlugins; import lombok.RequiredArgsConstructor; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -58,6 +64,7 @@ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -76,6 +83,7 @@ import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; @@ -375,6 +383,115 @@ public class SchedulerTest { } + @Test + public final void testMultiStreamNoStreamsAreSyncedWhenStreamsAreNotRefreshed() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty()); + Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamOnlyNewStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamOnlyStaleStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + @Test public final void testSchedulerShutdown() { scheduler.shutdown();