Adding account and stream epoch support. Checkpoint 3

This commit is contained in:
Ashwin Giridharan 2020-03-12 14:15:36 -07:00
parent 8e8f6ed352
commit 255ae932d2
6 changed files with 130 additions and 69 deletions

View file

@ -15,14 +15,18 @@
package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -36,14 +40,15 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
/**
* This Builder is useful to create all configurations for the KCL with default values.
*/
@Data
@Getter @Setter @ToString @EqualsAndHashCode
@Accessors(fluent = true)
public class ConfigsBuilder {
/**
* Name of the stream to consume records from
* Either the name of the stream to consume records from
* Or MultiStreamTracker for all the streams to consume records from
*/
@NonNull
private final String streamName;
private Either<MultiStreamTracker, String> appStreamTracker;
/**
* Application name for the KCL Worker
*/
@ -109,7 +114,45 @@ public class ConfigsBuilder {
return namespace;
}
private MultiStreamTracker multiStreamTracker;
/**
* Constructor to initialize ConfigsBuilder with StreamName
* @param streamName
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
* @param cloudWatchClient
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.right(streamName);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.cloudWatchClient = cloudWatchClient;
this.workerIdentifier = workerIdentifier;
this.shardRecordProcessorFactory = shardRecordProcessorFactory;
}
/**
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
* @param multiStreamTracker
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
* @param cloudWatchClient
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.cloudWatchClient = cloudWatchClient;
this.workerIdentifier = workerIdentifier;
this.shardRecordProcessorFactory = shardRecordProcessorFactory;
}
/**
* Creates a new instance of CheckpointConfig
@ -135,8 +178,7 @@ public class ConfigsBuilder {
* @return LeaseManagementConfig
*/
public LeaseManagementConfig leaseManagementConfig() {
return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), streamName(),
workerIdentifier());
return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), workerIdentifier());
}
/**
@ -173,10 +215,10 @@ public class ConfigsBuilder {
* @return RetrievalConfig
*/
public RetrievalConfig retrievalConfig() {
final RetrievalConfig retrievalConfig = new RetrievalConfig(kinesisClient(), streamName(), applicationName());
if(this.multiStreamTracker != null) {
retrievalConfig.multiStreamTracker(multiStreamTracker);
}
final RetrievalConfig retrievalConfig =
appStreamTracker.map(
multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()),
streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName()));
return retrievalConfig;
}
}

View file

@ -1,13 +1,10 @@
package software.amazon.kinesis.common;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
@Data
@Value
@Accessors(fluent = true)
@FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE)
public class StreamConfig {
StreamIdentifier streamIdentifier;
InitialPositionInStreamExtended initialPositionInStreamExtended;

View file

@ -1,11 +1,14 @@
package software.amazon.kinesis.common;
import com.google.common.base.Joiner;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import software.amazon.awssdk.utils.Validate;
import java.util.regex.Pattern;
@RequiredArgsConstructor
@EqualsAndHashCode
@Getter
@ -13,23 +16,28 @@ import software.amazon.awssdk.utils.Validate;
public class StreamIdentifier {
private final String accountName;
private final String streamName;
private final String streamCreationEpoch;
private final Long streamCreationEpoch;
private static final String DEFAULT = "default";
private static final String DEFAULT_ACCOUNT = "default";
private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*");
@Override
public String toString(){
return accountName + ":" + streamName + ":" + streamCreationEpoch;
return Joiner.on(DELIMITER).join(accountName, streamName, streamCreationEpoch);
}
public static StreamIdentifier fromString(String streamIdentifier) {
final String[] idTokens = streamIdentifier.split(":");
Validate.isTrue(idTokens.length == 3, "Unable to deserialize StreamIdentifier from " + streamIdentifier);
return new StreamIdentifier(idTokens[0], idTokens[1], idTokens[2]);
if (PATTERN.matcher(streamIdentifier).matches()) {
final String[] split = streamIdentifier.split(DELIMITER);
return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2]));
} else {
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifier);
}
}
public static StreamIdentifier fromStreamName(String streamName) {
Validate.notEmpty(streamName, "StreamName should not be empty");
return new StreamIdentifier(DEFAULT, streamName, DEFAULT);
return new StreamIdentifier(DEFAULT_ACCOUNT, streamName, 0L);
}
}

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.coordinator;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -77,7 +78,6 @@ import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
@ -122,7 +122,7 @@ public class Scheduler implements Runnable {
private final MetricsFactory metricsFactory;
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final Either<MultiStreamTracker, StreamConfig> appStreamTracker;
private final boolean isMultiStreamMode;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
@ -183,27 +183,22 @@ public class Scheduler implements Runnable {
this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName();
final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker();
if(multiStreamTracker == null) {
final StreamConfig streamConfig = new StreamConfig(StreamIdentifier.fromStreamName(this.retrievalConfig.streamName()),
this.retrievalConfig.initialPositionInStreamExtended());
this.appStreamTracker = Either.right(streamConfig);
this.currentStreamConfigMap = new HashMap<StreamIdentifier, StreamConfig>() {{
put(streamConfig.streamIdentifier(), streamConfig);
}};
} else {
this.appStreamTracker = Either.left(multiStreamTracker);
this.currentStreamConfigMap = multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
}
this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> true, streamConfig -> false);
this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker ->
multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)),
streamConfig ->
Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ?
final LeaseSerializer leaseSerializer = isMultiStreamMode ?
new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false))
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCoordinator(this.metricsFactory);
this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
@ -224,7 +219,7 @@ public class Scheduler implements Runnable {
// TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigList()
this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false))
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier));
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
@ -252,8 +247,7 @@ public class Scheduler implements Runnable {
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig
.hierarchicalShardSyncer(this.appStreamTracker.map(mst -> true, sc -> false));
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
}
@ -643,7 +637,6 @@ public class Scheduler implements Runnable {
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier());
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// TODO: Halo : if not available, construct a default config ?
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
@ -721,7 +714,8 @@ public class Scheduler implements Runnable {
if(streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get());
} else {
streamIdentifier = appStreamTracker.map(mst -> null, sc -> sc.streamIdentifier());
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();
}
Validate.notNull(streamIdentifier, "Stream identifier should not be empty");
return streamIdentifier;

View file

@ -20,11 +20,10 @@ public class MultiStreamLease extends Lease {
@NonNull private String streamIdentifier;
@NonNull private String shardId;
public MultiStreamLease(Lease other) {
public MultiStreamLease(MultiStreamLease other) {
super(other);
MultiStreamLease casted = validateAndCast(other);
streamIdentifier(casted.streamIdentifier);
shardId(casted.shardId);
streamIdentifier(other.streamIdentifier);
shardId(other.shardId);
}
@Override
@ -58,14 +57,7 @@ public class MultiStreamLease extends Lease {
return false;
}
MultiStreamLease other = (MultiStreamLease) obj;
if (streamIdentifier == null) {
if (other.streamIdentifier != null) {
return false;
}
} else if (!streamIdentifier.equals(other.streamIdentifier)) {
return false;
}
return true;
return Objects.equals(streamIdentifier, other.streamIdentifier);
}
/**

View file

@ -15,19 +15,25 @@
package software.amazon.kinesis.retrieval;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
/**
* Used by the KCL to configure the retrieval of records from Kinesis.
*/
@Data
@Getter @Setter @ToString @EqualsAndHashCode
@Accessors(fluent = true)
public class RetrievalConfig {
/**
@ -43,19 +49,13 @@ public class RetrievalConfig {
@NonNull
private final KinesisAsyncClient kinesisClient;
/**
* The name of the stream to process records from.
*/
@NonNull
private final String streamName;
@NonNull
private final String applicationName;
/**
* StreamTracker for multi streaming support
* AppStreamTracker either for multi stream tracking or single stream
*/
private MultiStreamTracker multiStreamTracker;
private Either<MultiStreamTracker, StreamConfig> appStreamTracker;
/**
* Backoff time between consecutive ListShards calls.
@ -90,15 +90,43 @@ public class RetrievalConfig {
private RetrievalFactory retrievalFactory;
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either
.right(new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStreamExtended));
this.applicationName = applicationName;
}
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName;
}
public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
final StreamConfig[] streamConfig = new StreamConfig[1];
this.appStreamTracker.apply(multiStreamTracker -> {
throw new IllegalArgumentException(
"Cannot set initialPositionInStreamExtended when multiStreamTracker is set");
}, sc -> streamConfig[0] = sc);
this.appStreamTracker = Either
.right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended));
}
public RetrievalFactory retrievalFactory() {
if (retrievalFactory == null) {
if (retrievalSpecificConfig == null) {
retrievalSpecificConfig = new FanOutConfig(kinesisClient()).streamName(streamName())
retrievalSpecificConfig = new FanOutConfig(kinesisClient())
.applicationName(applicationName());
retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig,
streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
}
retrievalFactory = retrievalSpecificConfig.retrievalFactory();
}
return retrievalFactory;
}
}