Multistream tracker v1

This commit is contained in:
Ashwin Giridharan 2020-02-15 00:29:38 -08:00
parent 38c64b8624
commit d6e2e0b324
19 changed files with 577 additions and 105 deletions

View file

@ -54,54 +54,54 @@ public class DynamoDBCheckpointer implements Checkpointer {
private String operation;
@Override
public void setCheckpoint(final String shardId, final ExtendedSequenceNumber checkpointValue,
public void setCheckpoint(final String leaseKey, final ExtendedSequenceNumber checkpointValue,
final String concurrencyToken) throws KinesisClientLibException {
try {
boolean wasSuccessful = setCheckpoint(shardId, checkpointValue, UUID.fromString(concurrencyToken));
boolean wasSuccessful = setCheckpoint(leaseKey, checkpointValue, UUID.fromString(concurrencyToken));
if (!wasSuccessful) {
throw new ShutdownException("Can't update checkpoint - instance doesn't hold the lease for this shard");
}
} catch (ProvisionedThroughputException e) {
throw new ThrottlingException("Got throttled while updating checkpoint.", e);
} catch (InvalidStateException e) {
String message = "Unable to save checkpoint for shardId " + shardId;
String message = "Unable to save checkpoint for shardId " + leaseKey;
log.error(message, e);
throw new software.amazon.kinesis.exceptions.InvalidStateException(message, e);
} catch (DependencyException e) {
throw new KinesisClientLibDependencyException("Unable to save checkpoint for shardId " + shardId, e);
throw new KinesisClientLibDependencyException("Unable to save checkpoint for shardId " + leaseKey, e);
}
}
@Override
public ExtendedSequenceNumber getCheckpoint(final String shardId) throws KinesisClientLibException {
public ExtendedSequenceNumber getCheckpoint(final String leaseKey) throws KinesisClientLibException {
try {
return leaseRefresher.getLease(shardId).checkpoint();
return leaseRefresher.getLease(leaseKey).checkpoint();
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
String message = "Unable to fetch checkpoint for shardId " + shardId;
String message = "Unable to fetch checkpoint for shardId " + leaseKey;
log.error(message, e);
throw new KinesisClientLibIOException(message, e);
}
}
@Override
public Checkpoint getCheckpointObject(final String shardId) throws KinesisClientLibException {
public Checkpoint getCheckpointObject(final String leaseKey) throws KinesisClientLibException {
try {
Lease lease = leaseRefresher.getLease(shardId);
log.debug("[{}] Retrieved lease => {}", shardId, lease);
Lease lease = leaseRefresher.getLease(leaseKey);
log.debug("[{}] Retrieved lease => {}", leaseKey, lease);
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
String message = "Unable to fetch checkpoint for shardId " + shardId;
String message = "Unable to fetch checkpoint for shardId " + leaseKey;
log.error(message, e);
throw new KinesisClientLibIOException(message, e);
}
}
@Override
public void prepareCheckpoint(final String shardId, final ExtendedSequenceNumber pendingCheckpoint,
public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint,
final String concurrencyToken) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(shardId, pendingCheckpoint, UUID.fromString(concurrencyToken));
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken));
if (!wasSuccessful) {
throw new ShutdownException(
"Can't prepare checkpoint - instance doesn't hold the lease for this shard");
@ -109,21 +109,21 @@ public class DynamoDBCheckpointer implements Checkpointer {
} catch (ProvisionedThroughputException e) {
throw new ThrottlingException("Got throttled while preparing checkpoint.", e);
} catch (InvalidStateException e) {
String message = "Unable to prepare checkpoint for shardId " + shardId;
String message = "Unable to prepare checkpoint for shardId " + leaseKey;
log.error(message, e);
throw new software.amazon.kinesis.exceptions.InvalidStateException(message, e);
} catch (DependencyException e) {
throw new KinesisClientLibDependencyException("Unable to prepare checkpoint for shardId " + shardId, e);
throw new KinesisClientLibDependencyException("Unable to prepare checkpoint for shardId " + leaseKey, e);
}
}
@VisibleForTesting
public boolean setCheckpoint(String shardId, ExtendedSequenceNumber checkpoint, UUID concurrencyToken)
public boolean setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpoint, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
Lease lease = leaseCoordinator.getCurrentlyHeldLease(shardId);
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
if (lease == null) {
log.info("Worker {} could not update checkpoint for shard {} because it does not hold the lease",
leaseCoordinator.workerIdentifier(), shardId);
leaseCoordinator.workerIdentifier(), leaseKey);
return false;
}
@ -131,20 +131,20 @@ public class DynamoDBCheckpointer implements Checkpointer {
lease.pendingCheckpoint(null);
lease.ownerSwitchesSinceCheckpoint(0L);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, shardId);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
}
boolean prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken)
boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
Lease lease = leaseCoordinator.getCurrentlyHeldLease(shardId);
Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
if (lease == null) {
log.info("Worker {} could not prepare checkpoint for shard {} because it does not hold the lease",
leaseCoordinator.workerIdentifier(), shardId);
leaseCoordinator.workerIdentifier(), leaseKey);
return false;
}
lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null"));
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, shardId);
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
}
@Override

View file

@ -30,6 +30,7 @@ import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;
/**
@ -108,6 +109,8 @@ public class ConfigsBuilder {
return namespace;
}
private MultiStreamTracker multiStreamTracker;
/**
* Creates a new instance of CheckpointConfig
*
@ -170,6 +173,10 @@ public class ConfigsBuilder {
* @return RetrievalConfig
*/
public RetrievalConfig retrievalConfig() {
return new RetrievalConfig(kinesisClient(), streamName(), applicationName());
final RetrievalConfig retrievalConfig = new RetrievalConfig(kinesisClient(), streamName(), applicationName());
if(this.multiStreamTracker != null) {
retrievalConfig.multiStreamTracker(multiStreamTracker);
}
return retrievalConfig;
}
}

View file

@ -0,0 +1,14 @@
package software.amazon.kinesis.common;
import lombok.AccessLevel;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
@Data
@Accessors(fluent = true)
@FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE)
public class StreamConfig {
String streamName;
InitialPositionInStreamExtended initialPositionInStreamExtended;
}

View file

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
@ -36,6 +37,8 @@ import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -43,6 +46,7 @@ import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
@ -50,6 +54,8 @@ import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@ -66,6 +72,7 @@ import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
@ -100,7 +107,7 @@ public class Scheduler implements Runnable {
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager shardSyncTaskManager;
// private final ShardSyncTaskManager shardSyncTaskManager;
private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -110,11 +117,12 @@ public class Scheduler implements Runnable {
private final MetricsFactory metricsFactory;
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final String streamName;
private final List<String> listOfStreams;
private final MultiStreamTracker multiStreamTracker;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
private final ShardDetector shardDetector;
// private final ShardDetector shardDetector;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@ -170,9 +178,18 @@ public class Scheduler implements Runnable {
this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName();
this.multiStreamTracker = this.retrievalConfig.multiStreamTracker();
this.listOfStreams = this.multiStreamTracker == null ?
ImmutableList.of(this.retrievalConfig.streamName()) :
this.multiStreamTracker.listStreamsToProcess();
Validate.isTrue(!CollectionUtils.isNullOrEmpty(this.listOfStreams), "No stream configured to process.");
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory()
// Determine leaseSerializer based on MultiStreamTracker
final LeaseSerializer leaseSerializer = this.multiStreamTracker == null ?
new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer)
.createLeaseCoordinator(this.metricsFactory);
this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
@ -191,8 +208,8 @@ public class Scheduler implements Runnable {
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory()
.createShardSyncTaskManager(this.metricsFactory);
// this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory()
// .createShardSyncTaskManager(this.metricsFactory);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -214,10 +231,9 @@ public class Scheduler implements Runnable {
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.streamName = this.retrievalConfig.streamName();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetector = this.shardSyncTaskManager.shardDetector();
// this.shardDetector = this.shardSyncTaskManager.shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
@ -264,28 +280,35 @@ public class Scheduler implements Runnable {
log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
TaskResult result = null;
TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
for(String streamName : this.listOfStreams) {
log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher,
initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
hierarchicalShardSyncer, metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
// Throwing the exception, to prevent further syncs for other stream.
if (result.getException() != null) {
log.error("Caught exception when sync'ing info for " + streamName, result.getException());
throw result.getException();
}
}
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
}
if (result == null || result.getException() == null) {
if (!leaseCoordinator.isRunning()) {
log.info("Starting LeaseCoordinator");
leaseCoordinator.start();
} else {
log.info("LeaseCoordinator is already running. No need to start it.");
}
isDone = true;
// If we reach this point, then we either skipped the lease sync or did not have any exception
// for any of the shard sync in the previous attempt.
if (!leaseCoordinator.isRunning()) {
log.info("Starting LeaseCoordinator");
leaseCoordinator.start();
} else {
lastException = result.getException();
log.info("LeaseCoordinator is already running. No need to start it.");
}
isDone = true;
} catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
lastException = e;
@ -591,7 +614,7 @@ public class Scheduler implements Runnable {
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
checkpoint);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamName,
shardInfo.streamName(),
leaseCoordinator,
executorService,
cache,

View file

@ -0,0 +1,46 @@
package software.amazon.kinesis.leases;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import java.util.Optional;
public class CompositeLeaseKey {
// private static final String LEASE_TOKEN_SEPERATOR = ":";
//
// private String streamName;
//
// @Getter
// private String shardId;
//
// public CompositeLeaseKey(String shardId) {
// this(null, shardId);
// }
//
// public CompositeLeaseKey(String streamName, String shardId) {
// this.streamName = streamName;
// this.shardId = shardId;
// }
//
// public Optional<String> getStreamName() {
// return Optional.ofNullable(streamName);
// }
//
// public String getLeaseKey(boolean isMultiStreamingEnabled) {
// Validate.isTrue(!(isMultiStreamingEnabled && StringUtils.isEmpty(streamName)),
// "Empty stream name found while multiStreaming is enabled");
// return isMultiStreamingEnabled ? StringUtils.joinWith(LEASE_TOKEN_SEPERATOR, streamName, shardId) : shardId;
// }
//
// public static CompositeLeaseKey getLeaseKey(String leaseKey) {
// Validate.notNull(leaseKey);
// String leaseTokens[] = leaseKey.split(LEASE_TOKEN_SEPERATOR);
// Validate.inclusiveBetween(1, 2, leaseTokens.length);
// return leaseTokens.length == 2 ?
// new CompositeLeaseKey(leaseTokens[0], leaseTokens[1]) :
// new CompositeLeaseKey(leaseTokens[0]);
// }
}

View file

@ -16,6 +16,8 @@
package software.amazon.kinesis.leases;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -27,6 +29,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -36,6 +39,7 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.MultiStreamTracker;
/**
* Used by the KCL to configure lease management.
@ -71,7 +75,7 @@ public class LeaseManagementConfig {
* Name of the Kinesis Data Stream to read records from.
*/
@NonNull
private final String streamName;
private String streamName;
/**
* Used to distinguish different workers/processes of a KCL application.
*
@ -116,7 +120,7 @@ public class LeaseManagementConfig {
*
* <p>Default value: {@link Integer#MAX_VALUE}</p>
*/
private int maxLeasesForWorker = Integer.MAX_VALUE;;
private int maxLeasesForWorker = Integer.MAX_VALUE;
/**
* Max leases to steal from another worker at one time (for load balancing).
@ -182,6 +186,24 @@ public class LeaseManagementConfig {
private MetricsFactory metricsFactory = new NullMetricsFactory();
@Deprecated
public LeaseManagementConfig(String tableName, DynamoDbAsyncClient dynamoDBClient, KinesisAsyncClient kinesisClient,
String streamName, String workerIdentifier) {
this.tableName = tableName;
this.dynamoDBClient = dynamoDBClient;
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.workerIdentifier = workerIdentifier;
}
public LeaseManagementConfig(String tableName, DynamoDbAsyncClient dynamoDBClient, KinesisAsyncClient kinesisClient,
String workerIdentifier) {
this.tableName = tableName;
this.dynamoDBClient = dynamoDBClient;
this.kinesisClient = kinesisClient;
this.workerIdentifier = workerIdentifier;
}
/**
* Returns the metrics factory.
*
@ -244,9 +266,10 @@ public class LeaseManagementConfig {
private LeaseManagementFactory leaseManagementFactory;
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
Validate.notEmpty(streamName(), "Stream name is empty");
return new DynamoDBLeaseManagementFactory(kinesisClient(),
streamName(),
dynamoDBClient(),
tableName(),
@ -271,8 +294,43 @@ public class LeaseManagementConfig {
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
}
return leaseManagementFactory;
}
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) {
return new DynamoDBLeaseManagementFactory(kinesisClient(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer);
}
// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) {
// return multiStreamTracker() == null ?
// initialPositionInStream() :
// multiStreamTracker().initialPositionInStreamExtended(streamName) == null ?
// initialPositionInStream() :
// multiStreamTracker().initialPositionInStreamExtended(streamName);
// }
}

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.leases;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -26,7 +27,16 @@ public interface LeaseManagementFactory {
ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory);
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
throw new UnsupportedOperationException();
}
DynamoDBLeaseRefresher createLeaseRefresher();
ShardDetector createShardDetector();
default ShardDetector createShardDetector(StreamConfig streamConfig) {
throw new UnsupportedOperationException();
}
}

View file

@ -23,7 +23,6 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.kinesis.leases.Lease;
/**
* Utility class that manages the mapping of Lease objects/operations to records in DynamoDB.
@ -46,6 +45,11 @@ public interface LeaseSerializer {
*/
Lease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord);
default Lease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord, Lease leaseToUpdate) {
throw new UnsupportedOperationException();
}
/**
* @param lease
* @return the attribute value map representing a Lease's hash key given a Lease object.

View file

@ -0,0 +1,91 @@
package software.amazon.kinesis.leases;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import java.util.Objects;
import static com.google.common.base.Verify.verifyNotNull;
@Setter
@NoArgsConstructor
@Getter
@Accessors(fluent = true)
public class MultiStreamLease extends Lease {
@NonNull private String streamName;
@NonNull private String shardId;
public MultiStreamLease(Lease other) {
super(other);
MultiStreamLease casted = validateAndCast(other);
streamName(casted.streamName);
shardId(casted.shardId);
}
@Override
public void update(Lease other) {
MultiStreamLease casted = validateAndCast(other);
super.update(casted);
streamName(casted.streamName);
shardId(casted.shardId);
}
public static String getLeaseKey(String streamName, String shardId) {
verifyNotNull(streamName, "streamName should not be null");
verifyNotNull(shardId, "shardId should not be null");
return streamName + ":" + shardId;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), streamName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof MultiStreamLease)) {
return false;
}
MultiStreamLease other = (MultiStreamLease) obj;
if (streamName == null) {
if (other.streamName != null) {
return false;
}
} else if (!streamName.equals(other.streamName)) {
return false;
}
return true;
}
/**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
*
* @return A deep copy of this object.
*/
@Override
public MultiStreamLease copy() {
return new MultiStreamLease(this);
}
/**
* Validate and cast the lease to MultiStream lease
* @param lease
* @return MultiStreamLease
*/
public static MultiStreamLease validateAndCast(Lease lease) {
Validate.isInstanceOf(MultiStreamLease.class, lease);
return (MultiStreamLease) lease;
}
}

View file

@ -36,6 +36,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ToString
public class ShardInfo {
private final String streamName;
private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
@ -54,11 +55,18 @@ public class ShardInfo {
* @param checkpoint
* the latest checkpoint from lease
*/
// TODO: check what values can be null
public ShardInfo(@NonNull final String shardId,
final String concurrencyToken,
final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint) {
this(shardId, concurrencyToken, parentShardIds, checkpoint, null);
}
public ShardInfo(@NonNull final String shardId,
final String concurrencyToken,
final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint,
final String streamName) {
this.shardId = shardId;
this.concurrencyToken = concurrencyToken;
this.parentShardIds = new LinkedList<>();
@ -69,6 +77,7 @@ public class ShardInfo {
// This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds);
this.checkpoint = checkpoint;
this.streamName = streamName;
}
/**
@ -94,7 +103,8 @@ public class ShardInfo {
*/
@Override
public int hashCode() {
return new HashCodeBuilder().append(concurrencyToken).append(parentShardIds).append(shardId).toHashCode();
return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName).toHashCode();
}
/**
@ -118,7 +128,8 @@ public class ShardInfo {
}
ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId).isEquals();
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamName, other.streamName).isEquals();
}

View file

@ -33,11 +33,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.CompositeLeaseKey;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.LeaseTaker;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -377,9 +379,22 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
}
// TODO : Halo : Reenable for backward compatibility
// public static ShardInfo convertLeaseToAssignment(final Lease lease) {
// return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
// lease.checkpoint());
// }
// TODO : Support Shard
public static ShardInfo convertLeaseToAssignment(final Lease lease) {
return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
lease.checkpoint());
if (lease instanceof MultiStreamLease) {
return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),
lease.checkpoint(), ((MultiStreamLease) lease).streamName());
} else {
return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
lease.checkpoint());
}
}
/**

View file

@ -16,6 +16,9 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import lombok.Data;
@ -23,13 +26,17 @@ import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -44,8 +51,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
private final String streamName;
@NonNull
private final DynamoDbAsyncClient dynamoDBClient;
@NonNull
private final String tableName;
@ -54,9 +59,11 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final ExecutorService executorService;
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final LeaseSerializer leaseSerializer;
@NonNull
private StreamConfig streamConfig;
private final long failoverTimeMillis;
private final long epsilonMillis;
@ -309,6 +316,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
@ -321,13 +329,83 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(streamName, initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamConfig
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer);
this.streamConfig = streamConfig;
}
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
this.failoverTimeMillis = failoverTimeMillis;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
@ -348,6 +426,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseSerializer = leaseSerializer;
}
@Override
@ -364,11 +443,24 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory);
}
@Override
@Override @Deprecated
public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFactory metricsFactory) {
return new ShardSyncTaskManager(this.createShardDetector(),
this.createLeaseRefresher(),
initialPositionInStream,
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
metricsFactory);
}
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
this.createLeaseRefresher(),
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
@ -379,13 +471,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,
tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
}
@Override
@Override @Deprecated
public ShardDetector createShardDetector() {
return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis,
return new KinesisShardDetector(kinesisClient, streamConfig.streamName(),
listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds,
maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout);
}
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return new KinesisShardDetector(kinesisClient, streamConfig.streamName(), listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
}

View file

@ -80,28 +80,32 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
@Override
public Lease fromDynamoRecord(final Map<String, AttributeValue> dynamoRecord) {
Lease result = new Lease();
result.leaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY));
result.leaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY));
result.leaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY));
final Lease result = new Lease();
return fromDynamoRecord(dynamoRecord, result);
}
result.ownerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY));
result.checkpoint(
@Override
public Lease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord, Lease leaseToUpdate) {
leaseToUpdate.leaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY));
leaseToUpdate.leaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY));
leaseToUpdate.leaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY));
leaseToUpdate.ownerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY));
leaseToUpdate.checkpoint(
new ExtendedSequenceNumber(
DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_SEQUENCE_NUMBER_KEY),
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
);
result.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
result.pendingCheckpoint(
leaseToUpdate.pendingCheckpoint(
new ExtendedSequenceNumber(
DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY),
DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY))
);
}
return result;
return leaseToUpdate;
}
@Override
@ -198,7 +202,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
return result;
}
private AttributeValueUpdate putUpdate(AttributeValue attributeValue) {
protected AttributeValueUpdate putUpdate(AttributeValue attributeValue) {
return AttributeValueUpdate.builder().value(attributeValue).action(AttributeAction.PUT).build();
}

View file

@ -0,0 +1,47 @@
package software.amazon.kinesis.leases.dynamodb;
import lombok.NoArgsConstructor;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.MultiStreamLease;
import java.util.Map;
import static software.amazon.kinesis.leases.MultiStreamLease.validateAndCast;
@NoArgsConstructor
public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer {
private static final String STREAM_NAME_KEY = "streamName";
private static final String SHARD_ID_KEY = "shardId";
@Override
public Map<String, AttributeValue> toDynamoRecord(Lease lease) {
final MultiStreamLease multiStreamLease = validateAndCast(lease);
final Map<String, AttributeValue> result = super.toDynamoRecord(multiStreamLease);
result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamName()));
result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId()));
return result;
}
@Override
public MultiStreamLease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord) {
final MultiStreamLease multiStreamLease = (MultiStreamLease) super
.fromDynamoRecord(dynamoRecord, new MultiStreamLease());
multiStreamLease.streamName(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY));
multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY));
return multiStreamLease;
}
@Override
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease) {
final MultiStreamLease multiStreamLease = validateAndCast(lease);
final Map<String, AttributeValueUpdate> result = super.getDynamoUpdateLeaseUpdate(multiStreamLease);
result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamName())));
result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId())));
return result;
}
}

View file

@ -27,50 +27,50 @@ public interface Checkpointer {
* Record a checkpoint for a shard (e.g. sequence and subsequence numbers of last record processed
* by application). Upon failover, record processing is resumed from this point.
*
* @param shardId Checkpoint is specified for this shard.
* @param leaseKey Checkpoint is specified for this shard.
* @param checkpointValue Value of the checkpoint (e.g. Kinesis sequence number and subsequence number)
* @param concurrencyToken Used with conditional writes to prevent stale updates
* (e.g. if there was a fail over to a different record processor, we don't want to
* overwrite it's checkpoint)
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
*/
void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
throws KinesisClientLibException;
/**
* Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard
* has been completely processed before we start processing the child shard.
*
* @param shardId Current checkpoint for this shard is fetched
* @param leaseKey Current checkpoint for this shard is fetched
* @return Current checkpoint for this shard, null if there is no record for this shard.
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
*/
ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException;
ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException;
/**
* Get the current checkpoint stored for the specified shard, which holds the sequence numbers for the checkpoint
* and pending checkpoint. Useful for checking that the parent shard has been completely processed before we start
* processing the child shard.
*
* @param shardId Current checkpoint for this shard is fetched
* @param leaseKey Current checkpoint for this shard is fetched
* @return Current checkpoint object for this shard, null if there is no record for this shard.
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
*/
Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException;
Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException;
/**
* Record intent to checkpoint for a shard. Upon failover, the pendingCheckpointValue will be passed to the new
* ShardRecordProcessor's initialize() method.
*
* @param shardId Checkpoint is specified for this shard.
* @param leaseKey Checkpoint is specified for this shard.
* @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number)
* @param concurrencyToken Used with conditional writes to prevent stale updates
* (e.g. if there was a fail over to a different record processor, we don't want to
* overwrite it's checkpoint)
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
*/
void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException;
void operation(String operation);

View file

@ -0,0 +1,27 @@
package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import java.util.List;
/**
* Interface for stream trackers. This is useful for KCL Workers that need
* to consume data from multiple streams.
*/
public interface MultiStreamTracker {
/**
* Returns the list of streams that the Worker should consume data from.
*
* @return List of stream names
*/
List<String> listStreamsToProcess();
/**
* Returns the initial position in stream to read from, for the given stream.
* @param streamName
* @return Initial position to read from, for the given stream
*/
InitialPositionInStreamExtended initialPositionInStreamExtended(String streamName);
}

View file

@ -24,5 +24,15 @@ public interface ShardRecordProcessorFactory {
*
* @return
*/
ShardRecordProcessor shardRecordProcessor();
// TODO : Halo : Reenable
// ShardRecordProcessor shardRecordProcessor();
/**
* Returns a new instance of the ShardRecordProcessor for a stream
* @param streamName
* @return ShardRecordProcessor
*/
default ShardRecordProcessor shardRecordProcessor(String streamName) {
throw new UnsupportedOperationException();
}
}

View file

@ -21,6 +21,7 @@ import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
/**
@ -51,6 +52,11 @@ public class RetrievalConfig {
@NonNull
private final String applicationName;
/**
* StreamTracker for multi streaming support
*/
private MultiStreamTracker multiStreamTracker;
/**
* Backoff time between consecutive ListShards calls.
*

View file

@ -39,14 +39,14 @@ public class InMemoryCheckpointer implements Checkpointer {
* {@inheritDoc}
*/
@Override
public void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
throws KinesisClientLibException {
checkpoints.put(shardId, checkpointValue);
flushpoints.put(shardId, checkpointValue);
pendingCheckpoints.remove(shardId);
checkpoints.put(leaseKey, checkpointValue);
flushpoints.put(leaseKey, checkpointValue);
pendingCheckpoints.remove(leaseKey);
if (log.isDebugEnabled()) {
log.debug("shardId: {} checkpoint: {}", shardId, checkpointValue);
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
}
}
@ -55,25 +55,25 @@ public class InMemoryCheckpointer implements Checkpointer {
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(shardId);
log.debug("checkpoint shardId: {} checkpoint: {}", shardId, checkpoint);
public ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
log.debug("checkpoint shardId: {} checkpoint: {}", leaseKey, checkpoint);
return checkpoint;
}
@Override
public void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException {
pendingCheckpoints.put(shardId, pendingCheckpoint);
pendingCheckpoints.put(leaseKey, pendingCheckpoint);
}
@Override
public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(shardId);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(shardId);
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint);
log.debug("getCheckpointObject shardId: {}, {}", shardId, checkpointObj);
log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj);
return checkpointObj;
}