Merge pull request #60 from ashwing/ltr_1_old_streams_cleanup

Removing the old streams from active streams list without cleaning up the leases
This commit is contained in:
ashwing 2020-06-17 10:40:49 -07:00 committed by GitHub
commit 50a1fe8922
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 39 deletions

View file

@ -492,24 +492,6 @@ public class Scheduler implements Runnable {
}
};
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
// We do lease sync for old streams, before leaving to the deletion strategy to delete leases for
// strategy detected leases.
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
while (currentSetOfStreamsIter.hasNext()) {
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found old/deleted stream: " + streamIdentifier
+ ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.submitShardSyncTask();
currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier);
}
}
}
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
// It is assumed that all the workers will always have the latest and consistent snapshot of streams
@ -535,6 +517,29 @@ public class Scheduler implements Runnable {
} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
} else {
// Remove the old/stale streams identified through the new and existing streams list, without
// cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag.
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
while (currentSetOfStreamsIter.hasNext()) {
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
log.info(
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams."
+ streamIdentifier);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.submitShardSyncTask();
} else {
log.info(
"Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
+ " as part of this workflow" + streamIdentifier);
}
currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier);
}
}
}
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.

View file

@ -468,14 +468,14 @@ public class SchedulerTest {
return Duration.ofHours(1);
}
});
testMultiStreamStaleStreamsAreNotDeletedImmediately(true);
testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false);
}
@Test
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyNoDeletionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy());
testMultiStreamStaleStreamsAreNotDeletedImmediately(false);
testMultiStreamStaleStreamsAreNotDeletedImmediately(false, true);
}
@Test
@ -490,7 +490,7 @@ public class SchedulerTest {
return Duration.ofHours(1);
}
});
testMultiStreamStaleStreamsAreNotDeletedImmediately(false);
testMultiStreamStaleStreamsAreNotDeletedImmediately(false, false);
}
@Test
@ -507,10 +507,11 @@ public class SchedulerTest {
return Duration.ofHours(1);
}
});
testMultiStreamStaleStreamsAreNotDeletedImmediately(true);
testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false);
}
private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion)
private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
boolean onlyStreamsDeletionNotLeases)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
@ -533,8 +534,9 @@ public class SchedulerTest {
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(HashSet::new));
Assert.assertEquals(Sets.newHashSet(), syncedStreams);
Assert.assertEquals(Sets.newHashSet(streamConfigList1),
Set<StreamIdentifier> expectedSyncedStreams = onlyStreamsDeletionNotLeases ? expectedPendingStreams : Sets.newHashSet();
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
Assert.assertEquals(Sets.newHashSet(onlyStreamsDeletionNotLeases ? streamConfigList2 : streamConfigList1),
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams : Sets.newHashSet(),
scheduler.staleStreamDeletionMap().keySet());
@ -625,14 +627,14 @@ public class SchedulerTest {
return Duration.ofHours(1);
}
});
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true);
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false);
}
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithNoDeletionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy());
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false);
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false, true);
}
@Test
@ -647,7 +649,7 @@ public class SchedulerTest {
return Duration.ofHours(1);
}
});
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false);
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false, false);
}
@Test
@ -665,10 +667,11 @@ public class SchedulerTest {
return Duration.ofHours(1);
}
});
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true);
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false);
}
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion)
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
boolean onlyStreamsNoLeasesDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
@ -687,20 +690,39 @@ public class SchedulerTest {
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(5, 7)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
Set<StreamIdentifier> expectedSyncedStreams;
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
if(onlyStreamsNoLeasesDeletion) {
expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
} else {
expectedSyncedStreams = IntStream.range(5, 7)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
}
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
List<StreamConfig> expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> expectedCurrentStreamConfigs;
if(onlyStreamsNoLeasesDeletion) {
expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
} else {
expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
}
Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs),
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams: Sets.newHashSet(),