diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java index db0393e1..37eea1a3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java @@ -28,11 +28,11 @@ public class DiagnosticUtils { /** * Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action. - * @param shardId of the shard that is having delayed delivery + * @param resourceIdentifier of the shard that is having delayed delivery * @param enqueueTimestamp of the event submitted to the executor service * @param log Slf4j Logger from RecordPublisher to log the events */ - public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) { + public static void takeDelayedDeliveryActionIfRequired(String resourceIdentifier, Instant enqueueTimestamp, Logger log) { final long durationBetweenEnqueueAndAckInMillis = Duration .between(enqueueTimestamp, Instant.now()).toMillis(); if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) { @@ -41,9 +41,9 @@ public class DiagnosticUtils { "{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs" + " to see the state of the executor service. Also check if the RecordProcessor's processing " + "time is high. ", - shardId, durationBetweenEnqueueAndAckInMillis); + resourceIdentifier, durationBetweenEnqueueAndAckInMillis); } else if (log.isDebugEnabled()) { - log.debug("{}: Record delivery time to shard consumer is {} millis", shardId, + log.debug("{}: Record delivery time to shard consumer is {} millis", resourceIdentifier, durationBetweenEnqueueAndAckInMillis); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java index 437abf28..b3bedd88 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.common; +import lombok.EqualsAndHashCode; import lombok.ToString; import java.util.Date; @@ -22,7 +23,7 @@ import java.util.Date; * Class that houses the entities needed to specify the position in the stream from where a new application should * start. */ -@ToString +@ToString @EqualsAndHashCode public class InitialPositionInStreamExtended { private final InitialPositionInStream position; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 1d02f47c..8acccce9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,6 +18,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.ShardSyncTaskManager; @@ -30,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; /** * The top level orchestrator for coordinating the periodic shard sync related @@ -44,21 +46,24 @@ class PeriodicShardSyncManager { private final String workerId; private final LeaderDecider leaderDecider; - private final Map streamToShardSyncTaskManagerMap; + private final Map currentStreamConfigMap; + private final Function shardSyncTaskManagerProvider; private final ScheduledExecutorService shardSyncThreadPool; private boolean isRunning; - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map streamToShardSyncTaskManagerMap) { - this(workerId, leaderDecider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor()); + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map currentStreamConfigMap, + Function shardSyncTaskManagerProvider) { + this(workerId, leaderDecider, currentStreamConfigMap, shardSyncTaskManagerProvider, Executors.newSingleThreadScheduledExecutor()); } - PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map streamToShardSyncTaskManagerMap, - ScheduledExecutorService shardSyncThreadPool) { + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map currentStreamConfigMap, + Function shardSyncTaskManagerProvider, ScheduledExecutorService shardSyncThreadPool) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; this.leaderDecider = leaderDecider; - this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap; + this.currentStreamConfigMap = currentStreamConfigMap; + this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; this.shardSyncThreadPool = shardSyncThreadPool; } @@ -85,8 +90,13 @@ class PeriodicShardSyncManager { * @return the result of the task */ public synchronized void syncShardsOnce() throws Exception { - for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { - final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); + // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing + // TODO: for already synced streams + for(Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); + log.info("Syncing Kinesis shard info for " + streamIdentifier); + final StreamConfig streamConfig = streamConfigEntry.getValue(); + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig); final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask(); if (taskResult.getException() != null) { throw taskResult.getException(); @@ -106,8 +116,8 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { - for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { - final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); + for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index ff9d845e..cb4b4579 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -34,6 +35,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; @@ -44,8 +46,8 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; +import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; @@ -53,10 +55,13 @@ import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; +import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; @@ -70,10 +75,13 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument; import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShutdownNotificationAware; @@ -97,6 +105,8 @@ public class Scheduler implements Runnable { private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; + private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; + private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -119,8 +129,8 @@ public class Scheduler implements Runnable { private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; - private final Function shardSyncTaskManagerProvider; - private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); + private final Function shardSyncTaskManagerProvider; + private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; @@ -131,11 +141,13 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final boolean isMultiStreamMode; + // TODO : halo : make sure we generate streamConfig if entry not present. private final Map currentStreamConfigMap; + private MultiStreamTracker multiStreamTracker; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; - private final Function shardDetectorProvider; + private final Function shardDetectorProvider; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; @@ -152,6 +164,9 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); + private Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); + private boolean leasesSyncedOnAppInit = false; + /** * Used to ensure that only one requestedShutdown is in progress at a time. */ @@ -195,9 +210,11 @@ public class Scheduler implements Runnable { this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map( multiStreamTracker -> true, streamConfig -> false); this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> - multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)), + multiStreamTracker -> { + this.multiStreamTracker = multiStreamTracker; + return multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); + }, streamConfig -> Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig)); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); @@ -227,9 +244,9 @@ public class Scheduler implements Runnable { this.diagnosticEventHandler = new DiagnosticEventLogger(); // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName. // TODO : Pass the immutable map here instead of using mst.streamConfigList() - this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig + this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig .leaseManagementFactory(leaseSerializer, isMultiStreamMode) - .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier)); + .createShardSyncTaskManager(this.metricsFactory, streamConfig); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -254,14 +271,15 @@ public class Scheduler implements Runnable { // this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); - this.shardDetectorProvider = streamIdentifier -> createOrGetShardSyncTaskManager(streamIdentifier).shardDetector(); + this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); // TODO : Halo : Check if this needs to be per stream. this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); - this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), - leaderDecider, streamToShardSyncTaskManagerMap); + this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( + leaseManagementConfig.workerIdentifier(), leaderDecider, currentStreamConfigMap, + shardSyncTaskManagerProvider); } /** @@ -303,16 +321,10 @@ public class Scheduler implements Runnable { log.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); + TaskResult result; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { - // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing - // TODO: for already synced streams if (shouldInitiateLeaseSync()) { log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier()); - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); - createOrGetShardSyncTaskManager(streamIdentifier); - log.info("Creating shard sync task for " + streamIdentifier); - } leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } } else { @@ -332,6 +344,7 @@ public class Scheduler implements Runnable { // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged // TODO: Determine if waitUntilHashRangeCovered() is needed. //waitUntilHashRangeCovered(); + streamSyncWatch.start(); isDone = true; } catch (LeasingException e) { log.error("Caught exception when initializing LeaseCoordinator", e); @@ -404,14 +417,20 @@ public class Scheduler implements Runnable { for (ShardInfo completedShard : completedShards) { final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); - if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) { - log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); + final StreamConfig streamConfig = currentStreamConfigMap + .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) { + log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", + streamIdentifier.serialize(), completedShard.toString()); } } // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); + // check for new streams and sync with the scheduler state + checkAndSyncStreamShardsAndLeases(); + logExecutorState(); slog.info("Sleeping ..."); Thread.sleep(shardConsumerDispatchPollIntervalMillis); @@ -427,6 +446,82 @@ public class Scheduler implements Runnable { slog.resetInfoLogging(); } + + /** + * Note: This method has package level access solely for testing purposes. + * Sync all streams method. + * @return streams that are being synced by this worker + */ + @VisibleForTesting + Set checkAndSyncStreamShardsAndLeases() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final Set streamsSynced = new HashSet<>(); + + if (shouldSyncStreamsNow()) { + final Map newStreamConfigMap = new HashMap<>(); + // Making an immutable copy + newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); + + // This is done to ensure that we clean up the stale streams lingering in the lease table. + syncStreamsFromLeaseTableOnAppInit(); + + for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.syncShardAndLeaseInfo(); + currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); + streamsSynced.add(streamIdentifier); + } else { + if (log.isDebugEnabled()) { + log.debug(streamIdentifier + " is already being processed - skipping shard sync."); + } + } + } + + // TODO: Remove assumption that each Worker gets the full list of streams + Iterator currentStreamConfigIter = currentStreamConfigMap.keySet().iterator(); + while (currentStreamConfigIter.hasNext()) { + StreamIdentifier streamIdentifier = currentStreamConfigIter.next(); + if (!newStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Found old/deleted stream: " + streamIdentifier + ". Syncing shards of that stream."); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.syncShardAndLeaseInfo(); + currentStreamConfigIter.remove(); + streamsSynced.add(streamIdentifier); + } + } + streamSyncWatch.reset().start(); + } + return streamsSynced; + } + + @VisibleForTesting + boolean shouldSyncStreamsNow() { + return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); + } + + private void syncStreamsFromLeaseTableOnAppInit() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + if (!leasesSyncedOnAppInit && isMultiStreamMode) { + final Set streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream() + .map(lease -> StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())) + .collect(Collectors.toSet()); + for (StreamIdentifier streamIdentifier : streamIdentifiers) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + } + } + leasesSyncedOnAppInit = true; + } + } + + // When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end. + private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) { + return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + } + /** * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. @@ -629,7 +724,8 @@ public class Scheduler implements Runnable { if (!firstItem) { builder.append(", "); } - builder.append(shardInfo.shardId()); + builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId())); firstItem = false; } slog.info("Current stream shard assignments: " + builder.toString()); @@ -665,8 +761,8 @@ public class Scheduler implements Runnable { return consumer; } - private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamIdentifier streamIdentifier) { - return streamToShardSyncTaskManagerMap.computeIfAbsent(streamIdentifier, s -> shardSyncTaskManagerProvider.apply(s)); + private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamConfig streamConfig) { + return streamToShardSyncTaskManagerMap.computeIfAbsent(streamConfig, s -> shardSyncTaskManagerProvider.apply(s)); } protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, @@ -678,8 +774,10 @@ public class Scheduler implements Runnable { // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); // Irrespective of single stream app or multi stream app, streamConfig should always be available. - final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); - Validate.notNull(streamConfig, "StreamConfig should not be empty"); + // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config + // to gracefully complete the reading. + final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + Validate.notNull(streamConfig, "StreamConfig should not be null"); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, streamConfig.streamIdentifier(), leaseCoordinator, @@ -698,7 +796,7 @@ public class Scheduler implements Runnable { streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, - shardDetectorProvider.apply(streamConfig.streamIdentifier()), + shardDetectorProvider.apply(streamConfig), aggregatorUtil, hierarchicalShardSyncer, metricsFactory); @@ -723,7 +821,8 @@ public class Scheduler implements Runnable { ShardConsumer consumer = shardInfoShardConsumerMap.get(shard); if (consumer.leaseLost()) { shardInfoShardConsumerMap.remove(shard); - log.debug("Removed consumer for {} as lease has been lost", shard.shardId()); + log.debug("Removed consumer for {} as lease has been lost", + shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId())); } else { consumer.executeLifecycle(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 71567921..ecd64952 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -68,6 +68,8 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; + private String streamIdentifier = ""; + public HierarchicalShardSyncer() { isMultiStreamMode = false; } @@ -104,6 +106,7 @@ public class HierarchicalShardSyncer { final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + this.streamIdentifier = shardDetector.streamIdentifier().serialize(); final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, @@ -117,10 +120,11 @@ public class HierarchicalShardSyncer { final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + this.streamIdentifier = shardDetector.streamIdentifier().serialize(); //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 if (!CollectionUtils.isNullOrEmpty(latestShards)) { - log.debug("Num shards: {}", latestShards.size()); + log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); @@ -138,7 +142,7 @@ public class HierarchicalShardSyncer { new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); final List newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs); - log.debug("Num new leases to create: {}", newLeasesToCreate.size()); + log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); boolean success = false; @@ -231,7 +235,7 @@ public class HierarchicalShardSyncer { for (String shardId : shardIdsOfClosedShards) { final Shard shard = shardIdToShardMap.get(shardId); if (shard == null) { - log.info("Shard {} is not present in Kinesis anymore.", shardId); + log.info("{} : Shard {} is not present in Kinesis anymore.", streamIdentifier, shardId); continue; } @@ -426,7 +430,7 @@ public class HierarchicalShardSyncer { final Map shardIdToShardMapOfAllKinesisShards, final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext, final MultiStreamArgs multiStreamArgs) { - + final String streamIdentifier = getStreamIdentifier(multiStreamArgs); final Boolean previousValue = memoizationContext.get(shardId); if (previousValue != null) { return previousValue; @@ -451,9 +455,9 @@ public class HierarchicalShardSyncer { memoizationContext, multiStreamArgs)) { isDescendant = true; descendantParentShardIds.add(parentShardId); - log.debug("Parent shard {} is a descendant.", parentShardId); + log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId); } else { - log.debug("Parent shard {} is NOT a descendant.", parentShardId); + log.debug("{} : Parent shard {} is NOT a descendant.", streamIdentifier, parentShardId); } } @@ -461,7 +465,7 @@ public class HierarchicalShardSyncer { if (isDescendant) { for (String parentShardId : parentShardIds) { if (!shardIdsOfCurrentLeases.contains(parentShardId)) { - log.debug("Need to create a lease for shardId {}", parentShardId); + log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId); Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); if (lease == null) { lease = multiStreamArgs.isMultiStreamMode() ? @@ -555,6 +559,7 @@ public class HierarchicalShardSyncer { final List trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { + final String streamIdentifier = getStreamIdentifier(multiStreamArgs); final Set kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); // Check if there are leases for non-existent shards @@ -562,14 +567,15 @@ public class HierarchicalShardSyncer { .filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList()); if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { - log.info("Found {} candidate leases for cleanup. Refreshing list of" - + " Kinesis shards to pick up recent/latest shards", garbageLeases.size()); + log.info("{} : Found {} candidate leases for cleanup. Refreshing list of" + + " Kinesis shards to pick up recent/latest shards", streamIdentifier, garbageLeases.size()); final Set currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId) .collect(Collectors.toSet()); for (Lease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) { - log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey()); + log.info("{} : Deleting lease for shard {} as it is not present in Kinesis stream.", + streamIdentifier, lease.leaseKey()); leaseRefresher.deleteLease(lease); } } @@ -589,14 +595,16 @@ public class HierarchicalShardSyncer { static boolean isCandidateForCleanup(final Lease lease, final Set currentKinesisShardIds, final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException { - boolean isCandidateForCleanup = true; + final String streamIdentifier = getStreamIdentifier(multiStreamArgs); + + boolean isCandidateForCleanup = true; final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs); if (currentKinesisShardIds.contains(shardId)) { isCandidateForCleanup = false; } else { - log.info("Found lease for non-existent shard: {}. Checking its parent shards", shardId); + log.info("{} : Found lease for non-existent shard: {}. Checking its parent shards", streamIdentifier, shardId); final Set parentShardIds = lease.parentShardIds(); for (String parentShardId : parentShardIds) { @@ -605,7 +613,7 @@ public class HierarchicalShardSyncer { if (currentKinesisShardIds.contains(parentShardId)) { final String message = String.format("Parent shard %s exists but not the child shard %s", parentShardId, shardId); - log.info(message); + log.info("{} : {}", streamIdentifier, message); throw new KinesisClientLibIOException(message); } } @@ -693,8 +701,8 @@ public class HierarchicalShardSyncer { } if (okayToDelete) { - log.info("Deleting lease for shard {} as it has been completely processed and processing of child " - + "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); + log.info("{} : Deleting lease for shard {} as it has been completely processed and processing of child " + + "shards has begun.", streamIdentifier, shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); leaseRefresher.deleteLease(leaseForClosedShard); } } @@ -757,9 +765,9 @@ public class HierarchicalShardSyncer { * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. */ - static List getOpenShards(final List allShards) { + static List getOpenShards(final List allShards, final String streamIdentifier) { return allShards.stream().filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null) - .peek(shard -> log.debug("Found open shard: {}", shard.shardId())).collect(Collectors.toList()); + .peek(shard -> log.debug("{} : Found open shard: {}", streamIdentifier, shard.shardId())).collect(Collectors.toList()); } private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) { @@ -775,7 +783,12 @@ public class HierarchicalShardSyncer { return checkpoint; } - + + private static String getStreamIdentifier(MultiStreamArgs multiStreamArgs) { + return Optional.ofNullable(multiStreamArgs.streamIdentifier()) + .map(streamId -> streamId.serialize()).orElse("single_stream_mode"); + } + /** Helper class to compare leases based on starting sequence number of the corresponding shards. * */ @@ -868,13 +881,15 @@ public class HierarchicalShardSyncer { @Override public List determineNewLeasesToCreate(List shards, List currentLeases, InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, MultiStreamArgs multiStreamArgs) { + final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier()) + .map(streamId -> streamId.serialize()).orElse(""); final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); - currentLeases.stream().peek(lease -> log.debug("Existing lease: {}", lease)) + currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); - final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs); + final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier); //TODO: Verify before LTR launch that ending sequence number is still returned from the service. final Comparator startingSequenceNumberComparator = @@ -889,7 +904,7 @@ public class HierarchicalShardSyncer { * reaching SHARD_END. */ private List getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, - List shards, MultiStreamArgs multiStreamArgs) { + List shards, MultiStreamArgs multiStreamArgs, String streamId) { final Map shardIdToNewLeaseMap = new HashMap<>(); for (Shard shard : shards) { @@ -898,7 +913,7 @@ public class HierarchicalShardSyncer { newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard); lease.checkpoint(convertToCheckpoint(initialPosition)); - log.debug("Need to create a lease for shard with shardId {}", shardId); + log.debug("{} : Need to create a lease for shard with shardId {}", streamId, shardId); shardIdToNewLeaseMap.put(shardId, lease); } @@ -961,29 +976,31 @@ public class HierarchicalShardSyncer { * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ @Override - public synchronized List determineNewLeasesToCreate(List shards, List currentLeases, - InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, MultiStreamArgs multiStreamArgs) { + public synchronized List determineNewLeasesToCreate(final List shards, final List currentLeases, + final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds, + final MultiStreamArgs multiStreamArgs) { final Map shardIdToNewLeaseMap = new HashMap<>(); final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); - + final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier()) + .map(streamId -> streamId.serialize()).orElse(""); final Set shardIdsOfCurrentLeases = currentLeases.stream() - .peek(lease -> log.debug("Existing lease: {}", lease)) + .peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); - final List openShards = getOpenShards(shards); + final List openShards = getOpenShards(shards, streamIdentifier); final Map memoizationContext = new HashMap<>(); // Iterate over the open shards and find those that don't have any lease entries. for (Shard shard : openShards) { final String shardId = shard.shardId(); - log.debug("Evaluating leases for open shard {} and its ancestors.", shardId); + log.debug("{} : Evaluating leases for open shard {} and its ancestors.", streamIdentifier, shardId); if (shardIdsOfCurrentLeases.contains(shardId)) { - log.debug("Lease for shardId {} already exists. Not creating a lease", shardId); + log.debug("{} : Lease for shardId {} already exists. Not creating a lease", streamIdentifier, shardId); } else if (inconsistentShardIds.contains(shardId)) { - log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); + log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId); } else { - log.debug("Need to create a lease for shardId {}", shardId); + log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, shardId); final Lease newLease = multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : newKCLLease(shard); @@ -1020,13 +1037,12 @@ public class HierarchicalShardSyncer { } else { newLease.checkpoint(convertToCheckpoint(initialPosition)); } - log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint()); + log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint()); shardIdToNewLeaseMap.put(shardId, newLease); } } final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); - //TODO: Verify before LTR launch that ending sequence number is still returned from the service. final Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( shardIdToShardMapOfAllKinesisShards, multiStreamArgs); newLeasesToCreate.sort(startingSequenceNumberComparator); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 4ea4212e..37a092e8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -54,7 +54,8 @@ public class BlockOnParentShardTask implements ConsumerTask { @Override public TaskResult call() { Exception exception = null; - + final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId()); try { boolean blockedOnParentShard = false; for (String shardId : shardInfo.parentShardIds()) { @@ -62,20 +63,20 @@ public class BlockOnParentShardTask implements ConsumerTask { if (lease != null) { ExtendedSequenceNumber checkpoint = lease.checkpoint(); if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) { - log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint); + log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardInfoId, checkpoint); blockedOnParentShard = true; exception = new BlockedOnParentShardException("Parent shard not yet done"); break; } else { - log.debug("Shard {} has been completely processed.", shardId); + log.debug("Shard {} has been completely processed.", shardInfoId); } } else { - log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId); + log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardInfoId); } } if (!blockedOnParentShard) { - log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfo.shardId()); + log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfoId); return new TaskResult(null); } } catch (Exception e) { @@ -85,7 +86,7 @@ public class BlockOnParentShardTask implements ConsumerTask { try { Thread.sleep(parentShardPollIntervalMillis); } catch (InterruptedException e) { - log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.shardId(), e); + log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfoId, e); } return new TaskResult(exception); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 6c223650..fd036c9f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -60,6 +60,7 @@ public class ProcessTask implements ConsumerTask { private final ProcessRecordsInput processRecordsInput; private final MetricsFactory metricsFactory; private final AggregatorUtil aggregatorUtil; + private final String shardInfoId; public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @@ -74,6 +75,8 @@ public class ProcessTask implements ConsumerTask { @NonNull AggregatorUtil aggregatorUtil, @NonNull MetricsFactory metricsFactory) { this.shardInfo = shardInfo; + this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) + .orElse(shardInfo.shardId()); this.shardRecordProcessor = shardRecordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.backoffTimeMillis = backoffTimeMillis; @@ -121,7 +124,7 @@ public class ProcessTask implements ConsumerTask { } if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) { - log.info("Reached end of shard {} and have no records to process", shardInfo.shardId()); + log.info("Reached end of shard {} and have no records to process", shardInfoId); return new TaskResult(null, true); } @@ -142,13 +145,13 @@ public class ProcessTask implements ConsumerTask { } success = true; } catch (RuntimeException e) { - log.error("ShardId {}: Caught exception: ", shardInfo.shardId(), e); + log.error("ShardId {}: Caught exception: ", shardInfoId, e); exception = e; backoff(); } if (processRecordsInput.isAtShardEnd()) { - log.info("Reached end of shard {}, and processed {} records", shardInfo.shardId(), processRecordsInput.records().size()); + log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size()); return new TaskResult(null, true); } return new TaskResult(exception); @@ -174,7 +177,7 @@ public class ProcessTask implements ConsumerTask { try { Thread.sleep(this.backoffTimeMillis); } catch (InterruptedException ie) { - log.debug("{}: Sleep was interrupted", shardInfo.shardId(), ie); + log.debug("{}: Sleep was interrupted", shardInfoId, ie); } } @@ -188,7 +191,7 @@ public class ProcessTask implements ConsumerTask { */ private void callProcessRecords(ProcessRecordsInput input, List records) { log.debug("Calling application processRecords() with {} records from {}", records.size(), - shardInfo.shardId()); + shardInfoId); final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()) .checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); @@ -200,8 +203,8 @@ public class ProcessTask implements ConsumerTask { shardRecordProcessor.processRecords(processRecordsInput); } catch (Exception e) { log.error("ShardId {}: Application processRecords() threw an exception when processing shard ", - shardInfo.shardId(), e); - log.error("ShardId {}: Skipping over the following data records: {}", shardInfo.shardId(), records); + shardInfoId, e); + log.error("ShardId {}: Skipping over the following data records: {}", shardInfoId, records); } finally { MetricsUtil.addLatency(scope, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, startTime, MetricsLevel.SUMMARY); MetricsUtil.endScope(scope); @@ -226,17 +229,17 @@ public class ProcessTask implements ConsumerTask { * the time when the task started */ private void handleNoRecords(long startTimeMillis) { - log.debug("Kinesis didn't return any records for shard {}", shardInfo.shardId()); + log.debug("Kinesis didn't return any records for shard {}", shardInfoId); long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis); if (sleepTimeMillis > 0) { sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds); try { log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis, - shardInfo.shardId()); + shardInfoId); Thread.sleep(sleepTimeMillis); } catch (InterruptedException e) { - log.debug("ShardId {}: Sleep was interrupted", shardInfo.shardId()); + log.debug("ShardId {}: Sleep was interrupted", shardInfoId); } } } @@ -273,8 +276,8 @@ public class ProcessTask implements ConsumerTask { if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) { recordIterator.remove(); - log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", extendedSequenceNumber, - lastCheckpointValue); + log.debug("{} : removing record with ESN {} because the ESN is <= checkpoint ({})", shardInfoId, + extendedSequenceNumber, lastCheckpointValue); continue; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 99a680bf..e34f2ea4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -62,6 +62,7 @@ public class ShardConsumer { private final Function taskMetricsDecorator; private final int bufferSize; private final TaskExecutionListener taskExecutionListener; + private final String streamIdentifier; private ConsumerTask currentTask; private TaskOutcome taskOutcome; @@ -124,6 +125,7 @@ public class ShardConsumer { this.recordsPublisher = recordsPublisher; this.executorService = executorService; this.shardInfo = shardInfo; + this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode"); this.shardConsumerArgument = shardConsumerArgument; this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis; this.taskExecutionListener = taskExecutionListener; @@ -208,8 +210,8 @@ public class ShardConsumer { } Throwable dispatchFailure = subscriber.getAndResetDispatchFailure(); if (dispatchFailure != null) { - log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped", - dispatchFailure); + log.warn("{} : Exception occurred while dispatching incoming data. The incoming data has been skipped", + streamIdentifier, dispatchFailure); return dispatchFailure; } @@ -238,7 +240,7 @@ public class ShardConsumer { Instant now = Instant.now(); Duration timeSince = Duration.between(subscriber.lastDataArrival(), now); if (timeSince.toMillis() > value) { - log.warn("Last time data arrived: {} ({})", lastDataArrival, timeSince); + log.warn("{} : Last time data arrived: {} ({})", streamIdentifier, lastDataArrival, timeSince); } } }); @@ -250,11 +252,11 @@ public class ShardConsumer { if (taken != null) { String message = longRunningTaskMessage(taken); if (log.isDebugEnabled()) { - log.debug("{} Not submitting new task.", message); + log.debug("{} : {} Not submitting new task.", streamIdentifier, message); } logWarningForTaskAfterMillis.ifPresent(value -> { if (taken.toMillis() > value) { - log.warn(message); + log.warn("{} : {}", streamIdentifier, message); } }); } @@ -358,7 +360,7 @@ public class ShardConsumer { nextState = currentState.failureTransition(); break; default: - log.error("No handler for outcome of {}", outcome.name()); + log.error("{} : No handler for outcome of {}", streamIdentifier, outcome.name()); nextState = currentState.failureTransition(); break; } @@ -382,9 +384,9 @@ public class ShardConsumer { Exception taskException = taskResult.getException(); if (taskException instanceof BlockedOnParentShardException) { // No need to log the stack trace for this exception (it is very specific). - log.debug("Shard {} is blocked on completion of parent shard.", shardInfo.shardId()); + log.debug("{} : Shard {} is blocked on completion of parent shard.", streamIdentifier, shardInfo.shardId()); } else { - log.debug("Caught exception running {} task: ", currentTask.taskType(), taskResult.getException()); + log.debug("{} : Caught exception running {} task: ", streamIdentifier, currentTask.taskType(), taskResult.getException()); } } } @@ -411,10 +413,10 @@ public class ShardConsumer { * @return true if shutdown is complete (false if shutdown is still in progress) */ public boolean leaseLost() { - log.debug("Shutdown({}): Lease lost triggered.", shardInfo.shardId()); + log.debug("{} : Shutdown({}): Lease lost triggered.", streamIdentifier, shardInfo.shardId()); if (subscriber != null) { subscriber.cancel(); - log.debug("Shutdown({}): Subscriber cancelled.", shardInfo.shardId()); + log.debug("{} : Shutdown({}): Subscriber cancelled.", streamIdentifier, shardInfo.shardId()); } markForShutdown(ShutdownReason.LEASE_LOST); return isShutdown(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 4c05ac94..177c0f43 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -40,8 +40,8 @@ class ShardConsumerSubscriber implements Subscriber { private final int bufferSize; private final ShardConsumer shardConsumer; private final int readTimeoutsToIgnoreBeforeWarning; + private final String shardInfoId; private volatile int readTimeoutSinceLastRead = 0; - @VisibleForTesting final Object lockObject = new Object(); // This holds the last time an attempt of request to upstream service was made including the first try to @@ -70,6 +70,8 @@ class ShardConsumerSubscriber implements Subscriber { this.bufferSize = bufferSize; this.shardConsumer = shardConsumer; this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning; + this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt() + .map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId()); } @@ -107,7 +109,7 @@ class ShardConsumerSubscriber implements Subscriber { if (retrievalFailure != null) { synchronized (lockObject) { String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", - shardConsumer.shardInfo().shardId()); + shardInfoId); if (retrievalFailure instanceof RetryableRetrievalException) { log.debug(logMessage, retrievalFailure.getCause()); } else { @@ -130,7 +132,7 @@ class ShardConsumerSubscriber implements Subscriber { if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails()); + shardInfoId, lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails()); cancel(); // Start the subscription again which will update the lastRequestTime as well. @@ -157,7 +159,7 @@ class ShardConsumerSubscriber implements Subscriber { subscription); } catch (Throwable t) { - log.warn("{}: Caught exception from handleInput", shardConsumer.shardInfo().shardId(), t); + log.warn("{}: Caught exception from handleInput", shardInfoId, t); synchronized (lockObject) { dispatchFailure = t; } @@ -193,7 +195,7 @@ class ShardConsumerSubscriber implements Subscriber { log.warn( "{}: onError(). Cancelling subscription, and marking self as failed. KCL will " + "recreate the subscription as neccessary to continue processing.", - shardConsumer.shardInfo().shardId(), t); + shardInfoId, t); } protected void logOnErrorReadTimeoutWarning(Throwable t) { @@ -202,13 +204,13 @@ class ShardConsumerSubscriber implements Subscriber { + "are seeing this warning frequently consider increasing the SDK timeouts " + "by providing an OverrideConfiguration to the kinesis client. Alternatively you" + "can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress" - + "intermittant ReadTimeout warnings.", shardConsumer.shardInfo().shardId(), t); + + "intermittant ReadTimeout warnings.", shardInfoId, t); } @Override public void onComplete() { log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally", - shardConsumer.shardInfo().shardId()); + shardInfoId); } public void cancel() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index f4538be6..18e2be76 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -41,6 +41,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.List; +import java.util.function.Function; /** * Task for invoking the ShardRecordProcessor shutdown() callback. @@ -66,7 +67,7 @@ public class ShutdownTask implements ConsumerTask { private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; private final boolean garbageCollectLeases = false; - private final boolean isLeaseTableEmpty= false; + private final boolean isLeaseTableEmpty = false; private final boolean ignoreUnexpectedChildShards; @NonNull private final LeaseCoordinator leaseCoordinator; @@ -80,6 +81,8 @@ public class ShutdownTask implements ConsumerTask { private final TaskType taskType = TaskType.SHUTDOWN; + private static final Function shardInfoIdProvider = shardInfo -> shardInfo + .streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId()); /* * Invokes ShardRecordProcessor shutdown() API. * (non-Javadoc) @@ -110,7 +113,7 @@ public class ShutdownTask implements ConsumerTask { if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { localReason = ShutdownReason.LEASE_LOST; dropLease(); - log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoIdProvider.apply(shardInfo)); } } @@ -122,7 +125,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfo.shardId(), shardInfo.concurrencyToken(), localReason); + shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), localReason); final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); @@ -133,7 +136,7 @@ public class ShutdownTask implements ConsumerTask { if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfo.shardId() + ". Application must checkpoint upon shard end. " + + + shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } else { @@ -141,7 +144,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Shutting down retrieval strategy."); recordsPublisher.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", shardInfo.shardId()); + log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo)); } catch (Exception e) { applicationException = true; throw e; @@ -150,12 +153,12 @@ public class ShutdownTask implements ConsumerTask { } if (localReason == ShutdownReason.SHARD_END) { - log.debug("Looking for child shards of shard {}", shardInfo.shardId()); + log.debug("Looking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo)); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), initialPositionInStream, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, isLeaseTableEmpty); - log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); + log.debug("Finished checking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo)); } return new TaskResult(null); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 98046b6b..746fdc19 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -105,7 +105,7 @@ public class RetrievalConfig { this.applicationName = applicationName; } - public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { + public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { final StreamConfig[] streamConfig = new StreamConfig[1]; this.appStreamTracker.apply(multiStreamTracker -> { throw new IllegalArgumentException( @@ -113,6 +113,7 @@ public class RetrievalConfig { }, sc -> streamConfig[0] = sc); this.appStreamTracker = Either .right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended)); + return this; } public RetrievalFactory retrievalFactory() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 27cad136..c24a3803 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -51,7 +51,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Instant; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -60,7 +59,6 @@ import java.util.stream.Collectors; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; -@RequiredArgsConstructor @Slf4j @KinesisClientInternalApi public class FanOutRecordsPublisher implements RecordsPublisher { @@ -73,7 +71,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final KinesisAsyncClient kinesis; private final String shardId; private final String consumerArn; - + private final String streamAndShardId; private final Object lockObject = new Object(); private final AtomicInteger subscribeToShardId = new AtomicInteger(0); @@ -91,11 +89,25 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) { + this.kinesis = kinesis; + this.shardId = shardId; + this.consumerArn = consumerArn; + this.streamAndShardId = shardId; + } + + public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn, String streamIdentifierSer) { + this.kinesis = kinesis; + this.shardId = shardId; + this.consumerArn = consumerArn; + this.streamAndShardId = streamIdentifierSer + ":" + shardId; + } + @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { synchronized (lockObject) { - log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", shardId, + log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", streamAndShardId, extendedSequenceNumber, initialPositionInStreamExtended); this.initialPositionInStreamExtended = initialPositionInStreamExtended; this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber(); @@ -174,7 +186,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // It is now safe to remove the element recordsDeliveryQueue.poll(); // Take action based on the time spent by the event in queue. - takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log); + takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log); // Update current sequence number for the successfully delivered event. currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber(); // Update the triggering flow for post scheduling upstream request. @@ -190,13 +202,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier() .equals(flow.getSubscribeToShardId())) { log.error( - "{}: Received unexpected ack for the active subscription {}. Throwing. ", - shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + "{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); throw new IllegalStateException("Unexpected ack for the active subscription"); } // Otherwise publisher received a stale ack. else { - log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", shardId, + log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); } } @@ -219,10 +230,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (IllegalStateException e) { log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}", - shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); + streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); throw e; } catch (Throwable t) { - log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); + log.error("{}: Unable to deliver event to the shard consumer.", streamAndShardId, t); throw t; } } @@ -290,7 +301,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { String instanceId = shardId + "-" + subscribeInvocationId; log.debug( "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard", - shardId, connectionStart, instanceId); + streamAndShardId, connectionStart, instanceId); flow = new RecordFlow(this, connectionStart, instanceId); kinesis.subscribeToShard(request, flow); } @@ -303,12 +314,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if(hasValidFlow()) { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + - " Last successful request details -- {}", shardId, flow.connectionStartedAt, + " Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, lastSuccessfulRequestDetails); } else { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + - " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); + " Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails); } return; } @@ -320,8 +331,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null) { String logMessage = String.format( "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + - " Last successful request details -- %s", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); + " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -339,13 +349,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } flow.cancel(); } - log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); + log.debug("{}: availableQueueSpace zeroing from {}", streamAndShardId, availableQueueSpace); availableQueueSpace = 0; try { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable); + log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", + streamAndShardId, lastSuccessfulRequestDetails, innerThrowable); } subscriber = null; flow = null; @@ -353,7 +364,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (triggeringFlow != null) { log.debug( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, category.throwableTypeString); triggeringFlow.cancel(); } @@ -367,7 +378,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); + + "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails); recordsDeliveryQueue.clear(); } } @@ -383,7 +394,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (t.getCause() instanceof ResourceNotFoundException) { log.debug( "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", - shardId); + streamAndShardId); // The ack received for this onNext event will be ignored by the publisher as the global flow object should // be either null or renewed when the ack's flow identifier is evaluated. FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( @@ -452,7 +463,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!hasValidSubscriber()) { log.debug( "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Subscriber is null.", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); triggeringFlow.cancel(); if (flow != null) { flow.cancel(); @@ -462,7 +473,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!isActiveFlow(triggeringFlow)) { log.debug( "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Received records for an inactive flow.", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } @@ -478,7 +489,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." + - " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); + " Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails); errorOccurred(triggeringFlow, t); } } @@ -488,7 +499,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (availableQueueSpace <= 0) { log.debug( "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); } else { availableQueueSpace--; if (availableQueueSpace > 0) { @@ -503,12 +514,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void onComplete(RecordFlow triggeringFlow) { synchronized (lockObject) { - log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, + log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); triggeringFlow.cancel(); if (!hasValidSubscriber()) { - log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, + log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } @@ -516,15 +528,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!isActiveFlow(triggeringFlow)) { log.debug( "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } if (currentSequenceNumber != null) { - log.debug("{}: Shard hasn't ended. Resubscribing.", shardId); + log.debug("{}: Shard hasn't ended. Resubscribing.", streamAndShardId); subscribeToShard(currentSequenceNumber); } else { - log.debug("{}: Shard has ended completing subscriber.", shardId); + log.debug("{}: Shard has ended completing subscriber.", streamAndShardId); subscriber.onComplete(); } } @@ -536,7 +548,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (subscriber != null) { log.error( "{}: A subscribe occurred while there was an active subscriber. Sending error to current subscriber", - shardId); + streamAndShardId); MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException(); // @@ -575,7 +587,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (subscriber != s) { log.warn( "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}", - shardId, n, lastSuccessfulRequestDetails); + streamAndShardId, n, lastSuccessfulRequestDetails); return; } if (flow == null) { @@ -584,7 +596,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // log.debug( "{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.", - shardId); + streamAndShardId); errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); return; } @@ -602,19 +614,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (subscriber != s) { log.warn( "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}", - shardId, lastSuccessfulRequestDetails); + streamAndShardId, lastSuccessfulRequestDetails); return; } if (!hasValidSubscriber()) { log.warn( "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}", - shardId, lastSuccessfulRequestDetails); + streamAndShardId, lastSuccessfulRequestDetails); } subscriber = null; if (flow != null) { log.debug( "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId); + streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); availableQueueSpace = 0; } @@ -703,12 +715,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void onEventStream(SdkPublisher publisher) { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- Subscribe", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); if (!parent.isActiveFlow(this)) { this.isDisposed = true; log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- parent is disposed", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); parent.rejectSubscription(publisher); return; } @@ -716,7 +728,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- creating record subscription", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); subscription = new RecordSubscription(parent, this, connectionStartedAt, subscribeToShardId); publisher.subscribe(subscription); @@ -727,7 +739,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (Throwable t) { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- throwable during record subscription: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage()); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage()); parent.errorOccurred(this, t); } } @@ -736,7 +748,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Request id - {}", - parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId()); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId()); final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString()); parent.setLastSuccessfulRequestDetails(requestDetails); @@ -759,12 +771,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); if (this.isDisposed) { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); this.isErrorDispatched = true; } @@ -775,7 +787,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } else { log.debug( "{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); } } @@ -802,7 +814,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (Exception e) { log.warn( "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}", - parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), + parent.streamAndShardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional()); } } @@ -810,7 +822,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void executeComplete() { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); if (isCancelled) { // @@ -820,13 +832,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // subscription, which was cancelled for a reason (usually queue overflow). // log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}", - parent.shardId, parent.lastSuccessfulRequestDetails); + parent.streamAndShardId, parent.lastSuccessfulRequestDetails); return; } if (this.isDisposed) { log.warn( "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); return; } @@ -844,7 +856,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } catch (Throwable t) { log.error( "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t); } } } @@ -885,14 +897,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void cancel() { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- Cancel called", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); flow.isCancelled = true; if (subscription != null) { subscription.cancel(); } else { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- SDK subscription is null", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } } } @@ -906,21 +918,21 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow.isCancelled) { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } if (flow.isDisposed) { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow requires cancelling", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); cancel(); } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); if (parent.availableQueueSpace > 0) { request(1); } @@ -933,7 +945,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow.shouldSubscriptionCancel()) { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); cancel(); return; } @@ -948,7 +960,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void onError(Throwable t) { - log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId, + log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getClass().getName(), t.getMessage()); // @@ -961,7 +973,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void onComplete() { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete", - parent.shardId, connectionStartedAt, subscribeToShardId); + parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index f609c1d9..719d2e54 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -53,10 +53,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory { final String streamName; if(streamIdentifierStr.isPresent()) { streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply), + streamIdentifierStr.get()); } else { streamName = defaultStreamName; + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), + streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 1ea833a3..1605f941 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -63,7 +63,7 @@ public class KinesisDataFetcher { @NonNull private final KinesisAsyncClient kinesisClient; - @NonNull + @NonNull @Getter private final StreamIdentifier streamIdentifier; @NonNull private final String shardId; @@ -71,6 +71,7 @@ public class KinesisDataFetcher { @NonNull private final MetricsFactory metricsFactory; private final Duration maxFutureWait; + private final String streamAndShardId; @Deprecated public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { @@ -93,6 +94,7 @@ public class KinesisDataFetcher { this.maxRecords = maxRecords; this.metricsFactory = metricsFactory; this.maxFutureWait = maxFutureWait; + this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId; } /** Note: This method has package level access for testing purposes. @@ -120,7 +122,7 @@ public class KinesisDataFetcher { try { return new AdvancingResult(getRecords(nextIterator)); } catch (ResourceNotFoundException e) { - log.info("Caught ResourceNotFoundException when fetching records for shard {}", shardId); + log.info("Caught ResourceNotFoundException when fetching records for shard {}", streamAndShardId); return TERMINAL_RESULT; } } else { @@ -182,14 +184,14 @@ public class KinesisDataFetcher { */ public void initialize(final String initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { - log.info("Initializing shard {} with {}", shardId, initialCheckpoint); + log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint); advanceIteratorTo(initialCheckpoint, initialPositionInStream); isInitialized = true; } public void initialize(final ExtendedSequenceNumber initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { - log.info("Initializing shard {} with {}", shardId, initialCheckpoint.sequenceNumber()); + log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber()); advanceIteratorTo(initialCheckpoint.sequenceNumber(), initialPositionInStream); isInitialized = true; } @@ -234,7 +236,7 @@ public class KinesisDataFetcher { throw new RetryableRetrievalException(e.getMessage(), e); } } catch (ResourceNotFoundException e) { - log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e); + log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", streamAndShardId, e); nextIterator = null; } finally { MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getShardIterator"), @@ -285,7 +287,7 @@ public class KinesisDataFetcher { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { // TODO: Check behavior - log.debug("Interrupt called on metod, shutdown initiated"); + log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId); throw new RuntimeException(e); } catch (TimeoutException e) { throw new RetryableRetrievalException(e.getMessage(), e); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index dcd5e043..ba8aa117 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -91,7 +91,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private boolean started = false; private final String operation; - private final String shardId; + private final String streamAndShardId; private Subscriber subscriber; @VisibleForTesting @Getter private final PublisherSession publisherSession; @@ -135,11 +135,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { // Handle records delivery ack and execute nextEventDispatchAction. // This method is not thread-safe and needs to be called after acquiring a monitor. - void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String shardId, Runnable nextEventDispatchAction) { + void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String streamAndShardId, Runnable nextEventDispatchAction) { final PrefetchRecordsRetrieved recordsToCheck = peekNextRecord(); // Verify if the ack matches the head of the queue and evict it. if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) { - evictPublishedRecordAndUpdateDemand(shardId); + evictPublishedRecordAndUpdateDemand(streamAndShardId); nextEventDispatchAction.run(); } else { // Log and ignore any other ack received. As long as an ack is received for head of the queue @@ -148,21 +148,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { final BatchUniqueIdentifier peekedBatchUniqueIdentifier = recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier(); log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", - shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); + streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); } } // Evict the published record from the prefetch queue. // This method is not thread-safe and needs to be called after acquiring a monitor. @VisibleForTesting - RecordsRetrieved evictPublishedRecordAndUpdateDemand(String shardId) { + RecordsRetrieved evictPublishedRecordAndUpdateDemand(String streamAndShardId) { final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll(); if (result != null) { updateDemandTrackersOnPublish(result); } else { log.info( "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer" - + "was reset.", shardId); + + "was reset.", streamAndShardId); } return result; } @@ -222,7 +222,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; - this.shardId = shardId; + this.streamAndShardId = + this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId; } @Override @@ -234,7 +235,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); if (!started) { - log.info("{} : Starting prefetching thread.", shardId); + log.info("{} : Starting prefetching thread.", streamAndShardId); executorService.execute(defaultGetRecordsCacheDaemon); } started = true; @@ -304,9 +305,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @Override public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { - publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, shardId, () -> drainQueueForRequests()); + publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, streamAndShardId, () -> drainQueueForRequests()); // Take action based on the time spent by the event in queue. - takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log); + takeDelayedDeliveryActionIfRequired(streamAndShardId, lastEventDeliveryTime, log); } // Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue. @@ -403,7 +404,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { public void run() { while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { - log.warn("{} : Prefetch thread was interrupted.", shardId); + log.warn("{} : Prefetch thread was interrupted.", streamAndShardId); break; } @@ -411,7 +412,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { makeRetrievalAttempt(); } catch(PositionResetException pre) { - log.debug("{} : Position was reset while attempting to add item to queue.", shardId); + log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId); } finally { resetLock.readLock().unlock(); } @@ -447,23 +448,23 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } catch (PositionResetException pse) { throw pse; } catch (RetryableRetrievalException rre) { - log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId); + log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId); } catch (InterruptedException e) { - log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId); + log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId); } catch (ExpiredIteratorException e) { log.info("{} : records threw ExpiredIteratorException - restarting" - + " after greatest seqNum passed to customer", shardId, e); + + " after greatest seqNum passed to customer", streamAndShardId, e); scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); publisherSession.dataFetcher().restartIterator(); } catch (SdkException e) { - log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); + log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e); } catch (Throwable e) { log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + - "https://github.com/awslabs/amazon-kinesis-client.", shardId, e); + "https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e); } finally { MetricsUtil.endScope(scope); } @@ -475,7 +476,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { publisherSession.prefetchCounters().waitForConsumer(); } catch (InterruptedException ie) { log.info("{} : Thread was interrupted while waiting for the consumer. " + - "Shutdown has probably been started", shardId); + "Shutdown has probably been started", streamAndShardId); } } } @@ -522,14 +523,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { public synchronized void waitForConsumer() throws InterruptedException { if (!shouldGetNewRecords()) { - log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls); + log.debug("{} : Queue is full waiting for consumer for {} ms", streamAndShardId, idleMillisBetweenCalls); this.wait(idleMillisBetweenCalls); } } public synchronized boolean shouldGetNewRecords() { if (log.isDebugEnabled()) { - log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString()); + log.debug("{} : Current Prefetch Counter States: {}", streamAndShardId, this.toString()); } return size < maxRecordsCount && byteSize < maxByteSize; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 812b05df..fd6b531b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -40,14 +40,20 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import io.reactivex.plugins.RxJavaPlugins; import lombok.RequiredArgsConstructor; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -57,6 +63,8 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -75,6 +83,7 @@ import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; @@ -178,6 +187,7 @@ public class SchedulerTest { when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); + when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); @@ -380,6 +390,115 @@ public class SchedulerTest { } + @Test + public final void testMultiStreamNoStreamsAreSyncedWhenStreamsAreNotRefreshed() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty()); + Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamOnlyNewStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamOnlyStaleStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + @Test + public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + @Test public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception { final int maxInitializationAttempts = 1; @@ -685,6 +804,7 @@ public class SchedulerTest { shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index d881b776..30e4f081 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -103,6 +103,7 @@ public class HierarchicalShardSyncerTest { @Before public void setup() { hierarchicalShardSyncer = new HierarchicalShardSyncer(); + when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } private void setupMultiStream() { @@ -1196,16 +1197,16 @@ public class HierarchicalShardSyncerTest { } } -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (4, 5, 7) -// */ + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 7) + */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatest2() { final List shards = constructShardListForGraphA(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 23fb5dad..5d8e302f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -119,10 +119,10 @@ public class ConsumerStatesTest { maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory); + when(shardInfo.shardId()).thenReturn("shardId-000000000000"); + when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener, 0)); - - when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f5772aaf..a28ded63 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -76,6 +76,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -120,7 +121,7 @@ public class PrefetchRecordsPublisherTest { @Before public void setup() { when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); - + when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream")); executorService = spy(Executors.newFixedThreadPool(1)); getRecordsCache = new PrefetchRecordsPublisher( MAX_SIZE, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java index 81ad5b6d..d6d8b6d5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.polling; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; import org.junit.Before; import org.junit.Ignore; @@ -23,6 +24,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -37,11 +39,15 @@ public class RecordsFetcherFactoryTest { private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @Mock private MetricsFactory metricsFactory; + @Mock + private KinesisDataFetcher kinesisDataFetcher; @Before public void setUp() { MockitoAnnotations.initMocks(this); recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher); + when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } @Test diff --git a/pom.xml b/pom.xml index bdd2d0c2..f21b230e 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ - 2.10.66 + 2.11.8-SNAPSHOT