Add additional filter to avoid overriding MultiStreamTracker's input to KCL
This commit is contained in:
parent
7a36f486fc
commit
2575f628e1
4 changed files with 65 additions and 32 deletions
|
|
@ -28,7 +28,7 @@ public class DeletedStreamListProvider {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method returns and empties the current set of streams
|
* Method returns and empties the current set of streams
|
||||||
* @return list of deleted Streams
|
* @return set of deleted Streams
|
||||||
*/
|
*/
|
||||||
public Set<StreamIdentifier> purgeAllDeletedStream() {
|
public Set<StreamIdentifier> purgeAllDeletedStream() {
|
||||||
final Set<StreamIdentifier> response = new HashSet<>(deletedStreams);
|
final Set<StreamIdentifier> response = new HashSet<>(deletedStreams);
|
||||||
|
|
|
||||||
|
|
@ -217,7 +217,6 @@ public class Scheduler implements Runnable {
|
||||||
@NonNull final ProcessorConfig processorConfig,
|
@NonNull final ProcessorConfig processorConfig,
|
||||||
@NonNull final RetrievalConfig retrievalConfig,
|
@NonNull final RetrievalConfig retrievalConfig,
|
||||||
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
|
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
|
||||||
log.info("Scheduler invoked for version 2.4.6, V1");
|
|
||||||
this.checkpointConfig = checkpointConfig;
|
this.checkpointConfig = checkpointConfig;
|
||||||
this.coordinatorConfig = coordinatorConfig;
|
this.coordinatorConfig = coordinatorConfig;
|
||||||
this.leaseManagementConfig = leaseManagementConfig;
|
this.leaseManagementConfig = leaseManagementConfig;
|
||||||
|
|
@ -563,7 +562,12 @@ public class Scheduler implements Runnable {
|
||||||
// These are the streams which are deleted in Kinesis and we encounter resource not found during
|
// These are the streams which are deleted in Kinesis and we encounter resource not found during
|
||||||
// shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
|
// shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
|
||||||
// not have any data.
|
// not have any data.
|
||||||
final Set<StreamIdentifier> deletedStreamSet = this.deletedStreamListProvider.purgeAllDeletedStream();
|
// Filter streams based on newStreamConfigMap so that we don't override input to KCL in any case.
|
||||||
|
final Set<StreamIdentifier> deletedStreamSet = this.deletedStreamListProvider
|
||||||
|
.purgeAllDeletedStream()
|
||||||
|
.stream()
|
||||||
|
.filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
if (deletedStreamSet.size() > 0) {
|
if (deletedStreamSet.size() > 0) {
|
||||||
log.info("Stale streams to delete: {}", deletedStreamSet);
|
log.info("Stale streams to delete: {}", deletedStreamSet);
|
||||||
staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
|
staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
|
||||||
|
|
|
||||||
|
|
@ -505,15 +505,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
|
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
|
||||||
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
|
return createShardSyncTaskManager(metricsFactory, streamConfig, null);
|
||||||
this.createLeaseRefresher(),
|
|
||||||
streamConfig.initialPositionInStreamExtended(),
|
|
||||||
cleanupLeasesUponShardCompletion,
|
|
||||||
ignoreUnexpectedChildShards,
|
|
||||||
shardSyncIntervalMillis,
|
|
||||||
executorService,
|
|
||||||
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
|
|
||||||
metricsFactory);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -521,6 +513,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
*
|
*
|
||||||
* @param metricsFactory - factory to get metrics object
|
* @param metricsFactory - factory to get metrics object
|
||||||
* @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created
|
* @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created
|
||||||
|
* @param deletedStreamListProvider - store for capturing the streams which are deleted in Kinesis
|
||||||
* @return ShardSyncTaskManager
|
* @return ShardSyncTaskManager
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -783,37 +783,73 @@ public class SchedulerTest {
|
||||||
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
|
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
|
||||||
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4);
|
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4);
|
||||||
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||||
|
|
||||||
|
prepareForStaleDeletedStreamCleanupTests();
|
||||||
|
|
||||||
|
// When KCL starts, it begins by tracking 5 streams
|
||||||
|
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
assertEquals(0, scheduler.staleStreamDeletionMap().size());
|
||||||
|
|
||||||
|
// 2 streams no longer need to be consumed
|
||||||
|
Set<StreamIdentifier> syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
assertEquals(createDummyStreamConfigList(4, 6).stream()
|
||||||
|
.map(StreamConfig::streamIdentifier)
|
||||||
|
.collect(Collectors.toSet()), scheduler.staleStreamDeletionMap()
|
||||||
|
.keySet());
|
||||||
|
assertEquals(0, syncedStreams1.size());
|
||||||
|
|
||||||
|
StreamConfig deletedStreamConfig = createDummyStreamConfig(5);
|
||||||
|
// One stream is deleted from Kinesis side
|
||||||
|
scheduler.deletedStreamListProvider().add(deletedStreamConfig.streamIdentifier());
|
||||||
|
|
||||||
|
Set<StreamIdentifier> syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
|
||||||
|
Set<StreamConfig> expectedCurrentStreamConfigs = Sets.newHashSet(streamConfigList1);
|
||||||
|
expectedCurrentStreamConfigs.remove(deletedStreamConfig);
|
||||||
|
|
||||||
|
// Assert that the stream deleted in Kinesis is cleaned up from KCL's in-memory state.
|
||||||
|
assertEquals(expectedCurrentStreamConfigs, Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
assertEquals(Sets.newHashSet(createDummyStreamConfig(4).streamIdentifier()),
|
||||||
|
Sets.newHashSet(scheduler.staleStreamDeletionMap().keySet()));
|
||||||
|
assertEquals(1, syncedStreams2.size());
|
||||||
|
assertEquals(0, scheduler.deletedStreamListProvider().purgeAllDeletedStream().size());
|
||||||
|
|
||||||
|
verify(multiStreamTracker, times(3)).streamConfigList();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void prepareForStaleDeletedStreamCleanupTests() {
|
||||||
|
|
||||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
|
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
|
||||||
@Override public Duration waitPeriodToDeleteFormerStreams() {
|
@Override public Duration waitPeriodToDeleteFormerStreams() {
|
||||||
return Duration.ofDays(1);
|
return Duration.ofDays(1);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||||
.retrievalFactory(retrievalFactory);
|
.retrievalFactory(retrievalFactory);
|
||||||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
metricsConfig, processorConfig, retrievalConfig));
|
metricsConfig, processorConfig, retrievalConfig));
|
||||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||||
|
|
||||||
// when KCL starts it starts with tracking 5 stream
|
|
||||||
assertEquals(5, scheduler.currentStreamConfigMap().size());
|
|
||||||
assertEquals(0, scheduler.staleStreamDeletionMap().size());
|
|
||||||
|
|
||||||
// 2 Streams are no longer needed to be consumed
|
|
||||||
Set<StreamIdentifier> syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases();
|
|
||||||
assertEquals(5, scheduler.currentStreamConfigMap().size());
|
|
||||||
assertEquals(2, scheduler.staleStreamDeletionMap().size());
|
|
||||||
|
|
||||||
// One the stream is deleted from Kinesis side
|
|
||||||
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(5).streamIdentifier());
|
|
||||||
|
|
||||||
Set<StreamIdentifier> syncedStreams3 = scheduler.checkAndSyncStreamShardsAndLeases();
|
|
||||||
//assert kinesis deleted stream is cleaned up from KCL in memory state.
|
|
||||||
assertEquals(4, scheduler.currentStreamConfigMap().size());
|
|
||||||
assertEquals(1, scheduler.staleStreamDeletionMap().size());
|
|
||||||
|
|
||||||
verify(multiStreamTracker, times(3)).streamConfigList();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
// Test validates that no stream cleanup is done if it's still tracked in multiStreamTracker
|
||||||
|
@Test
|
||||||
|
public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream()
|
||||||
|
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
||||||
|
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
|
||||||
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1);
|
||||||
|
prepareForStaleDeletedStreamCleanupTests();
|
||||||
|
|
||||||
|
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier());
|
||||||
|
|
||||||
|
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
|
||||||
|
assertEquals(0, syncedStreams.size());
|
||||||
|
assertEquals(0, scheduler.staleStreamDeletionMap().size());
|
||||||
|
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||||
|
}
|
||||||
|
|
||||||
// Creates a list of (upperBound - lowerBound) dummy StreamConfig objects
|
// Creates a list of (upperBound - lowerBound) dummy StreamConfig objects
|
||||||
private List<StreamConfig> createDummyStreamConfigList(int lowerBound, int upperBound) {
|
private List<StreamConfig> createDummyStreamConfigList(int lowerBound, int upperBound) {
|
||||||
return IntStream.range(lowerBound, upperBound).mapToObj(this::createDummyStreamConfig)
|
return IntStream.range(lowerBound, upperBound).mapToObj(this::createDummyStreamConfig)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue