Clean up streams from currentStreamConfigMap (#1273)

This commit is contained in:
chenylee-aws 2024-04-01 14:53:16 -07:00 committed by GitHub
parent bf5ab60f4b
commit 24774bc2e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 115 additions and 31 deletions

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
@ -26,6 +27,7 @@ import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -58,6 +60,7 @@ import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -70,6 +73,7 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan
@Getter @Getter
@EqualsAndHashCode @EqualsAndHashCode
@Slf4j @Slf4j
@KinesisClientInternalApi
class PeriodicShardSyncManager { class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 60 * 1000L; private static final long INITIAL_DELAY = 60 * 1000L;
@VisibleForTesting @VisibleForTesting
@ -90,6 +94,8 @@ class PeriodicShardSyncManager {
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final long leasesRecoveryAuditorExecutionFrequencyMillis; private final long leasesRecoveryAuditorExecutionFrequencyMillis;
private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
@Getter(AccessLevel.NONE)
private final AtomicBoolean leaderSynced;
private boolean isRunning; private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
@ -98,11 +104,13 @@ class PeriodicShardSyncManager {
Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap, Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap,
boolean isMultiStreamingMode, MetricsFactory metricsFactory, boolean isMultiStreamingMode, MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis, long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { int leasesRecoveryAuditorInconsistencyConfidenceThreshold,
AtomicBoolean leaderSynced){
this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
streamToShardSyncTaskManagerMap, streamToShardSyncTaskManagerMap,
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory, Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory,
leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold); leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold,
leaderSynced);
} }
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
@ -112,7 +120,8 @@ class PeriodicShardSyncManager {
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode, ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode,
MetricsFactory metricsFactory, MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis, long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { int leasesRecoveryAuditorInconsistencyConfidenceThreshold,
AtomicBoolean leaderSynced) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId; this.workerId = workerId;
@ -126,6 +135,7 @@ class PeriodicShardSyncManager {
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis; this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
this.leaderSynced = leaderSynced;
} }
public synchronized TaskResult start() { public synchronized TaskResult start() {
@ -173,7 +183,7 @@ class PeriodicShardSyncManager {
} }
private void runShardSync() { private void runShardSync() {
if (leaderDecider.isLeader(workerId)) { if (leaderDecider.isLeader(workerId) && leaderSynced.get()) {
log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId)); log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId));
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,

View file

@ -41,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -184,7 +185,7 @@ public class Scheduler implements Runnable {
private boolean leasesSyncedOnAppInit = false; private boolean leasesSyncedOnAppInit = false;
@Getter(AccessLevel.NONE) @Getter(AccessLevel.NONE)
private boolean shouldSyncLeases = true; private final AtomicBoolean leaderSynced = new AtomicBoolean(false);
/** /**
* Used to ensure that only one requestedShutdown is in progress at a time. * Used to ensure that only one requestedShutdown is in progress at a time.
@ -294,7 +295,8 @@ public class Scheduler implements Runnable {
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, isMultiStreamMode, metricsFactory, shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, isMultiStreamMode, metricsFactory,
leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(), leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold()); leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold(),
leaderSynced);
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory); .createLeaseCleanupManager(metricsFactory);
this.schemaRegistryDecoder = this.schemaRegistryDecoder =
@ -421,8 +423,9 @@ public class Scheduler implements Runnable {
// check for new streams and sync with the scheduler state // check for new streams and sync with the scheduler state
if (isLeader()) { if (isLeader()) {
checkAndSyncStreamShardsAndLeases(); checkAndSyncStreamShardsAndLeases();
leaderSynced.set(true);
} else { } else {
shouldSyncLeases = true; leaderSynced.set(false);
} }
logExecutorState(); logExecutorState();
@ -461,13 +464,28 @@ public class Scheduler implements Runnable {
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList() final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
// This is done to ensure that we clean up the stale streams lingering in the lease table. // This is done to ensure that we clean up the stale streams lingering in the lease table.
if (isMultiStreamMode && (shouldSyncLeases || !leasesSyncedOnAppInit)) { if (!leaderSynced.get() || !leasesSyncedOnAppInit) {
// Skip updating the stream map due to no new stream since last sync // Only sync from lease table again if the currentStreamConfigMap and newStreamConfigMap contain
if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { // different set of streams.
syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases()); if (!newStreamConfigMap.keySet().equals(currentStreamConfigMap.keySet())) {
log.info("Syncing leases for leader to catch up");
final List<MultiStreamLease> leaseTableLeases = fetchMultiStreamLeases();
syncStreamsFromLeaseTableOnAppInit(leaseTableLeases);
final Set<StreamIdentifier> streamsFromLeaseTable = leaseTableLeases.stream()
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
.collect(Collectors.toSet());
// Remove stream from currentStreamConfigMap if this stream in not in the lease table and newStreamConfigMap.
// This means that the leases have already been deleted by the last leader.
currentStreamConfigMap.keySet().stream()
.filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier)
&& !streamsFromLeaseTable.contains(streamIdentifier)).forEach(stream -> {
log.info("Removing stream {} from currentStreamConfigMap due to not being active", stream);
currentStreamConfigMap.remove(stream);
staleStreamDeletionMap.remove(stream);
streamsSynced.add(stream);
});
} }
leasesSyncedOnAppInit = true; leasesSyncedOnAppInit = true;
shouldSyncLeases = false;
} }
// For new streams discovered, do a shard sync and update the currentStreamConfigMap // For new streams discovered, do a shard sync and update the currentStreamConfigMap
@ -489,7 +507,6 @@ public class Scheduler implements Runnable {
staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now()); staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
} }
}; };
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
// It is assumed that all the workers will always have the latest and consistent snapshot of streams // It is assumed that all the workers will always have the latest and consistent snapshot of streams
@ -633,10 +650,15 @@ public class Scheduler implements Runnable {
final Map<String, List<MultiStreamLease>> streamIdToShardsMap = leases.stream().collect( final Map<String, List<MultiStreamLease>> streamIdToShardsMap = leases.stream().collect(
Collectors.groupingBy(MultiStreamLease::streamIdentifier, Collectors.toCollection(ArrayList::new))); Collectors.groupingBy(MultiStreamLease::streamIdentifier, Collectors.toCollection(ArrayList::new)));
for (StreamIdentifier streamIdentifier : streamIdentifiers) { for (StreamIdentifier streamIdentifier : streamIdentifiers) {
log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier);
// Removing streamIdentifier from this map so PSSM doesn't think there is a hole in the stream while
// scheduler attempts to delete the stream if the stream is taking longer to delete. If deletion fails
// it will be retried again since stream will still show up in the staleStreamDeletionMap.
// It is fine for PSSM to detect holes and it should not do shardsync because it takes few iterations
// to breach the hole confidence interval threshold.
currentStreamConfigMap.remove(streamIdentifier);
// Deleting leases will cause the workers to shutdown the record processors for these shards. // Deleting leases will cause the workers to shutdown the record processors for these shards.
if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) { if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) {
log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier);
currentStreamConfigMap.remove(streamIdentifier);
staleStreamDeletionMap.remove(streamIdentifier); staleStreamDeletionMap.remove(streamIdentifier);
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} }

View file

@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -77,7 +78,8 @@ public class PeriodicShardSyncManagerTest {
public void setup() { public void setup() {
streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456"); streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456");
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, true, new NullMetricsFactory(), 2 * 60 * 1000, 3); shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, true, new NullMetricsFactory(), 2 * 60 * 1000, 3,
new AtomicBoolean(true));
} }
@Test @Test

View file

@ -568,7 +568,7 @@ public class SchedulerTest {
testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false); testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false);
} }
private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, private void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
boolean onlyStreamsDeletionNotLeases) boolean onlyStreamsDeletionNotLeases)
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
@ -584,7 +584,7 @@ public class SchedulerTest {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
mockListLeases(streamConfigList1);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
@ -667,6 +667,8 @@ public class SchedulerTest {
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
mockListLeases(streamConfigList1);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
@ -741,6 +743,9 @@ public class SchedulerTest {
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
// Mock listLeases to exercise the delete path so scheduler doesn't remove stale streams due to not presenting
// in lease table
mockListLeases(streamConfigList1);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams; Set<StreamIdentifier> expectedSyncedStreams;
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3) Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3)
@ -792,6 +797,7 @@ public class SchedulerTest {
// when KCL starts it starts with tracking 5 stream // when KCL starts it starts with tracking 5 stream
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
assertEquals(0, scheduler.staleStreamDeletionMap().size()); assertEquals(0, scheduler.staleStreamDeletionMap().size());
mockListLeases(streamConfigList1);
// 2 Streams are no longer needed to be consumed // 2 Streams are no longer needed to be consumed
Set<StreamIdentifier> syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases(); Set<StreamIdentifier> syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases();
@ -974,39 +980,75 @@ public class SchedulerTest {
@Test @Test
public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception { public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception {
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6); final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList());
prepareMultiStreamScheduler(streamConfigList); prepareMultiStreamScheduler(streamConfigList);
// Populate currentStreamConfigMap to simulate that the leader has the latest streams. // Populate currentStreamConfigMap to simulate that the leader has the latest streams.
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); multiStreamTracker.streamConfigList().forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
scheduler.checkAndSyncStreamShardsAndLeases(); scheduler.checkAndSyncStreamShardsAndLeases();
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
assertTrue(scheduler.currentStreamConfigMap().size() != 0);
} }
@Test @Test
public void testNoDdbLookUpForNewStreamAsLeaderFlippedTheShardSyncFlags() throws Exception { public void testNotRefreshForNewStreamAfterLeaderFlippedTheShouldInitialize(){
prepareMultiStreamScheduler(); prepareMultiStreamScheduler(createDummyStreamConfigList(1, 6));
scheduler.checkAndSyncStreamShardsAndLeases(); // flip the shouldInitialize flag
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); scheduler.runProcessLoop();
verify(scheduler, times(1)).syncStreamsFromLeaseTableOnAppInit(any());
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6); final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
scheduler.checkAndSyncStreamShardsAndLeases(); scheduler.runProcessLoop();
// Since the sync path has been executed once before the DDB sync flags should be flipped // Since the sync path has been executed once before the DDB sync flags should be flipped
// to prevent doing DDB lookups in the subsequent runs. // to prevent doing DDB lookups in the subsequent runs.
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); verify(scheduler, times(1)).syncStreamsFromLeaseTableOnAppInit(any());
assertEquals(0, streamConfigList.stream() assertEquals(0, streamConfigList.stream()
.filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count()); .filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count());
} }
@Test
public void testDropStreamsFromMapsWhenStreamIsNotInLeaseTableAndNewStreamConfigMap() throws Exception {
when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList());
prepareMultiStreamScheduler();
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
scheduler.checkAndSyncStreamShardsAndLeases();
assertEquals(Collections.emptySet(), scheduler.currentStreamConfigMap().keySet());
}
@Test
public void testNotDropStreamsFromMapsWhenStreamIsInLeaseTable() throws Exception {
when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList());
prepareForStaleDeletedStreamCleanupTests();
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
mockListLeases(streamConfigList);
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
final Set<StreamIdentifier> initialSet = new HashSet<>(scheduler.currentStreamConfigMap().keySet());
scheduler.checkAndSyncStreamShardsAndLeases();
assertEquals(initialSet, scheduler.currentStreamConfigMap().keySet());
assertEquals(streamConfigList.size(), scheduler.currentStreamConfigMap().keySet().size());
}
@Test
public void testNotDropStreamsFromMapsWhenStreamIsInNewStreamConfigMap() throws Exception {
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
prepareMultiStreamScheduler();
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
final Set<StreamIdentifier> initialSet = new HashSet<>(scheduler.currentStreamConfigMap().keySet());
scheduler.checkAndSyncStreamShardsAndLeases();
assertEquals(initialSet, scheduler.currentStreamConfigMap().keySet());
assertEquals(streamConfigList.size(), scheduler.currentStreamConfigMap().keySet().size());
}
@SafeVarargs @SafeVarargs
private final void prepareMultiStreamScheduler(List<StreamConfig>... streamConfigs) { private final void prepareMultiStreamScheduler(List<StreamConfig>... streamConfigs) {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
if (streamConfigs.length > 0) { stubMultiStreamTracker(streamConfigs);
stubMultiStreamTracker(streamConfigs);
}
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
} }
@ -1024,12 +1066,20 @@ public class SchedulerTest {
@SafeVarargs @SafeVarargs
private final void stubMultiStreamTracker(List<StreamConfig>... streamConfigs) { private final void stubMultiStreamTracker(List<StreamConfig>... streamConfigs) {
OngoingStubbing<List<StreamConfig>> stub = when(multiStreamTracker.streamConfigList()); if (streamConfigs.length > 0) {
for (List<StreamConfig> streamConfig : streamConfigs) { OngoingStubbing<List<StreamConfig>> stub = when(multiStreamTracker.streamConfigList());
stub = stub.thenReturn(streamConfig); for (List<StreamConfig> streamConfig : streamConfigs) {
stub = stub.thenReturn(streamConfig);
}
} }
} }
private void mockListLeases(List<StreamConfig> configs) throws ProvisionedThroughputException, InvalidStateException, DependencyException {
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(configs.stream()
.map(s -> new MultiStreamLease().streamIdentifier(s.streamIdentifier().toString())
.shardId("some_random_shard_id")).collect(Collectors.toList()));
}
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
final int numberOfRecordsPerShard = 10; final int numberOfRecordsPerShard = 10;
final String kinesisShardPrefix = "kinesis-0-"; final String kinesisShardPrefix = "kinesis-0-";