diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 743be28a..e95ddb6f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -224,6 +224,7 @@ public class Scheduler implements Runnable { this.formerStreamsLeasesDeletionStrategy = streamTracker.formerStreamsLeasesDeletionStrategy(); streamTracker.streamConfigList().forEach( sc -> currentStreamConfigMap.put(sc.streamIdentifier(), sc)); + log.info("Initial state: {}", currentStreamConfigMap.values()); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); @@ -449,18 +450,15 @@ public class Scheduler implements Runnable { final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); try { - final Map newStreamConfigMap = streamTracker.streamConfigList() - .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); - - List leases; - // This is done to ensure that we clean up the stale streams lingering in the lease table. if (!leasesSyncedOnAppInit && isMultiStreamMode) { - leases = fetchMultiStreamLeases(); + final List leases = fetchMultiStreamLeases(); syncStreamsFromLeaseTableOnAppInit(leases); leasesSyncedOnAppInit = true; } + final Map newStreamConfigMap = streamTracker.streamConfigList() + .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); // For new streams discovered, do a shard sync and update the currentStreamConfigMap for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { @@ -471,9 +469,7 @@ public class Scheduler implements Runnable { currentStreamConfigMap.put(streamIdentifier, streamConfig); streamsSynced.add(streamIdentifier); } else { - if (log.isDebugEnabled()) { - log.debug(streamIdentifier + " is already being processed - skipping shard sync."); - } + log.debug("{} is already being processed - skipping shard sync.", streamIdentifier); } } @@ -536,7 +532,7 @@ public class Scheduler implements Runnable { // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and // the streamIdentifiersForLeaseCleanup are not present in the latest snapshot. final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors - .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet())); + .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet())); final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); final Set deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted); @@ -572,14 +568,14 @@ public class Scheduler implements Runnable { } @VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List leases) { - final Set streamIdentifiers = leases.stream() + leases.stream() .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) - .collect(Collectors.toSet()); - for (StreamIdentifier streamIdentifier : streamIdentifiers) { - if (!currentStreamConfigMap.containsKey(streamIdentifier)) { - currentStreamConfigMap.put(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); - } - } + .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier)) + .forEach(streamIdentifier -> { + final StreamConfig streamConfig = streamTracker.createStreamConfig(streamIdentifier); + currentStreamConfigMap.put(streamIdentifier, streamConfig); + log.info("Cached {}", streamConfig); + }); } private List fetchMultiStreamLeases() @@ -897,6 +893,7 @@ public class Scheduler implements Runnable { StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); if (streamConfig == null) { streamConfig = streamTracker.createStreamConfig(streamIdentifier); + log.info("Created orphan {}", streamConfig); } Validate.notNull(streamConfig, "StreamConfig should not be null"); RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); @@ -993,7 +990,7 @@ public class Scheduler implements Runnable { @NoArgsConstructor(access = AccessLevel.PRIVATE) private static class SchedulerLog { - private long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1); + private final long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1); private long nextReportTime = System.currentTimeMillis() + reportIntervalMillis; private boolean infoReporting; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index aa9f8412..90b63477 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.RejectedExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -317,7 +318,7 @@ public class SchedulerTest { } @Test - public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException { + public final void testMultiStreamInitialization() { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, @@ -325,9 +326,9 @@ public class SchedulerTest { scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); scheduler.initialize(); - shardDetectorMap.values().stream() + shardDetectorMap.values() .forEach(shardDetector -> verify(shardDetector, times(1)).listShards()); - shardSyncTaskManagerMap.values().stream() + shardSyncTaskManagerMap.values() .forEach(shardSyncTM -> verify(shardSyncTM, times(1)).hierarchicalShardSyncer()); } @@ -343,17 +344,16 @@ public class SchedulerTest { // Note : As of today we retry for all streams in the next attempt. Hence the retry for each stream will vary. // At the least we expect 2 retries for each stream. Since there are 4 streams, we expect at most // the number of calls to be 5. - shardDetectorMap.values().stream() - .forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards()); - shardDetectorMap.values().stream() - .forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards()); - shardSyncTaskManagerMap.values().stream() - .forEach(shardSyncTM -> verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer()); - shardSyncTaskManagerMap.values().stream() - .forEach(shardSyncTM -> verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer()); + shardDetectorMap.values().forEach(shardDetector -> { + verify(shardDetector, atLeast(2)).listShards(); + verify(shardDetector, atMost(5)).listShards(); + }); + shardSyncTaskManagerMap.values().forEach(shardSyncTM -> { + verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer(); + verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer(); + }); } - @Test public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException { final String shardId = "shardId-000000000000"; @@ -385,13 +385,12 @@ public class SchedulerTest { schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - initialShardInfo.stream().forEach( + initialShardInfo.forEach( shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager))); - firstShardInfo.stream().forEach( + firstShardInfo.forEach( shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); - secondShardInfo.stream().forEach( + secondShardInfo.forEach( shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); - } @Test @@ -415,7 +414,7 @@ public class SchedulerTest { when(scheduler.shouldSyncStreamsNow()).thenReturn(true); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty()); - Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(new HashSet<>(streamConfigList1), new HashSet<>(scheduler.currentStreamConfigMap().values())); } @Test @@ -447,8 +446,7 @@ public class SchedulerTest { } @Test - public final void testMultiStreamSyncFromTableDefaultInitPos() - throws DependencyException, ProvisionedThroughputException, InvalidStateException { + public final void testMultiStreamSyncFromTableDefaultInitPos() { // Streams in lease table but not tracked by multiStreamTracker List leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease() .streamIdentifier( @@ -474,13 +472,12 @@ public class SchedulerTest { metricsConfig, processorConfig, retrievalConfig); scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( - sc -> sc.streamIdentifier(), sc -> sc)); + StreamConfig::streamIdentifier, Function.identity())); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); } @Test - public final void testMultiStreamSyncFromTableCustomInitPos() - throws DependencyException, ProvisionedThroughputException, InvalidStateException { + public final void testMultiStreamSyncFromTableCustomInitPos() { Date testTimeStamp = new Date(); // Streams in lease table but not tracked by multiStreamTracker @@ -725,7 +722,8 @@ public class SchedulerTest { testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false); } - private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, + private void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately( + boolean expectPendingStreamsForDeletion, boolean onlyStreamsNoLeasesDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(