Removing the old streams from active streams list without cleaning up the leases
This commit is contained in:
parent
562face833
commit
69a05b409b
2 changed files with 66 additions and 39 deletions
|
|
@ -492,24 +492,6 @@ public class Scheduler implements Runnable {
|
|||
}
|
||||
};
|
||||
|
||||
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
|
||||
// We do lease sync for old streams, before leaving to the deletion strategy to delete leases for
|
||||
// strategy detected leases.
|
||||
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
|
||||
while (currentSetOfStreamsIter.hasNext()) {
|
||||
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
|
||||
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
log.info("Found old/deleted stream: " + streamIdentifier
|
||||
+ ". Syncing shards of that stream.");
|
||||
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
|
||||
currentStreamConfigMap.get(streamIdentifier));
|
||||
shardSyncTaskManager.submitShardSyncTask();
|
||||
currentSetOfStreamsIter.remove();
|
||||
streamsSynced.add(streamIdentifier);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
|
||||
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
|
||||
// It is assumed that all the workers will always have the latest and consistent snapshot of streams
|
||||
|
|
@ -535,6 +517,29 @@ public class Scheduler implements Runnable {
|
|||
} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
|
||||
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
|
||||
streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
|
||||
} else {
|
||||
// Remove the old/stale streams identified through the new and existing streams list, without
|
||||
// cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag.
|
||||
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
|
||||
while (currentSetOfStreamsIter.hasNext()) {
|
||||
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
|
||||
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
|
||||
log.info(
|
||||
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams."
|
||||
+ streamIdentifier);
|
||||
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
|
||||
currentStreamConfigMap.get(streamIdentifier));
|
||||
shardSyncTaskManager.submitShardSyncTask();
|
||||
} else {
|
||||
log.info(
|
||||
"Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
|
||||
+ " as part of this workflow" + streamIdentifier);
|
||||
}
|
||||
currentSetOfStreamsIter.remove();
|
||||
streamsSynced.add(streamIdentifier);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
|
||||
|
|
|
|||
|
|
@ -468,14 +468,14 @@ public class SchedulerTest {
|
|||
return Duration.ofHours(1);
|
||||
}
|
||||
});
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(true);
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyNoDeletionStrategy()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy());
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(false);
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -490,7 +490,7 @@ public class SchedulerTest {
|
|||
return Duration.ofHours(1);
|
||||
}
|
||||
});
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(false);
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -507,10 +507,11 @@ public class SchedulerTest {
|
|||
return Duration.ofHours(1);
|
||||
}
|
||||
});
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(true);
|
||||
testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false);
|
||||
}
|
||||
|
||||
private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion)
|
||||
private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
|
||||
boolean onlyStreamsDeletionNotLeases)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
|
|
@ -533,8 +534,9 @@ public class SchedulerTest {
|
|||
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||
Collectors.toCollection(HashSet::new));
|
||||
Assert.assertEquals(Sets.newHashSet(), syncedStreams);
|
||||
Assert.assertEquals(Sets.newHashSet(streamConfigList1),
|
||||
Set<StreamIdentifier> expectedSyncedStreams = onlyStreamsDeletionNotLeases ? expectedPendingStreams : Sets.newHashSet();
|
||||
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||
Assert.assertEquals(Sets.newHashSet(onlyStreamsDeletionNotLeases ? streamConfigList2 : streamConfigList1),
|
||||
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||
Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams : Sets.newHashSet(),
|
||||
scheduler.staleStreamDeletionMap().keySet());
|
||||
|
|
@ -625,14 +627,14 @@ public class SchedulerTest {
|
|||
return Duration.ofHours(1);
|
||||
}
|
||||
});
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true);
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithNoDeletionStrategy()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy());
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false);
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -647,7 +649,7 @@ public class SchedulerTest {
|
|||
return Duration.ofHours(1);
|
||||
}
|
||||
});
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false);
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -665,10 +667,11 @@ public class SchedulerTest {
|
|||
return Duration.ofHours(1);
|
||||
}
|
||||
});
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true);
|
||||
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false);
|
||||
}
|
||||
|
||||
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion)
|
||||
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
|
||||
boolean onlyStreamsNoLeasesDeletion)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
|
|
@ -687,20 +690,39 @@ public class SchedulerTest {
|
|||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(5, 7)
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
Set<StreamIdentifier> expectedSyncedStreams;
|
||||
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3)
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
|
||||
if(onlyStreamsNoLeasesDeletion) {
|
||||
expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
} else {
|
||||
expectedSyncedStreams = IntStream.range(5, 7)
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
}
|
||||
|
||||
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||
List<StreamConfig> expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
List<StreamConfig> expectedCurrentStreamConfigs;
|
||||
if(onlyStreamsNoLeasesDeletion) {
|
||||
expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
} else {
|
||||
expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
}
|
||||
Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs),
|
||||
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||
Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams: Sets.newHashSet(),
|
||||
|
|
|
|||
Loading…
Reference in a new issue