From 8be8d7a62b347d931352d76df62f501cdf21681e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 12 Mar 2020 14:15:36 -0700 Subject: [PATCH] Adding account and stream epoch support. Checkpoint 3 --- .../amazon/kinesis/common/ConfigsBuilder.java | 66 +++++++++++++++---- .../amazon/kinesis/common/StreamConfig.java | 7 +- .../kinesis/common/StreamIdentifier.java | 22 +++++-- .../amazon/kinesis/coordinator/Scheduler.java | 38 +++++------ .../kinesis/leases/MultiStreamLease.java | 16 ++--- .../kinesis/retrieval/RetrievalConfig.java | 50 ++++++++++---- 6 files changed, 130 insertions(+), 69 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java index 7026e34c..9595fdf9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java @@ -15,14 +15,18 @@ package software.amazon.kinesis.common; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; import org.apache.commons.lang3.StringUtils; -import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -36,14 +40,15 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; /** * This Builder is useful to create all configurations for the KCL with default values. */ -@Data +@Getter @Setter @ToString @EqualsAndHashCode @Accessors(fluent = true) public class ConfigsBuilder { /** - * Name of the stream to consume records from + * Either the name of the stream to consume records from + * Or MultiStreamTracker for all the streams to consume records from */ - @NonNull - private final String streamName; + private Either appStreamTracker; + /** * Application name for the KCL Worker */ @@ -109,7 +114,45 @@ public class ConfigsBuilder { return namespace; } - private MultiStreamTracker multiStreamTracker; + /** + * Constructor to initialize ConfigsBuilder with StreamName + * @param streamName + * @param applicationName + * @param kinesisClient + * @param dynamoDBClient + * @param cloudWatchClient + * @param workerIdentifier + * @param shardRecordProcessorFactory + */ + public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { + this.appStreamTracker = Either.right(streamName); + this.applicationName = applicationName; + this.kinesisClient = kinesisClient; + this.dynamoDBClient = dynamoDBClient; + this.cloudWatchClient = cloudWatchClient; + this.workerIdentifier = workerIdentifier; + this.shardRecordProcessorFactory = shardRecordProcessorFactory; + } + + /** + * Constructor to initialize ConfigsBuilder with MultiStreamTracker + * @param multiStreamTracker + * @param applicationName + * @param kinesisClient + * @param dynamoDBClient + * @param cloudWatchClient + * @param workerIdentifier + * @param shardRecordProcessorFactory + */ + public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { + this.appStreamTracker = Either.left(multiStreamTracker); + this.applicationName = applicationName; + this.kinesisClient = kinesisClient; + this.dynamoDBClient = dynamoDBClient; + this.cloudWatchClient = cloudWatchClient; + this.workerIdentifier = workerIdentifier; + this.shardRecordProcessorFactory = shardRecordProcessorFactory; + } /** * Creates a new instance of CheckpointConfig @@ -135,8 +178,7 @@ public class ConfigsBuilder { * @return LeaseManagementConfig */ public LeaseManagementConfig leaseManagementConfig() { - return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), streamName(), - workerIdentifier()); + return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), workerIdentifier()); } /** @@ -173,10 +215,10 @@ public class ConfigsBuilder { * @return RetrievalConfig */ public RetrievalConfig retrievalConfig() { - final RetrievalConfig retrievalConfig = new RetrievalConfig(kinesisClient(), streamName(), applicationName()); - if(this.multiStreamTracker != null) { - retrievalConfig.multiStreamTracker(multiStreamTracker); - } + final RetrievalConfig retrievalConfig = + appStreamTracker.map( + multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()), + streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName())); return retrievalConfig; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 667f1f1c..999182b6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -1,13 +1,10 @@ package software.amazon.kinesis.common; -import lombok.AccessLevel; -import lombok.Data; +import lombok.Value; import lombok.experimental.Accessors; -import lombok.experimental.FieldDefaults; -@Data +@Value @Accessors(fluent = true) -@FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE) public class StreamConfig { StreamIdentifier streamIdentifier; InitialPositionInStreamExtended initialPositionInStreamExtended; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 5b1f9977..2435b5bf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -1,11 +1,14 @@ package software.amazon.kinesis.common; +import com.google.common.base.Joiner; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; import software.amazon.awssdk.utils.Validate; +import java.util.regex.Pattern; + @RequiredArgsConstructor @EqualsAndHashCode @Getter @@ -13,23 +16,28 @@ import software.amazon.awssdk.utils.Validate; public class StreamIdentifier { private final String accountName; private final String streamName; - private final String streamCreationEpoch; + private final Long streamCreationEpoch; - private static final String DEFAULT = "default"; + private static final String DEFAULT_ACCOUNT = "default"; + private static final String DELIMITER = ":"; + private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*"); @Override public String toString(){ - return accountName + ":" + streamName + ":" + streamCreationEpoch; + return Joiner.on(DELIMITER).join(accountName, streamName, streamCreationEpoch); } public static StreamIdentifier fromString(String streamIdentifier) { - final String[] idTokens = streamIdentifier.split(":"); - Validate.isTrue(idTokens.length == 3, "Unable to deserialize StreamIdentifier from " + streamIdentifier); - return new StreamIdentifier(idTokens[0], idTokens[1], idTokens[2]); + if (PATTERN.matcher(streamIdentifier).matches()) { + final String[] split = streamIdentifier.split(DELIMITER); + return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); + } else { + throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifier); + } } public static StreamIdentifier fromStreamName(String streamName) { Validate.notEmpty(streamName, "StreamName should not be empty"); - return new StreamIdentifier(DEFAULT, streamName, DEFAULT); + return new StreamIdentifier(DEFAULT_ACCOUNT, streamName, 0L); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index c94981b9..1e95f2e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.coordinator; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -77,7 +78,6 @@ import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShutdownNotificationAware; -import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; @@ -122,7 +122,7 @@ public class Scheduler implements Runnable { private final MetricsFactory metricsFactory; private final long failoverTimeMillis; private final long taskBackoffTimeMillis; - private final Either appStreamTracker; + private final boolean isMultiStreamMode; private final Map currentStreamConfigMap; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; @@ -183,27 +183,22 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); - final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); - if(multiStreamTracker == null) { - final StreamConfig streamConfig = new StreamConfig(StreamIdentifier.fromStreamName(this.retrievalConfig.streamName()), - this.retrievalConfig.initialPositionInStreamExtended()); - this.appStreamTracker = Either.right(streamConfig); - this.currentStreamConfigMap = new HashMap() {{ - put(streamConfig.streamIdentifier(), streamConfig); - }}; - } else { - this.appStreamTracker = Either.left(multiStreamTracker); - this.currentStreamConfigMap = multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); - } + this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map( + multiStreamTracker -> true, streamConfig -> false); + this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( + multiStreamTracker -> + multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)), + streamConfig -> + Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig)); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); // Determine leaseSerializer based on availability of MultiStreamTracker. - final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ? + final LeaseSerializer leaseSerializer = isMultiStreamMode ? new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBLeaseSerializer(); this.leaseCoordinator = this.leaseManagementConfig - .leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) + .leaseManagementFactory(leaseSerializer, isMultiStreamMode) .createLeaseCoordinator(this.metricsFactory); this.leaseRefresher = this.leaseCoordinator.leaseRefresher(); @@ -224,7 +219,7 @@ public class Scheduler implements Runnable { // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName. // TODO : Pass the immutable map here instead of using mst.streamConfigList() this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig - .leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) + .leaseManagementFactory(leaseSerializer, isMultiStreamMode) .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier)); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); @@ -252,8 +247,7 @@ public class Scheduler implements Runnable { this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); // TODO : Halo : Check if this needs to be per stream. - this.hierarchicalShardSyncer = leaseManagementConfig - .hierarchicalShardSyncer(this.appStreamTracker.map(mst -> true, sc -> false)); + this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); } @@ -643,7 +637,6 @@ public class Scheduler implements Runnable { // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier()); // Irrespective of single stream app or multi stream app, streamConfig should always be available. - // TODO: Halo : if not available, construct a default config ? final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); Validate.notNull(streamConfig, "StreamConfig should not be empty"); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, @@ -721,7 +714,8 @@ public class Scheduler implements Runnable { if(streamIdentifierString.isPresent()) { streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get()); } else { - streamIdentifier = appStreamTracker.map(mst -> null, sc -> sc.streamIdentifier()); + Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); + streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier(); } Validate.notNull(streamIdentifier, "Stream identifier should not be empty"); return streamIdentifier; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java index aa850f2f..862c203d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java @@ -20,11 +20,10 @@ public class MultiStreamLease extends Lease { @NonNull private String streamIdentifier; @NonNull private String shardId; - public MultiStreamLease(Lease other) { + public MultiStreamLease(MultiStreamLease other) { super(other); - MultiStreamLease casted = validateAndCast(other); - streamIdentifier(casted.streamIdentifier); - shardId(casted.shardId); + streamIdentifier(other.streamIdentifier); + shardId(other.shardId); } @Override @@ -58,14 +57,7 @@ public class MultiStreamLease extends Lease { return false; } MultiStreamLease other = (MultiStreamLease) obj; - if (streamIdentifier == null) { - if (other.streamIdentifier != null) { - return false; - } - } else if (!streamIdentifier.equals(other.streamIdentifier)) { - return false; - } - return true; + return Objects.equals(streamIdentifier, other.streamIdentifier); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 04279f59..94a49903 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -15,19 +15,25 @@ package software.amazon.kinesis.retrieval; -import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; /** * Used by the KCL to configure the retrieval of records from Kinesis. */ -@Data +@Getter @Setter @ToString @EqualsAndHashCode @Accessors(fluent = true) public class RetrievalConfig { /** @@ -43,19 +49,13 @@ public class RetrievalConfig { @NonNull private final KinesisAsyncClient kinesisClient; - /** - * The name of the stream to process records from. - */ - @NonNull - private final String streamName; - @NonNull private final String applicationName; /** - * StreamTracker for multi streaming support + * AppStreamTracker either for multi stream tracking or single stream */ - private MultiStreamTracker multiStreamTracker; + private Either appStreamTracker; /** * Backoff time between consecutive ListShards calls. @@ -90,15 +90,43 @@ public class RetrievalConfig { private RetrievalFactory retrievalFactory; + public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, + @NonNull String applicationName) { + this.kinesisClient = kinesisAsyncClient; + this.appStreamTracker = Either + .right(new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStreamExtended)); + this.applicationName = applicationName; + } + + public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker, + @NonNull String applicationName) { + this.kinesisClient = kinesisAsyncClient; + this.appStreamTracker = Either.left(multiStreamTracker); + this.applicationName = applicationName; + } + + public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { + final StreamConfig[] streamConfig = new StreamConfig[1]; + this.appStreamTracker.apply(multiStreamTracker -> { + throw new IllegalArgumentException( + "Cannot set initialPositionInStreamExtended when multiStreamTracker is set"); + }, sc -> streamConfig[0] = sc); + this.appStreamTracker = Either + .right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended)); + } + public RetrievalFactory retrievalFactory() { if (retrievalFactory == null) { if (retrievalSpecificConfig == null) { - retrievalSpecificConfig = new FanOutConfig(kinesisClient()).streamName(streamName()) + retrievalSpecificConfig = new FanOutConfig(kinesisClient()) .applicationName(applicationName()); + retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig, + streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName())); } retrievalFactory = retrievalSpecificConfig.retrievalFactory(); } return retrievalFactory; } + }