Adding account and stream epoch support. Checkpoint 3

This commit is contained in:
Ashwin Giridharan 2020-03-12 14:15:36 -07:00
parent f94b1b801a
commit 8be8d7a62b
6 changed files with 130 additions and 69 deletions

View file

@ -15,14 +15,18 @@
package software.amazon.kinesis.common; package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -36,14 +40,15 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
/** /**
* This Builder is useful to create all configurations for the KCL with default values. * This Builder is useful to create all configurations for the KCL with default values.
*/ */
@Data @Getter @Setter @ToString @EqualsAndHashCode
@Accessors(fluent = true) @Accessors(fluent = true)
public class ConfigsBuilder { public class ConfigsBuilder {
/** /**
* Name of the stream to consume records from * Either the name of the stream to consume records from
* Or MultiStreamTracker for all the streams to consume records from
*/ */
@NonNull private Either<MultiStreamTracker, String> appStreamTracker;
private final String streamName;
/** /**
* Application name for the KCL Worker * Application name for the KCL Worker
*/ */
@ -109,7 +114,45 @@ public class ConfigsBuilder {
return namespace; return namespace;
} }
private MultiStreamTracker multiStreamTracker; /**
* Constructor to initialize ConfigsBuilder with StreamName
* @param streamName
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
* @param cloudWatchClient
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.right(streamName);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.cloudWatchClient = cloudWatchClient;
this.workerIdentifier = workerIdentifier;
this.shardRecordProcessorFactory = shardRecordProcessorFactory;
}
/**
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
* @param multiStreamTracker
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
* @param cloudWatchClient
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.cloudWatchClient = cloudWatchClient;
this.workerIdentifier = workerIdentifier;
this.shardRecordProcessorFactory = shardRecordProcessorFactory;
}
/** /**
* Creates a new instance of CheckpointConfig * Creates a new instance of CheckpointConfig
@ -135,8 +178,7 @@ public class ConfigsBuilder {
* @return LeaseManagementConfig * @return LeaseManagementConfig
*/ */
public LeaseManagementConfig leaseManagementConfig() { public LeaseManagementConfig leaseManagementConfig() {
return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), streamName(), return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), workerIdentifier());
workerIdentifier());
} }
/** /**
@ -173,10 +215,10 @@ public class ConfigsBuilder {
* @return RetrievalConfig * @return RetrievalConfig
*/ */
public RetrievalConfig retrievalConfig() { public RetrievalConfig retrievalConfig() {
final RetrievalConfig retrievalConfig = new RetrievalConfig(kinesisClient(), streamName(), applicationName()); final RetrievalConfig retrievalConfig =
if(this.multiStreamTracker != null) { appStreamTracker.map(
retrievalConfig.multiStreamTracker(multiStreamTracker); multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()),
} streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName()));
return retrievalConfig; return retrievalConfig;
} }
} }

View file

@ -1,13 +1,10 @@
package software.amazon.kinesis.common; package software.amazon.kinesis.common;
import lombok.AccessLevel; import lombok.Value;
import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
@Data @Value
@Accessors(fluent = true) @Accessors(fluent = true)
@FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE)
public class StreamConfig { public class StreamConfig {
StreamIdentifier streamIdentifier; StreamIdentifier streamIdentifier;
InitialPositionInStreamExtended initialPositionInStreamExtended; InitialPositionInStreamExtended initialPositionInStreamExtended;

View file

@ -1,11 +1,14 @@
package software.amazon.kinesis.common; package software.amazon.kinesis.common;
import com.google.common.base.Joiner;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.Validate;
import java.util.regex.Pattern;
@RequiredArgsConstructor @RequiredArgsConstructor
@EqualsAndHashCode @EqualsAndHashCode
@Getter @Getter
@ -13,23 +16,28 @@ import software.amazon.awssdk.utils.Validate;
public class StreamIdentifier { public class StreamIdentifier {
private final String accountName; private final String accountName;
private final String streamName; private final String streamName;
private final String streamCreationEpoch; private final Long streamCreationEpoch;
private static final String DEFAULT = "default"; private static final String DEFAULT_ACCOUNT = "default";
private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*");
@Override @Override
public String toString(){ public String toString(){
return accountName + ":" + streamName + ":" + streamCreationEpoch; return Joiner.on(DELIMITER).join(accountName, streamName, streamCreationEpoch);
} }
public static StreamIdentifier fromString(String streamIdentifier) { public static StreamIdentifier fromString(String streamIdentifier) {
final String[] idTokens = streamIdentifier.split(":"); if (PATTERN.matcher(streamIdentifier).matches()) {
Validate.isTrue(idTokens.length == 3, "Unable to deserialize StreamIdentifier from " + streamIdentifier); final String[] split = streamIdentifier.split(DELIMITER);
return new StreamIdentifier(idTokens[0], idTokens[1], idTokens[2]); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2]));
} else {
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifier);
}
} }
public static StreamIdentifier fromStreamName(String streamName) { public static StreamIdentifier fromStreamName(String streamName) {
Validate.notEmpty(streamName, "StreamName should not be empty"); Validate.notEmpty(streamName, "StreamName should not be empty");
return new StreamIdentifier(DEFAULT, streamName, DEFAULT); return new StreamIdentifier(DEFAULT_ACCOUNT, streamName, 0L);
} }
} }

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.coordinator; package software.amazon.kinesis.coordinator;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -77,7 +78,6 @@ import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware; import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
@ -122,7 +122,7 @@ public class Scheduler implements Runnable {
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final long failoverTimeMillis; private final long failoverTimeMillis;
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
private final Either<MultiStreamTracker, StreamConfig> appStreamTracker; private final boolean isMultiStreamMode;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap; private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final long listShardsBackoffTimeMillis; private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts; private final int maxListShardsRetryAttempts;
@ -183,27 +183,22 @@ public class Scheduler implements Runnable {
this.retrievalConfig = retrievalConfig; this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName(); this.applicationName = this.coordinatorConfig.applicationName();
final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map(
if(multiStreamTracker == null) { multiStreamTracker -> true, streamConfig -> false);
final StreamConfig streamConfig = new StreamConfig(StreamIdentifier.fromStreamName(this.retrievalConfig.streamName()), this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
this.retrievalConfig.initialPositionInStreamExtended()); multiStreamTracker ->
this.appStreamTracker = Either.right(streamConfig); multiStreamTracker.streamConfigList().stream()
this.currentStreamConfigMap = new HashMap<StreamIdentifier, StreamConfig>() {{ .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)),
put(streamConfig.streamIdentifier(), streamConfig); streamConfig ->
}}; Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
} else {
this.appStreamTracker = Either.left(multiStreamTracker);
this.currentStreamConfigMap = multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
}
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory(); this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker. // Determine leaseSerializer based on availability of MultiStreamTracker.
final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ? final LeaseSerializer leaseSerializer = isMultiStreamMode ?
new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer(); new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig this.leaseCoordinator = this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) .leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCoordinator(this.metricsFactory); .createLeaseCoordinator(this.metricsFactory);
this.leaseRefresher = this.leaseCoordinator.leaseRefresher(); this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
@ -224,7 +219,7 @@ public class Scheduler implements Runnable {
// TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName. // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigList() // TODO : Pass the immutable map here instead of using mst.streamConfigList()
this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) .leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier)); .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier));
this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
@ -252,8 +247,7 @@ public class Scheduler implements Runnable {
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream. // TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
.hierarchicalShardSyncer(this.appStreamTracker.map(mst -> true, sc -> false));
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
} }
@ -643,7 +637,6 @@ public class Scheduler implements Runnable {
// get the default stream name for the single stream application. // get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier()); final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier());
// Irrespective of single stream app or multi stream app, streamConfig should always be available. // Irrespective of single stream app or multi stream app, streamConfig should always be available.
// TODO: Halo : if not available, construct a default config ?
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty"); Validate.notNull(streamConfig, "StreamConfig should not be empty");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
@ -721,7 +714,8 @@ public class Scheduler implements Runnable {
if(streamIdentifierString.isPresent()) { if(streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get()); streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get());
} else { } else {
streamIdentifier = appStreamTracker.map(mst -> null, sc -> sc.streamIdentifier()); Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();
} }
Validate.notNull(streamIdentifier, "Stream identifier should not be empty"); Validate.notNull(streamIdentifier, "Stream identifier should not be empty");
return streamIdentifier; return streamIdentifier;

View file

@ -20,11 +20,10 @@ public class MultiStreamLease extends Lease {
@NonNull private String streamIdentifier; @NonNull private String streamIdentifier;
@NonNull private String shardId; @NonNull private String shardId;
public MultiStreamLease(Lease other) { public MultiStreamLease(MultiStreamLease other) {
super(other); super(other);
MultiStreamLease casted = validateAndCast(other); streamIdentifier(other.streamIdentifier);
streamIdentifier(casted.streamIdentifier); shardId(other.shardId);
shardId(casted.shardId);
} }
@Override @Override
@ -58,14 +57,7 @@ public class MultiStreamLease extends Lease {
return false; return false;
} }
MultiStreamLease other = (MultiStreamLease) obj; MultiStreamLease other = (MultiStreamLease) obj;
if (streamIdentifier == null) { return Objects.equals(streamIdentifier, other.streamIdentifier);
if (other.streamIdentifier != null) {
return false;
}
} else if (!streamIdentifier.equals(other.streamIdentifier)) {
return false;
}
return true;
} }
/** /**

View file

@ -15,19 +15,25 @@
package software.amazon.kinesis.retrieval; package software.amazon.kinesis.retrieval;
import lombok.Data; import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig; import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
/** /**
* Used by the KCL to configure the retrieval of records from Kinesis. * Used by the KCL to configure the retrieval of records from Kinesis.
*/ */
@Data @Getter @Setter @ToString @EqualsAndHashCode
@Accessors(fluent = true) @Accessors(fluent = true)
public class RetrievalConfig { public class RetrievalConfig {
/** /**
@ -43,19 +49,13 @@ public class RetrievalConfig {
@NonNull @NonNull
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;
/**
* The name of the stream to process records from.
*/
@NonNull
private final String streamName;
@NonNull @NonNull
private final String applicationName; private final String applicationName;
/** /**
* StreamTracker for multi streaming support * AppStreamTracker either for multi stream tracking or single stream
*/ */
private MultiStreamTracker multiStreamTracker; private Either<MultiStreamTracker, StreamConfig> appStreamTracker;
/** /**
* Backoff time between consecutive ListShards calls. * Backoff time between consecutive ListShards calls.
@ -90,15 +90,43 @@ public class RetrievalConfig {
private RetrievalFactory retrievalFactory; private RetrievalFactory retrievalFactory;
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either
.right(new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStreamExtended));
this.applicationName = applicationName;
}
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName;
}
public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
final StreamConfig[] streamConfig = new StreamConfig[1];
this.appStreamTracker.apply(multiStreamTracker -> {
throw new IllegalArgumentException(
"Cannot set initialPositionInStreamExtended when multiStreamTracker is set");
}, sc -> streamConfig[0] = sc);
this.appStreamTracker = Either
.right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended));
}
public RetrievalFactory retrievalFactory() { public RetrievalFactory retrievalFactory() {
if (retrievalFactory == null) { if (retrievalFactory == null) {
if (retrievalSpecificConfig == null) { if (retrievalSpecificConfig == null) {
retrievalSpecificConfig = new FanOutConfig(kinesisClient()).streamName(streamName()) retrievalSpecificConfig = new FanOutConfig(kinesisClient())
.applicationName(applicationName()); .applicationName(applicationName());
retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig,
streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
} }
retrievalFactory = retrievalSpecificConfig.retrievalFactory(); retrievalFactory = retrievalSpecificConfig.retrievalFactory();
} }
return retrievalFactory; return retrievalFactory;
} }
} }