Added more logging in Scheduler w.r.t. StreamConfigs. (#1057)

This commit is contained in:
stair 2023-03-06 17:05:34 -05:00 committed by GitHub
parent cd80c93966
commit 87dc586e6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 41 deletions

View file

@ -224,6 +224,7 @@ public class Scheduler implements Runnable {
this.formerStreamsLeasesDeletionStrategy = streamTracker.formerStreamsLeasesDeletionStrategy(); this.formerStreamsLeasesDeletionStrategy = streamTracker.formerStreamsLeasesDeletionStrategy();
streamTracker.streamConfigList().forEach( streamTracker.streamConfigList().forEach(
sc -> currentStreamConfigMap.put(sc.streamIdentifier(), sc)); sc -> currentStreamConfigMap.put(sc.streamIdentifier(), sc));
log.info("Initial state: {}", currentStreamConfigMap.values());
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory(); this.metricsFactory = this.metricsConfig.metricsFactory();
@ -449,18 +450,15 @@ public class Scheduler implements Runnable {
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
try { try {
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
List<MultiStreamLease> leases;
// This is done to ensure that we clean up the stale streams lingering in the lease table. // This is done to ensure that we clean up the stale streams lingering in the lease table.
if (!leasesSyncedOnAppInit && isMultiStreamMode) { if (!leasesSyncedOnAppInit && isMultiStreamMode) {
leases = fetchMultiStreamLeases(); final List<MultiStreamLease> leases = fetchMultiStreamLeases();
syncStreamsFromLeaseTableOnAppInit(leases); syncStreamsFromLeaseTableOnAppInit(leases);
leasesSyncedOnAppInit = true; leasesSyncedOnAppInit = true;
} }
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
// For new streams discovered, do a shard sync and update the currentStreamConfigMap // For new streams discovered, do a shard sync and update the currentStreamConfigMap
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
@ -471,9 +469,7 @@ public class Scheduler implements Runnable {
currentStreamConfigMap.put(streamIdentifier, streamConfig); currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} else { } else {
if (log.isDebugEnabled()) { log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
log.debug(streamIdentifier + " is already being processed - skipping shard sync.");
}
} }
} }
@ -536,7 +532,7 @@ public class Scheduler implements Runnable {
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot. // the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet())); .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted); final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
@ -572,14 +568,14 @@ public class Scheduler implements Runnable {
} }
@VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) { @VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
final Set<StreamIdentifier> streamIdentifiers = leases.stream() leases.stream()
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
.collect(Collectors.toSet()); .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier))
for (StreamIdentifier streamIdentifier : streamIdentifiers) { .forEach(streamIdentifier -> {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { final StreamConfig streamConfig = streamTracker.createStreamConfig(streamIdentifier);
currentStreamConfigMap.put(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); currentStreamConfigMap.put(streamIdentifier, streamConfig);
} log.info("Cached {}", streamConfig);
} });
} }
private List<MultiStreamLease> fetchMultiStreamLeases() private List<MultiStreamLease> fetchMultiStreamLeases()
@ -897,6 +893,7 @@ public class Scheduler implements Runnable {
StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
if (streamConfig == null) { if (streamConfig == null) {
streamConfig = streamTracker.createStreamConfig(streamIdentifier); streamConfig = streamTracker.createStreamConfig(streamIdentifier);
log.info("Created orphan {}", streamConfig);
} }
Validate.notNull(streamConfig, "StreamConfig should not be null"); Validate.notNull(streamConfig, "StreamConfig should not be null");
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
@ -993,7 +990,7 @@ public class Scheduler implements Runnable {
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
private static class SchedulerLog { private static class SchedulerLog {
private long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1); private final long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1);
private long nextReportTime = System.currentTimeMillis() + reportIntervalMillis; private long nextReportTime = System.currentTimeMillis() + reportIntervalMillis;
private boolean infoReporting; private boolean infoReporting;

View file

@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -317,7 +318,7 @@ public class SchedulerTest {
} }
@Test @Test
public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException { public final void testMultiStreamInitialization() {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
@ -325,9 +326,9 @@ public class SchedulerTest {
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize(); scheduler.initialize();
shardDetectorMap.values().stream() shardDetectorMap.values()
.forEach(shardDetector -> verify(shardDetector, times(1)).listShards()); .forEach(shardDetector -> verify(shardDetector, times(1)).listShards());
shardSyncTaskManagerMap.values().stream() shardSyncTaskManagerMap.values()
.forEach(shardSyncTM -> verify(shardSyncTM, times(1)).hierarchicalShardSyncer()); .forEach(shardSyncTM -> verify(shardSyncTM, times(1)).hierarchicalShardSyncer());
} }
@ -343,17 +344,16 @@ public class SchedulerTest {
// Note : As of today we retry for all streams in the next attempt. Hence the retry for each stream will vary. // Note : As of today we retry for all streams in the next attempt. Hence the retry for each stream will vary.
// At the least we expect 2 retries for each stream. Since there are 4 streams, we expect at most // At the least we expect 2 retries for each stream. Since there are 4 streams, we expect at most
// the number of calls to be 5. // the number of calls to be 5.
shardDetectorMap.values().stream() shardDetectorMap.values().forEach(shardDetector -> {
.forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards()); verify(shardDetector, atLeast(2)).listShards();
shardDetectorMap.values().stream() verify(shardDetector, atMost(5)).listShards();
.forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards()); });
shardSyncTaskManagerMap.values().stream() shardSyncTaskManagerMap.values().forEach(shardSyncTM -> {
.forEach(shardSyncTM -> verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer()); verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer();
shardSyncTaskManagerMap.values().stream() verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer();
.forEach(shardSyncTM -> verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer()); });
} }
@Test @Test
public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException { public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException {
final String shardId = "shardId-000000000000"; final String shardId = "shardId-000000000000";
@ -385,13 +385,12 @@ public class SchedulerTest {
schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop();
schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop();
initialShardInfo.stream().forEach( initialShardInfo.forEach(
shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager))); shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager)));
firstShardInfo.stream().forEach( firstShardInfo.forEach(
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
secondShardInfo.stream().forEach( secondShardInfo.forEach(
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
} }
@Test @Test
@ -415,7 +414,7 @@ public class SchedulerTest {
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty()); Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty());
Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values())); assertEquals(new HashSet<>(streamConfigList1), new HashSet<>(scheduler.currentStreamConfigMap().values()));
} }
@Test @Test
@ -447,8 +446,7 @@ public class SchedulerTest {
} }
@Test @Test
public final void testMultiStreamSyncFromTableDefaultInitPos() public final void testMultiStreamSyncFromTableDefaultInitPos() {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
// Streams in lease table but not tracked by multiStreamTracker // Streams in lease table but not tracked by multiStreamTracker
List<MultiStreamLease> leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease() List<MultiStreamLease> leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease()
.streamIdentifier( .streamIdentifier(
@ -474,13 +472,12 @@ public class SchedulerTest {
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);
Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap(
sc -> sc.streamIdentifier(), sc -> sc)); StreamConfig::streamIdentifier, Function.identity()));
Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
} }
@Test @Test
public final void testMultiStreamSyncFromTableCustomInitPos() public final void testMultiStreamSyncFromTableCustomInitPos() {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
Date testTimeStamp = new Date(); Date testTimeStamp = new Date();
// Streams in lease table but not tracked by multiStreamTracker // Streams in lease table but not tracked by multiStreamTracker
@ -725,7 +722,8 @@ public class SchedulerTest {
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false); testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false);
} }
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, private void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(
boolean expectPendingStreamsForDeletion,
boolean onlyStreamsNoLeasesDeletion) boolean onlyStreamsNoLeasesDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(