Added unit test cases and addressed review comments
This commit is contained in:
parent
a7ae4d3e24
commit
771bc914eb
4 changed files with 129 additions and 7 deletions
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.common;
|
package software.amazon.kinesis.common;
|
||||||
|
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.ToString;
|
import lombok.ToString;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
@ -22,7 +23,7 @@ import java.util.Date;
|
||||||
* Class that houses the entities needed to specify the position in the stream from where a new application should
|
* Class that houses the entities needed to specify the position in the stream from where a new application should
|
||||||
* start.
|
* start.
|
||||||
*/
|
*/
|
||||||
@ToString
|
@ToString @EqualsAndHashCode
|
||||||
public class InitialPositionInStreamExtended {
|
public class InitialPositionInStreamExtended {
|
||||||
|
|
||||||
private final InitialPositionInStream position;
|
private final InitialPositionInStream position;
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ import lombok.experimental.Accessors;
|
||||||
@Value
|
@Value
|
||||||
@Accessors(fluent = true)
|
@Accessors(fluent = true)
|
||||||
public class StreamConfig {
|
public class StreamConfig {
|
||||||
// TODO: Consider having streamIdentifier as the unique identifier of this class.
|
|
||||||
StreamIdentifier streamIdentifier;
|
StreamIdentifier streamIdentifier;
|
||||||
InitialPositionInStreamExtended initialPositionInStreamExtended;
|
InitialPositionInStreamExtended initialPositionInStreamExtended;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class Scheduler implements Runnable {
|
public class Scheduler implements Runnable {
|
||||||
|
|
||||||
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 10000L;
|
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
|
||||||
|
|
||||||
private SchedulerLog slog = new SchedulerLog();
|
private SchedulerLog slog = new SchedulerLog();
|
||||||
|
|
||||||
|
|
@ -418,11 +418,12 @@ public class Scheduler implements Runnable {
|
||||||
* Sync all streams method.
|
* Sync all streams method.
|
||||||
* @return streams that are being synced by this worker
|
* @return streams that are being synced by this worker
|
||||||
*/
|
*/
|
||||||
private Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
|
@VisibleForTesting
|
||||||
|
Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
|
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
|
||||||
|
|
||||||
if (isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS)) {
|
if (shouldSyncStreamsNow()) {
|
||||||
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
|
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
|
||||||
// Making an immutable copy
|
// Making an immutable copy
|
||||||
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
|
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
|
||||||
|
|
@ -462,7 +463,12 @@ public class Scheduler implements Runnable {
|
||||||
return streamsSynced;
|
return streamsSynced;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<StreamIdentifier> syncStreamsFromLeaseTableOnAppInit()
|
@VisibleForTesting
|
||||||
|
boolean shouldSyncStreamsNow() {
|
||||||
|
return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncStreamsFromLeaseTableOnAppInit()
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
|
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
|
||||||
final Set<StreamIdentifier> streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream()
|
final Set<StreamIdentifier> streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream()
|
||||||
|
|
@ -475,7 +481,6 @@ public class Scheduler implements Runnable {
|
||||||
}
|
}
|
||||||
leasesSyncedOnAppInit = true;
|
leasesSyncedOnAppInit = true;
|
||||||
}
|
}
|
||||||
return Collections.emptySet();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end.
|
// When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end.
|
||||||
|
|
|
||||||
|
|
@ -40,14 +40,20 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import io.reactivex.plugins.RxJavaPlugins;
|
import io.reactivex.plugins.RxJavaPlugins;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
@ -58,6 +64,7 @@ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.utils.Either;
|
import software.amazon.awssdk.utils.Either;
|
||||||
|
import software.amazon.awssdk.utils.Validate;
|
||||||
import software.amazon.kinesis.checkpoint.Checkpoint;
|
import software.amazon.kinesis.checkpoint.Checkpoint;
|
||||||
import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
||||||
import software.amazon.kinesis.checkpoint.CheckpointFactory;
|
import software.amazon.kinesis.checkpoint.CheckpointFactory;
|
||||||
|
|
@ -76,6 +83,7 @@ import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.leases.ShardSyncTaskManager;
|
import software.amazon.kinesis.leases.ShardSyncTaskManager;
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
|
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
import software.amazon.kinesis.lifecycle.LifecycleConfig;
|
import software.amazon.kinesis.lifecycle.LifecycleConfig;
|
||||||
import software.amazon.kinesis.lifecycle.ShardConsumer;
|
import software.amazon.kinesis.lifecycle.ShardConsumer;
|
||||||
|
|
@ -375,6 +383,115 @@ public class SchedulerTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testMultiStreamNoStreamsAreSyncedWhenStreamsAreNotRefreshed()
|
||||||
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
|
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
List<StreamConfig> streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||||
|
.retrievalFactory(retrievalFactory);
|
||||||
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||||
|
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
|
metricsConfig, processorConfig, retrievalConfig));
|
||||||
|
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||||
|
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty());
|
||||||
|
Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testMultiStreamOnlyNewStreamsAreSynced()
|
||||||
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
|
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
List<StreamConfig> streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||||
|
.retrievalFactory(retrievalFactory);
|
||||||
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||||
|
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
|
metricsConfig, processorConfig, retrievalConfig));
|
||||||
|
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||||
|
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||||
|
Collectors.toCollection(HashSet::new));
|
||||||
|
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||||
|
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
|
||||||
|
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testMultiStreamOnlyStaleStreamsAreSynced()
|
||||||
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
|
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
List<StreamConfig> streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||||
|
.retrievalFactory(retrievalFactory);
|
||||||
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||||
|
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
|
metricsConfig, processorConfig, retrievalConfig));
|
||||||
|
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||||
|
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||||
|
Collectors.toCollection(HashSet::new));
|
||||||
|
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||||
|
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
|
||||||
|
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced()
|
||||||
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
|
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
List<StreamConfig> streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
|
||||||
|
StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||||
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||||
|
.collect(Collectors.toCollection(LinkedList::new));
|
||||||
|
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||||
|
.retrievalFactory(retrievalFactory);
|
||||||
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||||
|
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
|
metricsConfig, processorConfig, retrievalConfig));
|
||||||
|
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||||
|
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
Set<StreamIdentifier> expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
|
||||||
|
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||||
|
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||||
|
.collect(Collectors.toCollection(HashSet::new));
|
||||||
|
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||||
|
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
|
||||||
|
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public final void testSchedulerShutdown() {
|
public final void testSchedulerShutdown() {
|
||||||
scheduler.shutdown();
|
scheduler.shutdown();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue