diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index b6b7fab3..fffc88a6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -16,8 +16,11 @@ package software.amazon.kinesis.coordinator; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +29,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -39,9 +44,11 @@ import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -107,22 +114,21 @@ public class Scheduler implements Runnable { private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; -// private final ShardSyncTaskManager 
shardSyncTaskManager; + private final Function shardSyncTaskManagerProvider; + private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final GracefulShutdownCoordinator gracefulShutdownCoordinator; private final WorkerStateChangeListener workerStateChangeListener; - private final InitialPositionInStreamExtended initialPosition; private final MetricsFactory metricsFactory; private final long failoverTimeMillis; private final long taskBackoffTimeMillis; - private final List listOfStreams; - private final MultiStreamTracker multiStreamTracker; + private final Either applicationStreamTracker; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; -// private final ShardDetector shardDetector; + private final Function shardDetectorProvider; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; @@ -178,15 +184,17 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); - this.multiStreamTracker = this.retrievalConfig.multiStreamTracker(); - this.listOfStreams = this.multiStreamTracker == null ? 
- ImmutableList.of(this.retrievalConfig.streamName()) : - this.multiStreamTracker.listStreamsToProcess(); - Validate.isTrue(!CollectionUtils.isNullOrEmpty(this.listOfStreams), "No stream configured to process."); + final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); + if(multiStreamTracker == null) { + this.applicationStreamTracker = Either.right(new StreamConfig(this.retrievalConfig.streamName(), + this.retrievalConfig.initialPositionInStreamExtended())); + } else { + this.applicationStreamTracker = Either.left(multiStreamTracker); + } this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); - // Determine leaseSerializer based on MultiStreamTracker - final LeaseSerializer leaseSerializer = this.multiStreamTracker == null ? + // Determine leaseSerializer based on availability of MultiStreamTracker. + final LeaseSerializer leaseSerializer = this.applicationStreamTracker.map(mst -> true, sc -> false) ? new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBLeaseSerializer(); this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer) @@ -207,9 +215,11 @@ public class Scheduler implements Runnable { this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); - -// this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() -// .createShardSyncTaskManager(this.metricsFactory); + // TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName. 
+ // TODO : Pass the immutable map here instead of using mst.streamConfigMap() + this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig + .leaseManagementFactory(leaseSerializer).createShardSyncTaskManager(this.metricsFactory, + applicationStreamTracker.map(mst -> mst.streamConfigMap().get(streamName), sc -> sc)); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -226,14 +236,13 @@ public class Scheduler implements Runnable { this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() .createWorkerStateChangeListener(); } - this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); // this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); // this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); -// this.shardDetector = this.shardSyncTaskManager.shardDetector(); + this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); @@ -284,10 +293,14 @@ public class Scheduler implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already 
synced streams - for(String streamName : this.listOfStreams) { + final Map streamConfigMap = applicationStreamTracker + .map(mst -> mst.streamConfigMap(), sc -> Collections.singletonMap(sc.streamName(), sc)); + for(String streamName : streamConfigMap.keySet().stream().collect(Collectors.toList())) { log.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, - initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, + final StreamConfig streamConfig = streamConfigMap.get(streamName); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName), + leaseRefresher, streamConfig.initialPositionInStreamExtended(), + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, metricsFactory); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); // Throwing the exception, to prevent further syncs for other stream. @@ -608,6 +621,10 @@ public class Scheduler implements Runnable { return consumer; } + private ShardSyncTaskManager createOrGetShardSyncTaskManager(String streamName) { + return streamToShardSyncTaskManagerMap.computeIfAbsent(streamName, s -> shardSyncTaskManagerProvider.apply(s)); + } + protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index 75070d8c..6a6c73b7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -18,6 +18,7 @@ import java.util.Collection; import 
java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -36,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @ToString public class ShardInfo { - private final String streamName; + private final Optional streamName; private final String shardId; private final String concurrencyToken; // Sorted list of parent shardIds. @@ -77,7 +78,7 @@ public class ShardInfo { // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); this.checkpoint = checkpoint; - this.streamName = streamName; + this.streamName = Optional.ofNullable(streamName); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 01cd6683..2252da84 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -379,13 +379,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList()); } - // TODO : Halo : Reenable for backward compatibility -// public static ShardInfo convertLeaseToAssignment(final Lease lease) { -// return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), -// lease.checkpoint()); -// } - - // TODO : Support Shard + // TODO : Halo : Check for better way public static ShardInfo convertLeaseToAssignment(final Lease lease) { if (lease instanceof MultiStreamLease) { return new ShardInfo(((MultiStreamLease) lease).shardId(), 
lease.concurrencyToken().toString(), lease.parentShardIds(), @@ -394,7 +388,6 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint()); } - } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index ac9e053b..fae7e4cb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -1,8 +1,8 @@ package software.amazon.kinesis.processor; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; -import java.util.List; +import java.util.Map; /** * Interface for stream trackers. This is useful for KCL Workers that need @@ -11,17 +11,9 @@ import java.util.List; public interface MultiStreamTracker { /** - * Returns the list of streams that the Worker should consume data from. + * Returns the map of stream names to their stream-specific configs. * - * @return List of stream names + * @return Map from stream name to the config for that stream */ - List listStreamsToProcess(); - - /** - * Returns the initial position in stream to read from, for the given stream. - * @param streamName - * @return Initial position to read from, for the given stream - */ - InitialPositionInStreamExtended initialPositionInStreamExtended(String streamName); - + Map streamConfigMap(); }