From c00c943a79de68abcf630bc83afa07dfa0d44879 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 20 Mar 2020 17:19:04 -0700 Subject: [PATCH 1/4] MultiStream Sync and logging changes --- .../kinesis/common/DiagnosticUtils.java | 8 +- .../amazon/kinesis/common/StreamConfig.java | 1 + .../amazon/kinesis/coordinator/Scheduler.java | 133 +++++++++++++++--- .../leases/HierarchicalShardSyncer.java | 65 +++++---- .../lifecycle/BlockOnParentShardTask.java | 13 +- .../amazon/kinesis/lifecycle/ProcessTask.java | 27 ++-- .../kinesis/lifecycle/ShardConsumer.java | 22 +-- .../lifecycle/ShardConsumerSubscriber.java | 16 ++- .../kinesis/lifecycle/ShutdownTask.java | 14 +- .../kinesis/retrieval/RetrievalConfig.java | 3 +- .../fanout/FanOutRecordsPublisher.java | 126 +++++++++-------- .../retrieval/polling/KinesisDataFetcher.java | 14 +- .../polling/PrefetchRecordsPublisher.java | 41 +++--- .../leases/HierarchicalShardSyncerTest.java | 39 +++++ 14 files changed, 352 insertions(+), 170 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java index db0393e1..37eea1a3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java @@ -28,11 +28,11 @@ public class DiagnosticUtils { /** * Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action. 
- * @param shardId of the shard that is having delayed delivery + * @param resourceIdentifier of the shard that is having delayed delivery * @param enqueueTimestamp of the event submitted to the executor service * @param log Slf4j Logger from RecordPublisher to log the events */ - public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) { + public static void takeDelayedDeliveryActionIfRequired(String resourceIdentifier, Instant enqueueTimestamp, Logger log) { final long durationBetweenEnqueueAndAckInMillis = Duration .between(enqueueTimestamp, Instant.now()).toMillis(); if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) { @@ -41,9 +41,9 @@ public class DiagnosticUtils { "{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs" + " to see the state of the executor service. Also check if the RecordProcessor's processing " + "time is high. ", - shardId, durationBetweenEnqueueAndAckInMillis); + resourceIdentifier, durationBetweenEnqueueAndAckInMillis); } else if (log.isDebugEnabled()) { - log.debug("{}: Record delivery time to shard consumer is {} millis", shardId, + log.debug("{}: Record delivery time to shard consumer is {} millis", resourceIdentifier, durationBetweenEnqueueAndAckInMillis); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 999182b6..3cf0eeb2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -6,6 +6,7 @@ import lombok.experimental.Accessors; @Value @Accessors(fluent = true) public class StreamConfig { + // TODO: Consider having streamIdentifier as the unique identifier of this class. 
StreamIdentifier streamIdentifier; InitialPositionInStreamExtended initialPositionInStreamExtended; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 48467dfb..c8354eff 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -35,6 +36,7 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; @@ -45,6 +47,8 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; @@ -52,6 +56,7 @@ import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; @@ -61,7 +66,10 @@ import 
software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumerArgument; @@ -74,6 +82,7 @@ import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShutdownNotificationAware; @@ -89,6 +98,8 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; @Slf4j public class Scheduler implements Runnable { + private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 10000L; + private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -111,8 +122,8 @@ public class Scheduler implements Runnable { private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; - private final Function shardSyncTaskManagerProvider; - private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); + private final Function shardSyncTaskManagerProvider; + 
private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -122,11 +133,13 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final boolean isMultiStreamMode; + // TODO : halo : make sure we generate streamConfig if entry not present. private final Map currentStreamConfigMap; + private final MultiStreamTracker multiStreamTracker; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; - private final Function shardDetectorProvider; + private final Function shardDetectorProvider; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; @@ -142,6 +155,9 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); + private Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); + private boolean leasesSyncedOnAppInit = false; + /** * Used to ensure that only one requestedShutdown is in progress at a time. */ @@ -190,6 +206,9 @@ public class Scheduler implements Runnable { .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)), streamConfig -> Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig)); + this.multiStreamTracker = this.retrievalConfig.appStreamTracker().map( + multiStreamTracker -> multiStreamTracker, + streamConfig -> null); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); // Determine leaseSerializer based on availability of MultiStreamTracker. 
@@ -217,9 +236,9 @@ public class Scheduler implements Runnable { this.diagnosticEventHandler = new DiagnosticEventLogger(); // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName. // TODO : Pass the immutable map here instead of using mst.streamConfigList() - this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig + this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig .leaseManagementFactory(leaseSerializer, isMultiStreamMode) - .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier)); + .createShardSyncTaskManager(this.metricsFactory, streamConfig); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -242,7 +261,7 @@ public class Scheduler implements Runnable { // this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); - this.shardDetectorProvider = streamIdentifier -> createOrGetShardSyncTaskManager(streamIdentifier).shardDetector(); + this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); // TODO : Halo : Check if this needs to be per stream. 
@@ -298,7 +317,7 @@ public class Scheduler implements Runnable { final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); log.info("Syncing Kinesis shard info for " + streamIdentifier); final StreamConfig streamConfig = streamConfigEntry.getValue(); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier), + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamConfig), leaseRefresher, streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, metricsFactory); @@ -321,6 +340,7 @@ } else { log.info("LeaseCoordinator is already running. No need to start it."); } + streamSyncWatch.start(); isDone = true; } catch (LeasingException e) { log.error("Caught exception when initializing LeaseCoordinator", e); @@ -364,14 +384,20 @@ for (ShardInfo completedShard : completedShards) { final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); - if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) { - log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); + final StreamConfig streamConfig = currentStreamConfigMap + .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) { + log.info("{} : Found completed shard, initiated new ShardSyncTask for {} ", + streamIdentifier.serialize(), completedShard.toString()); } } // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); + // check for new streams and sync with the scheduler state + checkAndSyncStreamShardsAndLeases(); + logExecutorState(); slog.info("Sleeping ..."); Thread.sleep(shardConsumerDispatchPollIntervalMillis); @@ -387,6 +413,77 @@ public class
Scheduler implements Runnable { slog.resetInfoLogging(); } + + /** + * Note: This method has package level access solely for testing purposes. + * Sync all streams method. + * @return streams that are being synced by this worker + */ + private Set checkAndSyncStreamShardsAndLeases() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final Set streamsSynced = new HashSet<>(); + + if (isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS)) { + final Map newStreamConfigMap = new HashMap<>(); + // Making an immutable copy + newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); + + // This is done to ensure that we clean up the stale streams lingering in the lease table. + syncStreamsFromLeaseTableOnAppInit(); + + for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.syncShardAndLeaseInfo(); + currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); + streamsSynced.add(streamIdentifier); + } else { + if (log.isDebugEnabled()) { + log.debug(streamIdentifier + " is already being processed - skipping shard sync."); + } + } + } + + // TODO: Remove assumption that each Worker gets the full list of streams + Iterator currentStreamConfigIter = currentStreamConfigMap.keySet().iterator(); + while (currentStreamConfigIter.hasNext()) { + StreamIdentifier streamIdentifier = currentStreamConfigIter.next(); + if (!newStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Found old/deleted stream: " + streamIdentifier + ". 
Syncing shards of that stream."); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.syncShardAndLeaseInfo(); + currentStreamConfigIter.remove(); + streamsSynced.add(streamIdentifier); + } + } + streamSyncWatch.reset().start(); + } + return streamsSynced; + } + + private Set syncStreamsFromLeaseTableOnAppInit() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + if (!leasesSyncedOnAppInit && isMultiStreamMode) { + final Set streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream() + .map(lease -> StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())) + .collect(Collectors.toSet()); + for (StreamIdentifier streamIdentifier : streamIdentifiers) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + } + } + leasesSyncedOnAppInit = true; + } + return Collections.emptySet(); + } + + // When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end. + private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) { + return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + } + /** * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. 
@@ -588,7 +685,8 @@ public class Scheduler implements Runnable { if (!firstItem) { builder.append(", "); } - builder.append(shardInfo.shardId()); + builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId())); firstItem = false; } slog.info("Current stream shard assignments: " + builder.toString()); @@ -624,8 +722,8 @@ public class Scheduler implements Runnable { return consumer; } - private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamIdentifier streamIdentifier) { - return streamToShardSyncTaskManagerMap.computeIfAbsent(streamIdentifier, s -> shardSyncTaskManagerProvider.apply(s)); + private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamConfig streamConfig) { + return streamToShardSyncTaskManagerMap.computeIfAbsent(streamConfig, s -> shardSyncTaskManagerProvider.apply(s)); } protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, @@ -637,8 +735,10 @@ public class Scheduler implements Runnable { // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); // Irrespective of single stream app or multi stream app, streamConfig should always be available. - final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); - Validate.notNull(streamConfig, "StreamConfig should not be empty"); + // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config + // to gracefully complete the reading. 
+ final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + Validate.notNull(streamConfig, "StreamConfig should not be null"); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, streamConfig.streamIdentifier(), leaseCoordinator, @@ -657,7 +757,7 @@ public class Scheduler implements Runnable { streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, - shardDetectorProvider.apply(streamConfig.streamIdentifier()), + shardDetectorProvider.apply(streamConfig), aggregatorUtil, hierarchicalShardSyncer, metricsFactory); @@ -682,7 +782,8 @@ public class Scheduler implements Runnable { ShardConsumer consumer = shardInfoShardConsumerMap.get(shard); if (consumer.leaseLost()) { shardInfoShardConsumerMap.remove(shard); - log.debug("Removed consumer for {} as lease has been lost", shard.shardId()); + log.debug("Removed consumer for {} as lease has been lost", + shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId())); } else { consumer.executeLifecycle(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index f4143581..6aaa81c4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; @@ -64,6 +65,8 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; + private String streamIdentifier = ""; + public HierarchicalShardSyncer() { 
isMultiStreamMode = false; } @@ -99,6 +102,7 @@ public class HierarchicalShardSyncer { final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + this.streamIdentifier = shardDetector.streamIdentifier().serialize(); final List latestShards = getShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); @@ -110,8 +114,9 @@ public class HierarchicalShardSyncer { final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + this.streamIdentifier = shardDetector.streamIdentifier().serialize(); if (!CollectionUtils.isNullOrEmpty(latestShards)) { - log.debug("Num shards: {}", latestShards.size()); + log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); @@ -127,7 +132,7 @@ public class HierarchicalShardSyncer { final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs); - log.debug("Num new leases to create: {}", newLeasesToCreate.size()); + log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); boolean success = false; @@ -218,7 +223,7 @@ public class HierarchicalShardSyncer { for (String shardId : shardIdsOfClosedShards) { final Shard shard = shardIdToShardMap.get(shardId); if (shard == null) { - log.info("Shard {} is not present 
in Kinesis anymore.", shardId); + log.info("{} : Shard {} is not present in Kinesis anymore.", streamIdentifier, shardId); continue; } @@ -360,25 +365,26 @@ public class HierarchicalShardSyncer { final MultiStreamArgs multiStreamArgs) { final Map shardIdToNewLeaseMap = new HashMap<>(); final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); - + final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier()) + .map(streamId -> streamId.serialize()).orElse(""); final Set shardIdsOfCurrentLeases = currentLeases.stream() - .peek(lease -> log.debug("Existing lease: {}", lease)) + .peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); - final List openShards = getOpenShards(shards); + final List openShards = getOpenShards(shards, streamIdentifier); final Map memoizationContext = new HashMap<>(); // Iterate over the open shards and find those that don't have any lease entries. for (Shard shard : openShards) { final String shardId = shard.shardId(); - log.debug("Evaluating leases for open shard {} and its ancestors.", shardId); + log.debug("{} : Evaluating leases for open shard {} and its ancestors.", streamIdentifier, shardId); if (shardIdsOfCurrentLeases.contains(shardId)) { - log.debug("Lease for shardId {} already exists. Not creating a lease", shardId); + log.debug("{} : Lease for shardId {} already exists. Not creating a lease", streamIdentifier, shardId); } else if (inconsistentShardIds.contains(shardId)) { - log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); + log.info("{} : shardId {} is an inconsistent child. 
Not creating a lease", streamIdentifier, shardId); } else { - log.debug("Need to create a lease for shardId {}", shardId); + log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, shardId); final Lease newLease = multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : newKCLLease(shard); @@ -415,7 +421,7 @@ public class HierarchicalShardSyncer { } else { newLease.checkpoint(convertToCheckpoint(initialPosition)); } - log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint()); + log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint()); shardIdToNewLeaseMap.put(shardId, newLease); } } @@ -464,7 +470,7 @@ public class HierarchicalShardSyncer { final Map shardIdToShardMapOfAllKinesisShards, final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext, final MultiStreamArgs multiStreamArgs) { - + final String streamIdentifier = getStreamIdentifier(multiStreamArgs); final Boolean previousValue = memoizationContext.get(shardId); if (previousValue != null) { return previousValue; @@ -489,9 +495,9 @@ public class HierarchicalShardSyncer { memoizationContext, multiStreamArgs)) { isDescendant = true; descendantParentShardIds.add(parentShardId); - log.debug("Parent shard {} is a descendant.", parentShardId); + log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId); } else { - log.debug("Parent shard {} is NOT a descendant.", parentShardId); + log.debug("{} : Parent shard {} is NOT a descendant.", streamIdentifier, parentShardId); } } @@ -499,7 +505,7 @@ public class HierarchicalShardSyncer { if (isDescendant) { for (String parentShardId : parentShardIds) { if (!shardIdsOfCurrentLeases.contains(parentShardId)) { - log.debug("Need to create a lease for shardId {}", parentShardId); + log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId); Lease lease = 
shardIdToLeaseMapOfNewShards.get(parentShardId); if (lease == null) { lease = multiStreamArgs.isMultiStreamMode() ? @@ -593,6 +599,7 @@ public class HierarchicalShardSyncer { final List trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { + final String streamIdentifier = getStreamIdentifier(multiStreamArgs); final Set kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); // Check if there are leases for non-existent shards @@ -600,14 +607,15 @@ public class HierarchicalShardSyncer { .filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList()); if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { - log.info("Found {} candidate leases for cleanup. Refreshing list of" - + " Kinesis shards to pick up recent/latest shards", garbageLeases.size()); + log.info("{} : Found {} candidate leases for cleanup. 
Refreshing list of" + + " Kinesis shards to pick up recent/latest shards", streamIdentifier, garbageLeases.size()); final Set currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId) .collect(Collectors.toSet()); for (Lease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) { - log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey()); + log.info("{} : Deleting lease for shard {} as it is not present in Kinesis stream.", + streamIdentifier, lease.leaseKey()); leaseRefresher.deleteLease(lease); } } @@ -627,14 +635,16 @@ public class HierarchicalShardSyncer { static boolean isCandidateForCleanup(final Lease lease, final Set currentKinesisShardIds, final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException { - boolean isCandidateForCleanup = true; + final String streamIdentifier = getStreamIdentifier(multiStreamArgs); + + boolean isCandidateForCleanup = true; final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs); if (currentKinesisShardIds.contains(shardId)) { isCandidateForCleanup = false; } else { - log.info("Found lease for non-existent shard: {}. Checking its parent shards", shardId); + log.info("{} : Found lease for non-existent shard: {}. 
Checking its parent shards", streamIdentifier, shardId); final Set parentShardIds = lease.parentShardIds(); for (String parentShardId : parentShardIds) { @@ -643,7 +653,7 @@ public class HierarchicalShardSyncer { if (currentKinesisShardIds.contains(parentShardId)) { final String message = String.format("Parent shard %s exists but not the child shard %s", parentShardId, shardId); - log.info(message); + log.info("{} : {}", streamIdentifier, message); throw new KinesisClientLibIOException(message); } } @@ -730,8 +740,8 @@ public class HierarchicalShardSyncer { } if (okayToDelete) { - log.info("Deleting lease for shard {} as it has been completely processed and processing of child " - + "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); + log.info("{} : Deleting lease for shard {} as it has been completely processed and processing of child " + + "shards has begun.", streamIdentifier, shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); leaseRefresher.deleteLease(leaseForClosedShard); } } @@ -794,9 +804,9 @@ public class HierarchicalShardSyncer { * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. 
*/ - static List getOpenShards(final List allShards) { + static List getOpenShards(final List allShards, final String streamIdentifier) { return allShards.stream().filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null) - .peek(shard -> log.debug("Found open shard: {}", shard.shardId())).collect(Collectors.toList()); + .peek(shard -> log.debug("{} : Found open shard: {}", streamIdentifier, shard.shardId())).collect(Collectors.toList()); } private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) { @@ -812,6 +822,11 @@ public class HierarchicalShardSyncer { return checkpoint; } + + private static String getStreamIdentifier(MultiStreamArgs multiStreamArgs) { + return Optional.ofNullable(multiStreamArgs.streamIdentifier()) + .map(streamId -> streamId.serialize()).orElse("single_stream_mode"); + } /** Helper class to compare leases based on starting sequence number of the corresponding shards. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 4ea4212e..37a092e8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -54,7 +54,8 @@ public class BlockOnParentShardTask implements ConsumerTask { @Override public TaskResult call() { Exception exception = null; - + final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId()); try { boolean blockedOnParentShard = false; for (String shardId : shardInfo.parentShardIds()) { @@ -62,20 +63,20 @@ public class BlockOnParentShardTask implements ConsumerTask { if (lease != null) { ExtendedSequenceNumber checkpoint = lease.checkpoint(); if ((checkpoint == null) || 
(!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) { - log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint); + log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardInfoId, checkpoint); blockedOnParentShard = true; exception = new BlockedOnParentShardException("Parent shard not yet done"); break; } else { - log.debug("Shard {} has been completely processed.", shardId); + log.debug("Shard {} has been completely processed.", shardInfoId); } } else { - log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId); + log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardInfoId); } } if (!blockedOnParentShard) { - log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfo.shardId()); + log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfoId); return new TaskResult(null); } } catch (Exception e) { @@ -85,7 +86,7 @@ public class BlockOnParentShardTask implements ConsumerTask { try { Thread.sleep(parentShardPollIntervalMillis); } catch (InterruptedException e) { - log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.shardId(), e); + log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfoId, e); } return new TaskResult(exception); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 6c223650..fd036c9f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -60,6 +60,7 @@ public class ProcessTask implements ConsumerTask { private final ProcessRecordsInput processRecordsInput; private final MetricsFactory metricsFactory; private final AggregatorUtil 
aggregatorUtil; + private final String shardInfoId; public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @@ -74,6 +75,8 @@ public class ProcessTask implements ConsumerTask { @NonNull AggregatorUtil aggregatorUtil, @NonNull MetricsFactory metricsFactory) { this.shardInfo = shardInfo; + this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId()); this.shardRecordProcessor = shardRecordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.backoffTimeMillis = backoffTimeMillis; @@ -121,7 +124,7 @@ public class ProcessTask implements ConsumerTask { } if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) { - log.info("Reached end of shard {} and have no records to process", shardInfo.shardId()); + log.info("Reached end of shard {} and have no records to process", shardInfoId); return new TaskResult(null, true); } @@ -142,13 +145,13 @@ public class ProcessTask implements ConsumerTask { } success = true; } catch (RuntimeException e) { - log.error("ShardId {}: Caught exception: ", shardInfo.shardId(), e); + log.error("ShardId {}: Caught exception: ", shardInfoId, e); exception = e; backoff(); } if (processRecordsInput.isAtShardEnd()) { - log.info("Reached end of shard {}, and processed {} records", shardInfo.shardId(), processRecordsInput.records().size()); + log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size()); return new TaskResult(null, true); } return new TaskResult(exception); @@ -174,7 +177,7 @@ public class ProcessTask implements ConsumerTask { try { Thread.sleep(this.backoffTimeMillis); } catch (InterruptedException ie) { - log.debug("{}: Sleep was interrupted", shardInfo.shardId(), ie); + log.debug("{}: Sleep was interrupted", shardInfoId, ie); } } @@ -188,7 +191,7 @@ public class ProcessTask implements ConsumerTask { */ private void 
callProcessRecords(ProcessRecordsInput input, List records) { log.debug("Calling application processRecords() with {} records from {}", records.size(), - shardInfo.shardId()); + shardInfoId); final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()) .checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); @@ -200,8 +203,8 @@ public class ProcessTask implements ConsumerTask { shardRecordProcessor.processRecords(processRecordsInput); } catch (Exception e) { log.error("ShardId {}: Application processRecords() threw an exception when processing shard ", - shardInfo.shardId(), e); - log.error("ShardId {}: Skipping over the following data records: {}", shardInfo.shardId(), records); + shardInfoId, e); + log.error("ShardId {}: Skipping over the following data records: {}", shardInfoId, records); } finally { MetricsUtil.addLatency(scope, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, startTime, MetricsLevel.SUMMARY); MetricsUtil.endScope(scope); @@ -226,17 +229,17 @@ public class ProcessTask implements ConsumerTask { * the time when the task started */ private void handleNoRecords(long startTimeMillis) { - log.debug("Kinesis didn't return any records for shard {}", shardInfo.shardId()); + log.debug("Kinesis didn't return any records for shard {}", shardInfoId); long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis); if (sleepTimeMillis > 0) { sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds); try { log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis, - shardInfo.shardId()); + shardInfoId); Thread.sleep(sleepTimeMillis); } catch (InterruptedException e) { - log.debug("ShardId {}: Sleep was interrupted", shardInfo.shardId()); + log.debug("ShardId {}: Sleep was interrupted", shardInfoId); } } } @@ -273,8 +276,8 @@ public class 
ProcessTask implements ConsumerTask { if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) { recordIterator.remove(); - log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", extendedSequenceNumber, - lastCheckpointValue); + log.debug("{} : removing record with ESN {} because the ESN is <= checkpoint ({})", shardInfoId, + extendedSequenceNumber, lastCheckpointValue); continue; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 99a680bf..e34f2ea4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -62,6 +62,7 @@ public class ShardConsumer { private final Function taskMetricsDecorator; private final int bufferSize; private final TaskExecutionListener taskExecutionListener; + private final String streamIdentifier; private ConsumerTask currentTask; private TaskOutcome taskOutcome; @@ -124,6 +125,7 @@ public class ShardConsumer { this.recordsPublisher = recordsPublisher; this.executorService = executorService; this.shardInfo = shardInfo; + this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode"); this.shardConsumerArgument = shardConsumerArgument; this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis; this.taskExecutionListener = taskExecutionListener; @@ -208,8 +210,8 @@ public class ShardConsumer { } Throwable dispatchFailure = subscriber.getAndResetDispatchFailure(); if (dispatchFailure != null) { - log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped", - dispatchFailure); + log.warn("{} : Exception occurred while dispatching incoming data. 
The incoming data has been skipped", + streamIdentifier, dispatchFailure); return dispatchFailure; } @@ -238,7 +240,7 @@ public class ShardConsumer { Instant now = Instant.now(); Duration timeSince = Duration.between(subscriber.lastDataArrival(), now); if (timeSince.toMillis() > value) { - log.warn("Last time data arrived: {} ({})", lastDataArrival, timeSince); + log.warn("{} : Last time data arrived: {} ({})", streamIdentifier, lastDataArrival, timeSince); } } }); @@ -250,11 +252,11 @@ public class ShardConsumer { if (taken != null) { String message = longRunningTaskMessage(taken); if (log.isDebugEnabled()) { - log.debug("{} Not submitting new task.", message); + log.debug("{} : {} Not submitting new task.", streamIdentifier, message); } logWarningForTaskAfterMillis.ifPresent(value -> { if (taken.toMillis() > value) { - log.warn(message); + log.warn("{} : {}", streamIdentifier, message); } }); } @@ -358,7 +360,7 @@ public class ShardConsumer { nextState = currentState.failureTransition(); break; default: - log.error("No handler for outcome of {}", outcome.name()); + log.error("{} : No handler for outcome of {}", streamIdentifier, outcome.name()); nextState = currentState.failureTransition(); break; } @@ -382,9 +384,9 @@ public class ShardConsumer { Exception taskException = taskResult.getException(); if (taskException instanceof BlockedOnParentShardException) { // No need to log the stack trace for this exception (it is very specific). 
- log.debug("Shard {} is blocked on completion of parent shard.", shardInfo.shardId()); + log.debug("{} : Shard {} is blocked on completion of parent shard.", streamIdentifier, shardInfo.shardId()); } else { - log.debug("Caught exception running {} task: ", currentTask.taskType(), taskResult.getException()); + log.debug("{} : Caught exception running {} task: ", streamIdentifier, currentTask.taskType(), taskResult.getException()); } } } @@ -411,10 +413,10 @@ public class ShardConsumer { * @return true if shutdown is complete (false if shutdown is still in progress) */ public boolean leaseLost() { - log.debug("Shutdown({}): Lease lost triggered.", shardInfo.shardId()); + log.debug("{} : Shutdown({}): Lease lost triggered.", streamIdentifier, shardInfo.shardId()); if (subscriber != null) { subscriber.cancel(); - log.debug("Shutdown({}): Subscriber cancelled.", shardInfo.shardId()); + log.debug("{} : Shutdown({}): Subscriber cancelled.", streamIdentifier, shardInfo.shardId()); } markForShutdown(ShutdownReason.LEASE_LOST); return isShutdown(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 4c05ac94..177c0f43 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -40,8 +40,8 @@ class ShardConsumerSubscriber implements Subscriber { private final int bufferSize; private final ShardConsumer shardConsumer; private final int readTimeoutsToIgnoreBeforeWarning; + private final String shardInfoId; private volatile int readTimeoutSinceLastRead = 0; - @VisibleForTesting final Object lockObject = new Object(); // This holds the last time an attempt of request to upstream service was made including the first try to @@ -70,6 +70,8 @@ class 
ShardConsumerSubscriber implements Subscriber { this.bufferSize = bufferSize; this.shardConsumer = shardConsumer; this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning; + this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt() + .map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId()); } @@ -107,7 +109,7 @@ class ShardConsumerSubscriber implements Subscriber { if (retrievalFailure != null) { synchronized (lockObject) { String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", - shardConsumer.shardInfo().shardId()); + shardInfoId); if (retrievalFailure instanceof RetryableRetrievalException) { log.debug(logMessage, retrievalFailure.getCause()); } else { @@ -130,7 +132,7 @@ class ShardConsumerSubscriber implements Subscriber { if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails()); + shardInfoId, lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails()); cancel(); // Start the subscription again which will update the lastRequestTime as well. @@ -157,7 +159,7 @@ class ShardConsumerSubscriber implements Subscriber { subscription); } catch (Throwable t) { - log.warn("{}: Caught exception from handleInput", shardConsumer.shardInfo().shardId(), t); + log.warn("{}: Caught exception from handleInput", shardInfoId, t); synchronized (lockObject) { dispatchFailure = t; } @@ -193,7 +195,7 @@ class ShardConsumerSubscriber implements Subscriber { log.warn( "{}: onError(). Cancelling subscription, and marking self as failed. 
KCL will " + "recreate the subscription as neccessary to continue processing.", - shardConsumer.shardInfo().shardId(), t); + shardInfoId, t); } protected void logOnErrorReadTimeoutWarning(Throwable t) { @@ -202,13 +204,13 @@ class ShardConsumerSubscriber implements Subscriber { + "are seeing this warning frequently consider increasing the SDK timeouts " + "by providing an OverrideConfiguration to the kinesis client. Alternatively you" + "can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress" - + "intermittant ReadTimeout warnings.", shardConsumer.shardInfo().shardId(), t); + + "intermittent ReadTimeout warnings.", shardInfoId, t); } @Override public void onComplete() { log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally", - shardConsumer.shardInfo().shardId()); + shardInfoId); } public void cancel() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 2bfcd358..81b954a6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -82,6 +82,8 @@ public class ShutdownTask implements ConsumerTask { private final TaskType taskType = TaskType.SHUTDOWN; + private final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId()); /* * Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc) @@ -112,7 +114,7 @@ public class ShutdownTask implements ConsumerTask { if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { localReason = ShutdownReason.LEASE_LOST; dropLease(); - log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoId); } } @@ -124,7 +126,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfo.shardId(), shardInfo.concurrencyToken(), localReason); + shardInfoId, shardInfo.concurrencyToken(), localReason); final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); @@ -135,7 +137,7 @@ public class ShutdownTask implements ConsumerTask { if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfo.shardId() + ". Application must checkpoint upon shard end. " + + + shardInfoId + ". Application must checkpoint upon shard end. 
" + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } else { @@ -143,7 +145,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Shutting down retrieval strategy."); recordsPublisher.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", shardInfo.shardId()); + log.debug("Record processor completed shutdown() for shard {}", shardInfoId); } catch (Exception e) { applicationException = true; throw e; @@ -152,11 +154,11 @@ public class ShutdownTask implements ConsumerTask { } if (localReason == ShutdownReason.SHARD_END) { - log.debug("Looking for child shards of shard {}", shardInfo.shardId()); + log.debug("Looking for child shards of shard {}", shardInfoId); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); - log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); + log.debug("Finished checking for child shards of shard {}", shardInfoId); } return new TaskResult(null); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 98046b6b..746fdc19 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -105,7 +105,7 @@ public class RetrievalConfig { this.applicationName = applicationName; } - public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { + public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { final StreamConfig[] streamConfig = new StreamConfig[1]; 
this.appStreamTracker.apply(multiStreamTracker -> { throw new IllegalArgumentException( @@ -113,6 +113,7 @@ public class RetrievalConfig { }, sc -> streamConfig[0] = sc); this.appStreamTracker = Either .right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended)); + return this; } public RetrievalFactory retrievalFactory() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 27cad136..c24a3803 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -51,7 +51,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Instant; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -60,7 +59,6 @@ import java.util.stream.Collectors; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; -@RequiredArgsConstructor @Slf4j @KinesisClientInternalApi public class FanOutRecordsPublisher implements RecordsPublisher { @@ -73,7 +71,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final KinesisAsyncClient kinesis; private final String shardId; private final String consumerArn; - + private final String streamAndShardId; private final Object lockObject = new Object(); private final AtomicInteger subscribeToShardId = new AtomicInteger(0); @@ -91,11 +89,25 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + public FanOutRecordsPublisher(KinesisAsyncClient kinesis, 
String shardId, String consumerArn) { + this.kinesis = kinesis; + this.shardId = shardId; + this.consumerArn = consumerArn; + this.streamAndShardId = shardId; + } + + public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn, String streamIdentifierSer) { + this.kinesis = kinesis; + this.shardId = shardId; + this.consumerArn = consumerArn; + this.streamAndShardId = streamIdentifierSer + ":" + shardId; + } + @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { synchronized (lockObject) { - log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", shardId, + log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", streamAndShardId, extendedSequenceNumber, initialPositionInStreamExtended); this.initialPositionInStreamExtended = initialPositionInStreamExtended; this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber(); @@ -174,7 +186,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // It is now safe to remove the element recordsDeliveryQueue.poll(); // Take action based on the time spent by the event in queue. - takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log); + takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log); // Update current sequence number for the successfully delivered event. currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber(); // Update the triggering flow for post scheduling upstream request. @@ -190,13 +202,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier() .equals(flow.getSubscribeToShardId())) { log.error( - "{}: Received unexpected ack for the active subscription {}. Throwing. 
", - shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + "{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); throw new IllegalStateException("Unexpected ack for the active subscription"); } // Otherwise publisher received a stale ack. else { - log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", shardId, + log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); } } @@ -219,10 +230,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (IllegalStateException e) { log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}", - shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); + streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); throw e; } catch (Throwable t) { - log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); + log.error("{}: Unable to deliver event to the shard consumer.", streamAndShardId, t); throw t; } } @@ -290,7 +301,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { String instanceId = shardId + "-" + subscribeInvocationId; log.debug( "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard", - shardId, connectionStart, instanceId); + streamAndShardId, connectionStart, instanceId); flow = new RecordFlow(this, connectionStart, instanceId); kinesis.subscribeToShard(request, flow); } @@ -303,12 +314,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if(hasValidFlow()) { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." 
+ - " Last successful request details -- {}", shardId, flow.connectionStartedAt, + " Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, lastSuccessfulRequestDetails); } else { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + - " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); + " Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails); } return; } @@ -320,8 +331,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null) { String logMessage = String.format( "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + - " Last successful request details -- %s", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); + " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -339,13 +349,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } flow.cancel(); } - log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); + log.debug("{}: availableQueueSpace zeroing from {}", streamAndShardId, availableQueueSpace); availableQueueSpace = 0; try { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable); + log.warn("{}: Exception while calling subscriber.onError. 
Last successful request details -- {}", + streamAndShardId, lastSuccessfulRequestDetails, innerThrowable); } subscriber = null; flow = null; @@ -353,7 +364,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (triggeringFlow != null) { log.debug( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, category.throwableTypeString); triggeringFlow.cancel(); } @@ -367,7 +378,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); + + "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails); recordsDeliveryQueue.clear(); } } @@ -383,7 +394,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (t.getCause() instanceof ResourceNotFoundException) { log.debug( "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", - shardId); + streamAndShardId); // The ack received for this onNext event will be ignored by the publisher as the global flow object should // be either null or renewed when the ack's flow identifier is evaluated. 
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( @@ -452,7 +463,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!hasValidSubscriber()) { log.debug( "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Subscriber is null.", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); triggeringFlow.cancel(); if (flow != null) { flow.cancel(); @@ -462,7 +473,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!isActiveFlow(triggeringFlow)) { log.debug( "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Received records for an inactive flow.", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } @@ -478,7 +489,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." 
+ - " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); + " Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails); errorOccurred(triggeringFlow, t); } } @@ -488,7 +499,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (availableQueueSpace <= 0) { log.debug( "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); } else { availableQueueSpace--; if (availableQueueSpace > 0) { @@ -503,12 +514,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void onComplete(RecordFlow triggeringFlow) { synchronized (lockObject) { - log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, + log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); triggeringFlow.cancel(); if (!hasValidSubscriber()) { - log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, + log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } @@ -516,15 +528,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!isActiveFlow(triggeringFlow)) { log.debug( "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. 
Ignoring.", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } if (currentSequenceNumber != null) { - log.debug("{}: Shard hasn't ended. Resubscribing.", shardId); + log.debug("{}: Shard hasn't ended. Resubscribing.", streamAndShardId); subscribeToShard(currentSequenceNumber); } else { - log.debug("{}: Shard has ended completing subscriber.", shardId); + log.debug("{}: Shard has ended completing subscriber.", streamAndShardId); subscriber.onComplete(); } } @@ -536,7 +548,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (subscriber != null) { log.error( "{}: A subscribe occurred while there was an active subscriber. Sending error to current subscriber", - shardId); + streamAndShardId); MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException(); // @@ -575,7 +587,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (subscriber != s) { log.warn( "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}", - shardId, n, lastSuccessfulRequestDetails); + streamAndShardId, n, lastSuccessfulRequestDetails); return; } if (flow == null) { @@ -584,7 +596,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // log.debug( "{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.", - shardId); + streamAndShardId); errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); return; } @@ -602,19 +614,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (subscriber != s) { log.warn( "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. 
Last successful request details -- {}", - shardId, lastSuccessfulRequestDetails); + streamAndShardId, lastSuccessfulRequestDetails); return; } if (!hasValidSubscriber()) { log.warn( "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}", - shardId, lastSuccessfulRequestDetails); + streamAndShardId, lastSuccessfulRequestDetails); } subscriber = null; if (flow != null) { log.debug( "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId); + streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); availableQueueSpace = 0; } @@ -703,12 +715,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void onEventStream(SdkPublisher publisher) { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- Subscribe", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); if (!parent.isActiveFlow(this)) { this.isDisposed = true; log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- parent is disposed", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); parent.rejectSubscription(publisher); return; } @@ -716,7 +728,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- creating record subscription", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); subscription = new RecordSubscription(parent, this, connectionStartedAt, subscribeToShardId); publisher.subscribe(subscription); @@ -727,7 +739,7 @@ public class FanOutRecordsPublisher implements 
RecordsPublisher { } catch (Throwable t) { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- throwable during record subscription: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage()); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage()); parent.errorOccurred(this, t); } } @@ -736,7 +748,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Request id - {}", - parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId()); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId()); final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString()); parent.setLastSuccessfulRequestDetails(requestDetails); @@ -759,12 +771,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); if (this.isDisposed) { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. 
{}: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); this.isErrorDispatched = true; } @@ -775,7 +787,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } else { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); } } @@ -802,7 +814,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (Exception e) { log.warn( "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}", - parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), + parent.streamAndShardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional()); } } @@ -810,7 +822,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void executeComplete() { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); if (isCancelled) { // @@ -820,13 +832,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // subscription, which was cancelled for a reason (usually queue overflow). // log.warn("{}: complete called on a cancelled subscription. 
Ignoring completion. Last successful request details -- {}", - parent.shardId, parent.lastSuccessfulRequestDetails); + parent.streamAndShardId, parent.lastSuccessfulRequestDetails); return; } if (this.isDisposed) { log.warn( "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); return; } @@ -844,7 +856,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (Throwable t) { log.error( "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t); } } } @@ -885,14 +897,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void cancel() { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- Cancel called", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); flow.isCancelled = true; if (subscription != null) { subscription.cancel(); } else { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- SDK subscription is null", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } } } @@ -906,21 +918,21 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow.isCancelled) { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe", - parent.shardId, 
connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } if (flow.isDisposed) { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow requires cancelling", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); cancel(); } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); if (parent.availableQueueSpace > 0) { request(1); } @@ -933,7 +945,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow.shouldSubscriptionCancel()) { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); cancel(); return; } @@ -948,7 +960,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void onError(Throwable t) { - log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId, + log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getClass().getName(), t.getMessage()); // @@ -961,7 +973,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void onComplete() { log.debug( 
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 1ea833a3..1605f941 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -63,7 +63,7 @@ public class KinesisDataFetcher { @NonNull private final KinesisAsyncClient kinesisClient; - @NonNull + @NonNull @Getter private final StreamIdentifier streamIdentifier; @NonNull private final String shardId; @@ -71,6 +71,7 @@ public class KinesisDataFetcher { @NonNull private final MetricsFactory metricsFactory; private final Duration maxFutureWait; + private final String streamAndShardId; @Deprecated public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { @@ -93,6 +94,7 @@ public class KinesisDataFetcher { this.maxRecords = maxRecords; this.metricsFactory = metricsFactory; this.maxFutureWait = maxFutureWait; + this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId; } /** Note: This method has package level access for testing purposes. 
@@ -120,7 +122,7 @@ public class KinesisDataFetcher { try { return new AdvancingResult(getRecords(nextIterator)); } catch (ResourceNotFoundException e) { - log.info("Caught ResourceNotFoundException when fetching records for shard {}", shardId); + log.info("Caught ResourceNotFoundException when fetching records for shard {}", streamAndShardId); return TERMINAL_RESULT; } } else { @@ -182,14 +184,14 @@ public class KinesisDataFetcher { */ public void initialize(final String initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { - log.info("Initializing shard {} with {}", shardId, initialCheckpoint); + log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint); advanceIteratorTo(initialCheckpoint, initialPositionInStream); isInitialized = true; } public void initialize(final ExtendedSequenceNumber initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { - log.info("Initializing shard {} with {}", shardId, initialCheckpoint.sequenceNumber()); + log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber()); advanceIteratorTo(initialCheckpoint.sequenceNumber(), initialPositionInStream); isInitialized = true; } @@ -234,7 +236,7 @@ public class KinesisDataFetcher { throw new RetryableRetrievalException(e.getMessage(), e); } } catch (ResourceNotFoundException e) { - log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e); + log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", streamAndShardId, e); nextIterator = null; } finally { MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getShardIterator"), @@ -285,7 +287,7 @@ public class KinesisDataFetcher { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { // TODO: Check behavior - log.debug("Interrupt called on metod, shutdown initiated"); + log.debug("{} : Interrupt called on method, 
shutdown initiated", streamAndShardId); throw new RuntimeException(e); } catch (TimeoutException e) { throw new RetryableRetrievalException(e.getMessage(), e); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index dcd5e043..ba8aa117 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -91,7 +91,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private boolean started = false; private final String operation; - private final String shardId; + private final String streamAndShardId; private Subscriber subscriber; @VisibleForTesting @Getter private final PublisherSession publisherSession; @@ -135,11 +135,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { // Handle records delivery ack and execute nextEventDispatchAction. // This method is not thread-safe and needs to be called after acquiring a monitor. - void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String shardId, Runnable nextEventDispatchAction) { + void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String streamAndShardId, Runnable nextEventDispatchAction) { final PrefetchRecordsRetrieved recordsToCheck = peekNextRecord(); // Verify if the ack matches the head of the queue and evict it. if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) { - evictPublishedRecordAndUpdateDemand(shardId); + evictPublishedRecordAndUpdateDemand(streamAndShardId); nextEventDispatchAction.run(); } else { // Log and ignore any other ack received. 
As long as an ack is received for head of the queue @@ -148,21 +148,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { final BatchUniqueIdentifier peekedBatchUniqueIdentifier = recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier(); log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", - shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); + streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); } } // Evict the published record from the prefetch queue. // This method is not thread-safe and needs to be called after acquiring a monitor. @VisibleForTesting - RecordsRetrieved evictPublishedRecordAndUpdateDemand(String shardId) { + RecordsRetrieved evictPublishedRecordAndUpdateDemand(String streamAndShardId) { final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll(); if (result != null) { updateDemandTrackersOnPublish(result); } else { log.info( "{}: No record batch found while evicting from the prefetch queue. 
This indicates the prefetch buffer" - + "was reset.", shardId); + + "was reset.", streamAndShardId); } return result; } @@ -222,7 +222,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; - this.shardId = shardId; + this.streamAndShardId = + this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId; } @Override @@ -234,7 +235,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); if (!started) { - log.info("{} : Starting prefetching thread.", shardId); + log.info("{} : Starting prefetching thread.", streamAndShardId); executorService.execute(defaultGetRecordsCacheDaemon); } started = true; @@ -304,9 +305,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @Override public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { - publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, shardId, () -> drainQueueForRequests()); + publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, streamAndShardId, () -> drainQueueForRequests()); // Take action based on the time spent by the event in queue. - takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log); + takeDelayedDeliveryActionIfRequired(streamAndShardId, lastEventDeliveryTime, log); } // Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue. 
@@ -403,7 +404,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { public void run() { while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { - log.warn("{} : Prefetch thread was interrupted.", shardId); + log.warn("{} : Prefetch thread was interrupted.", streamAndShardId); break; } @@ -411,7 +412,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { makeRetrievalAttempt(); } catch(PositionResetException pre) { - log.debug("{} : Position was reset while attempting to add item to queue.", shardId); + log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId); } finally { resetLock.readLock().unlock(); } @@ -447,23 +448,23 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } catch (PositionResetException pse) { throw pse; } catch (RetryableRetrievalException rre) { - log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId); + log.info("{} : Timeout occurred while waiting for response from Kinesis. 
Will retry the request.", streamAndShardId); } catch (InterruptedException e) { - log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId); + log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId); } catch (ExpiredIteratorException e) { log.info("{} : records threw ExpiredIteratorException - restarting" - + " after greatest seqNum passed to customer", shardId, e); + + " after greatest seqNum passed to customer", streamAndShardId, e); scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); publisherSession.dataFetcher().restartIterator(); } catch (SdkException e) { - log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); + log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e); } catch (Throwable e) { log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + - "https://github.com/awslabs/amazon-kinesis-client.", shardId, e); + "https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e); } finally { MetricsUtil.endScope(scope); } @@ -475,7 +476,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { publisherSession.prefetchCounters().waitForConsumer(); } catch (InterruptedException ie) { log.info("{} : Thread was interrupted while waiting for the consumer. 
" + - "Shutdown has probably been started", shardId); + "Shutdown has probably been started", streamAndShardId); } } } @@ -522,14 +523,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { public synchronized void waitForConsumer() throws InterruptedException { if (!shouldGetNewRecords()) { - log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls); + log.debug("{} : Queue is full waiting for consumer for {} ms", streamAndShardId, idleMillisBetweenCalls); this.wait(idleMillisBetweenCalls); } } public synchronized boolean shouldGetNewRecords() { if (log.isDebugEnabled()) { - log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString()); + log.debug("{} : Current Prefetch Counter States: {}", streamAndShardId, this.toString()); } return size < maxRecordsCount && byteSize < maxByteSize; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 0cc50c2a..84851329 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1156,6 +1156,45 @@ public class HierarchicalShardSyncerTest { // * Current leases: (4, 5, 7) // */ @Test + public void understandLeaseBehavior() { + final List shards = constructShardListForGraphA(); +// final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), +// newLease("shardId-7")); + + final List currentLeases = Collections.emptyList(); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + INITIAL_POSITION_LATEST); + + System.out.println("Leases : " + newLeases.stream().map(lease -> lease.leaseKey() + ":" + lease.checkpoint()).collect( + Collectors.joining())); + + final 
Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); + + assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size())); + for (Lease lease : newLeases) { + assertThat("Unexpected lease: " + lease, expectedShardIdCheckpointMap.containsKey(lease.leaseKey()), + equalTo(true)); + assertThat(lease.checkpoint(), equalTo(expectedShardIdCheckpointMap.get(lease.leaseKey()))); + } + } + + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 7) + */ + @Test public void testDetermineNewLeasesToCreateSplitMergeLatest2() { final List shards = constructShardListForGraphA(); final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), From a6f767bf96b8c5ef4bd17d406efeb6c4a20778c9 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 23 Mar 2020 13:13:49 -0700 Subject: [PATCH 2/4] Shard Syncer and Logging changes --- .../amazon/kinesis/coordinator/Scheduler.java | 13 ++++++------- .../amazon/kinesis/lifecycle/ShutdownTask.java | 17 +++++++++-------- .../kinesis/coordinator/SchedulerTest.java | 3 +++ .../leases/HierarchicalShardSyncerTest.java | 3 ++- .../kinesis/lifecycle/ConsumerStatesTest.java | 4 ++-- .../polling/PrefetchRecordsPublisherTest.java | 3 ++- .../polling/RecordsFetcherFactoryTest.java | 6 ++++++ 7 files changed, 30 insertions(+), 19 deletions(-) diff --git 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index c8354eff..945700a7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -135,7 +135,7 @@ public class Scheduler implements Runnable { private final boolean isMultiStreamMode; // TODO : halo : make sure we generate streamConfig if entry not present. private final Map currentStreamConfigMap; - private final MultiStreamTracker multiStreamTracker; + private MultiStreamTracker multiStreamTracker; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; @@ -201,14 +201,13 @@ public class Scheduler implements Runnable { this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map( multiStreamTracker -> true, streamConfig -> false); this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> - multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)), + multiStreamTracker -> { + this.multiStreamTracker = multiStreamTracker; + return multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); + }, streamConfig -> Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig)); - this.multiStreamTracker = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> multiStreamTracker, - streamConfig -> null); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); // Determine leaseSerializer based on availability of MultiStreamTracker. 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 81b954a6..2b4899e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -45,6 +45,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Function; /** * Task for invoking the ShardRecordProcessor shutdown() callback. @@ -82,8 +83,8 @@ public class ShutdownTask implements ConsumerTask { private final TaskType taskType = TaskType.SHUTDOWN; - private String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId()); + private static final Function shardInfoIdProvider = shardInfo -> shardInfo + .streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId()); /* * Invokes ShardRecordProcessor shutdown() API. * (non-Javadoc) @@ -114,7 +115,7 @@ public class ShutdownTask implements ConsumerTask { if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { localReason = ShutdownReason.LEASE_LOST; dropLease(); - log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoId); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoIdProvider.apply(shardInfo)); } } @@ -126,7 +127,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. 
Shutdown reason: {}", - shardInfoId, shardInfo.concurrencyToken(), localReason); + shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), localReason); final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); @@ -137,7 +138,7 @@ public class ShutdownTask implements ConsumerTask { if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfoId + ". Application must checkpoint upon shard end. " + + + shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } else { @@ -145,7 +146,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Shutting down retrieval strategy."); recordsPublisher.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", shardInfoId); + log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo)); } catch (Exception e) { applicationException = true; throw e; @@ -154,11 +155,11 @@ public class ShutdownTask implements ConsumerTask { } if (localReason == ShutdownReason.SHARD_END) { - log.debug("Looking for child shards of shard {}", shardInfoId); + log.debug("Looking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo)); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); - log.debug("Finished checking for child shards of shard {}", shardInfoId); + log.debug("Finished checking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo)); } return new TaskResult(null); diff 
--git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index bd8c28e8..425af67f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -57,6 +57,7 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -173,6 +174,7 @@ public class SchedulerTest { when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); + when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); @@ -641,6 +643,7 @@ public class SchedulerTest { shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception")) diff --git 
a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 84851329..5008b912 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -98,6 +98,7 @@ public class HierarchicalShardSyncerTest { @Before public void setup() { hierarchicalShardSyncer = new HierarchicalShardSyncer(); + when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } private void setupMultiStream() { @@ -1155,7 +1156,7 @@ public class HierarchicalShardSyncerTest { // * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) // * Current leases: (4, 5, 7) // */ - @Test +// @Test public void understandLeaseBehavior() { final List shards = constructShardListForGraphA(); // final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 23fb5dad..5d8e302f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -119,10 +119,10 @@ public class ConsumerStatesTest { maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory); + when(shardInfo.shardId()).thenReturn("shardId-000000000000"); + 
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener, 0)); - - when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f5772aaf..a28ded63 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -76,6 +76,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -120,7 +121,7 @@ public class PrefetchRecordsPublisherTest { @Before public void setup() { when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); - + when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream")); executorService = spy(Executors.newFixedThreadPool(1)); getRecordsCache = new PrefetchRecordsPublisher( MAX_SIZE, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java 
b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java index 81ad5b6d..d6d8b6d5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.polling; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; import org.junit.Before; import org.junit.Ignore; @@ -23,6 +24,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -37,11 +39,15 @@ public class RecordsFetcherFactoryTest { private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @Mock private MetricsFactory metricsFactory; + @Mock + private KinesisDataFetcher kinesisDataFetcher; @Before public void setUp() { MockitoAnnotations.initMocks(this); recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher); + when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } @Test From a7ae4d3e24d936b7f408f6b10e68f5787e8e4076 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 23 Mar 2020 13:23:59 -0700 Subject: [PATCH 3/4] Minor refactoring --- .../fanout/FanOutRetrievalFactory.java | 8 +++- .../leases/HierarchicalShardSyncerTest.java | 39 ------------------- 2 files changed, 6 insertions(+), 41 deletions(-) diff --git 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index f609c1d9..719d2e54 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -53,10 +53,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory { final String streamName; if(streamIdentifierStr.isPresent()) { streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply), + streamIdentifierStr.get()); } else { streamName = defaultStreamName; + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 5008b912..9ca59edc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1146,45 +1146,6 @@ public class HierarchicalShardSyncerTest { } } -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 
4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (4, 5, 7) -// */ -// @Test - public void understandLeaseBehavior() { - final List shards = constructShardListForGraphA(); -// final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), -// newLease("shardId-7")); - - final List currentLeases = Collections.emptyList(); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); - - System.out.println("Leases : " + newLeases.stream().map(lease -> lease.leaseKey() + ":" + lease.checkpoint()).collect( - Collectors.joining())); - - final Map expectedShardIdCheckpointMap = new HashMap<>(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); - - assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size())); - for (Lease lease : newLeases) { - assertThat("Unexpected lease: " + lease, expectedShardIdCheckpointMap.containsKey(lease.leaseKey()), - equalTo(true)); - assertThat(lease.checkpoint(), equalTo(expectedShardIdCheckpointMap.get(lease.leaseKey()))); - } - } - - /** * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) * Shard structure (each level depicts a stream segment): From 771bc914ebc976a23cb9fc8bb31bda3ade1ffe92 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 31 Mar 2020 15:40:43 -0700 Subject: [PATCH 4/4] Added unit test cases and addressed review comments --- .../InitialPositionInStreamExtended.java | 3 +- .../amazon/kinesis/common/StreamConfig.java | 1 - .../amazon/kinesis/coordinator/Scheduler.java | 15 ++- 
.../kinesis/coordinator/SchedulerTest.java | 117 ++++++++++++++++++ 4 files changed, 129 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java index 437abf28..b3bedd88 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.common; +import lombok.EqualsAndHashCode; import lombok.ToString; import java.util.Date; @@ -22,7 +23,7 @@ import java.util.Date; * Class that houses the entities needed to specify the position in the stream from where a new application should * start. */ -@ToString +@ToString @EqualsAndHashCode public class InitialPositionInStreamExtended { private final InitialPositionInStream position; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 3cf0eeb2..999182b6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -6,7 +6,6 @@ import lombok.experimental.Accessors; @Value @Accessors(fluent = true) public class StreamConfig { - // TODO: Consider having streamIdentifier as the unique identifier of this class. 
StreamIdentifier streamIdentifier; InitialPositionInStreamExtended initialPositionInStreamExtended; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 945700a7..3f496f64 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -98,7 +98,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; @Slf4j public class Scheduler implements Runnable { - private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 10000L; + private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; private SchedulerLog slog = new SchedulerLog(); @@ -418,11 +418,12 @@ public class Scheduler implements Runnable { * Sync all streams method. * @return streams that are being synced by this worker */ - private Set checkAndSyncStreamShardsAndLeases() + @VisibleForTesting + Set checkAndSyncStreamShardsAndLeases() throws DependencyException, ProvisionedThroughputException, InvalidStateException { final Set streamsSynced = new HashSet<>(); - if (isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS)) { + if (shouldSyncStreamsNow()) { final Map newStreamConfigMap = new HashMap<>(); // Making an immutable copy newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() @@ -462,7 +463,12 @@ public class Scheduler implements Runnable { return streamsSynced; } - private Set syncStreamsFromLeaseTableOnAppInit() + @VisibleForTesting + boolean shouldSyncStreamsNow() { + return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); + } + + private void syncStreamsFromLeaseTableOnAppInit() throws DependencyException, ProvisionedThroughputException, InvalidStateException { if 
(!leasesSyncedOnAppInit && isMultiStreamMode) { final Set streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream() @@ -475,7 +481,6 @@ public class Scheduler implements Runnable { } leasesSyncedOnAppInit = true; } - return Collections.emptySet(); } // When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 425af67f..ff6eab1c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -40,14 +40,20 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import io.reactivex.plugins.RxJavaPlugins; import lombok.RequiredArgsConstructor; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -58,6 +64,7 @@ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -76,6 +83,7 @@ import 
software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; @@ -375,6 +383,115 @@ public class SchedulerTest { } + @Test + public final void testMultiStreamNoStreamsAreSyncedWhenStreamsAreNotRefreshed() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + 
Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty()); + Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamOnlyNewStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + 
Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamOnlyStaleStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced() 
+ throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + @Test public final void testSchedulerShutdown() { scheduler.shutdown();