diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index b0ce7675..fb7a6fc7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -54,54 +54,54 @@ public class DynamoDBCheckpointer implements Checkpointer { private String operation; @Override - public void setCheckpoint(final String shardId, final ExtendedSequenceNumber checkpointValue, + public void setCheckpoint(final String leaseKey, final ExtendedSequenceNumber checkpointValue, final String concurrencyToken) throws KinesisClientLibException { try { - boolean wasSuccessful = setCheckpoint(shardId, checkpointValue, UUID.fromString(concurrencyToken)); + boolean wasSuccessful = setCheckpoint(leaseKey, checkpointValue, UUID.fromString(concurrencyToken)); if (!wasSuccessful) { throw new ShutdownException("Can't update checkpoint - instance doesn't hold the lease for this shard"); } } catch (ProvisionedThroughputException e) { throw new ThrottlingException("Got throttled while updating checkpoint.", e); } catch (InvalidStateException e) { - String message = "Unable to save checkpoint for shardId " + shardId; + String message = "Unable to save checkpoint for shardId " + leaseKey; log.error(message, e); throw new software.amazon.kinesis.exceptions.InvalidStateException(message, e); } catch (DependencyException e) { - throw new KinesisClientLibDependencyException("Unable to save checkpoint for shardId " + shardId, e); + throw new KinesisClientLibDependencyException("Unable to save checkpoint for shardId " + leaseKey, e); } } @Override - public ExtendedSequenceNumber getCheckpoint(final String shardId) throws KinesisClientLibException { + public ExtendedSequenceNumber 
getCheckpoint(final String leaseKey) throws KinesisClientLibException { try { - return leaseRefresher.getLease(shardId).checkpoint(); + return leaseRefresher.getLease(leaseKey).checkpoint(); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { - String message = "Unable to fetch checkpoint for shardId " + shardId; + String message = "Unable to fetch checkpoint for shardId " + leaseKey; log.error(message, e); throw new KinesisClientLibIOException(message, e); } } @Override - public Checkpoint getCheckpointObject(final String shardId) throws KinesisClientLibException { + public Checkpoint getCheckpointObject(final String leaseKey) throws KinesisClientLibException { try { - Lease lease = leaseRefresher.getLease(shardId); - log.debug("[{}] Retrieved lease => {}", shardId, lease); + Lease lease = leaseRefresher.getLease(leaseKey); + log.debug("[{}] Retrieved lease => {}", leaseKey, lease); return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { - String message = "Unable to fetch checkpoint for shardId " + shardId; + String message = "Unable to fetch checkpoint for shardId " + leaseKey; log.error(message, e); throw new KinesisClientLibIOException(message, e); } } @Override - public void prepareCheckpoint(final String shardId, final ExtendedSequenceNumber pendingCheckpoint, + public void prepareCheckpoint(final String leaseKey, final ExtendedSequenceNumber pendingCheckpoint, final String concurrencyToken) throws KinesisClientLibException { try { boolean wasSuccessful = - prepareCheckpoint(shardId, pendingCheckpoint, UUID.fromString(concurrencyToken)); + prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken)); if (!wasSuccessful) { throw new ShutdownException( "Can't prepare checkpoint - instance doesn't hold the lease for this shard"); @@ -109,21 +109,21 @@ public class DynamoDBCheckpointer implements 
Checkpointer { } catch (ProvisionedThroughputException e) { throw new ThrottlingException("Got throttled while preparing checkpoint.", e); } catch (InvalidStateException e) { - String message = "Unable to prepare checkpoint for shardId " + shardId; + String message = "Unable to prepare checkpoint for shardId " + leaseKey; log.error(message, e); throw new software.amazon.kinesis.exceptions.InvalidStateException(message, e); } catch (DependencyException e) { - throw new KinesisClientLibDependencyException("Unable to prepare checkpoint for shardId " + shardId, e); + throw new KinesisClientLibDependencyException("Unable to prepare checkpoint for shardId " + leaseKey, e); } } @VisibleForTesting - public boolean setCheckpoint(String shardId, ExtendedSequenceNumber checkpoint, UUID concurrencyToken) + public boolean setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpoint, UUID concurrencyToken) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - Lease lease = leaseCoordinator.getCurrentlyHeldLease(shardId); + Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { log.info("Worker {} could not update checkpoint for shard {} because it does not hold the lease", - leaseCoordinator.workerIdentifier(), shardId); + leaseCoordinator.workerIdentifier(), leaseKey); return false; } @@ -131,20 +131,20 @@ public class DynamoDBCheckpointer implements Checkpointer { lease.pendingCheckpoint(null); lease.ownerSwitchesSinceCheckpoint(0L); - return leaseCoordinator.updateLease(lease, concurrencyToken, operation, shardId); + return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } - boolean prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken) + boolean prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, UUID concurrencyToken) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - Lease lease 
= leaseCoordinator.getCurrentlyHeldLease(shardId); + Lease lease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); if (lease == null) { log.info("Worker {} could not prepare checkpoint for shard {} because it does not hold the lease", - leaseCoordinator.workerIdentifier(), shardId); + leaseCoordinator.workerIdentifier(), leaseKey); return false; } lease.pendingCheckpoint(Objects.requireNonNull(pendingCheckpoint, "pendingCheckpoint should not be null")); - return leaseCoordinator.updateLease(lease, concurrencyToken, operation, shardId); + return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java index c7f56d8d..7026e34c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java @@ -30,6 +30,7 @@ import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; /** @@ -108,6 +109,8 @@ public class ConfigsBuilder { return namespace; } + private MultiStreamTracker multiStreamTracker; + /** * Creates a new instance of CheckpointConfig * @@ -170,6 +173,10 @@ public class ConfigsBuilder { * @return RetrievalConfig */ public RetrievalConfig retrievalConfig() { - return new RetrievalConfig(kinesisClient(), streamName(), applicationName()); + final RetrievalConfig retrievalConfig = new RetrievalConfig(kinesisClient(), streamName(), applicationName()); + if(this.multiStreamTracker != null) { + 
retrievalConfig.multiStreamTracker(multiStreamTracker); + } + return retrievalConfig; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java new file mode 100644 index 00000000..55451709 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -0,0 +1,14 @@ +package software.amazon.kinesis.common; + +import lombok.AccessLevel; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; + +@Data +@Accessors(fluent = true) +@FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE) +public class StreamConfig { + String streamName; + InitialPositionInStreamExtended initialPositionInStreamExtended; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index eaeb5a1c..b6b7fab3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; @@ -36,6 +37,8 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -43,6 +46,7 @@ import 
software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; @@ -50,6 +54,8 @@ import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; @@ -66,6 +72,7 @@ import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShutdownNotificationAware; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; @@ -100,7 +107,7 @@ public class Scheduler implements Runnable { private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; - private final ShardSyncTaskManager shardSyncTaskManager; +// private final ShardSyncTaskManager shardSyncTaskManager; private final ShardPrioritization shardPrioritization; private final boolean 
cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -110,11 +117,12 @@ public class Scheduler implements Runnable { private final MetricsFactory metricsFactory; private final long failoverTimeMillis; private final long taskBackoffTimeMillis; - private final String streamName; + private final List<String> listOfStreams; + private final MultiStreamTracker multiStreamTracker; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; - private final ShardDetector shardDetector; +// private final ShardDetector shardDetector; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; @@ -170,9 +178,18 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); + this.multiStreamTracker = this.retrievalConfig.multiStreamTracker(); + this.listOfStreams = this.multiStreamTracker == null ? + ImmutableList.of(this.retrievalConfig.streamName()) : + this.multiStreamTracker.listStreamsToProcess(); + Validate.isTrue(!CollectionUtils.isNullOrEmpty(this.listOfStreams), "No stream configured to process."); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); - this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory() + // Single-stream mode (no MultiStreamTracker) keeps the plain lease serializer; multi-stream mode needs the multi-stream one. + final LeaseSerializer leaseSerializer = this.multiStreamTracker == null ? 
+ new DynamoDBLeaseSerializer() : + new DynamoDBMultiStreamLeaseSerializer(); + this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer) .createLeaseCoordinator(this.metricsFactory); this.leaseRefresher = this.leaseCoordinator.leaseRefresher(); @@ -191,8 +208,8 @@ public class Scheduler implements Runnable { this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); - this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() - .createShardSyncTaskManager(this.metricsFactory); +// this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() +// .createShardSyncTaskManager(this.metricsFactory); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -214,10 +231,9 @@ public class Scheduler implements Runnable { this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); // this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); // this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); - this.streamName = this.retrievalConfig.streamName(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); - this.shardDetector = this.shardSyncTaskManager.shardDetector(); +// this.shardDetector = this.shardSyncTaskManager.shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); @@ -264,28 +280,35 @@ public class Scheduler implements Runnable { log.info("Initializing LeaseCoordinator"); 
leaseCoordinator.initialize(); - TaskResult result = null; + TaskResult result; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { - log.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing + // TODO: for already synced streams + for(String streamName : this.listOfStreams) { + log.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, + initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, + hierarchicalShardSyncer, metricsFactory); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + // Throwing the exception, to prevent further syncs for other stream. + if (result.getException() != null) { + log.error("Caught exception when sync'ing info for " + streamName, result.getException()); + throw result.getException(); + } + } } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); } - if (result == null || result.getException() == null) { - if (!leaseCoordinator.isRunning()) { - log.info("Starting LeaseCoordinator"); - leaseCoordinator.start(); - } else { - log.info("LeaseCoordinator is already running. No need to start it."); - } - isDone = true; + // If we reach this point, then we either skipped the lease sync or did not have any exception + // for any of the shard sync in the previous attempt. + if (!leaseCoordinator.isRunning()) { + log.info("Starting LeaseCoordinator"); + leaseCoordinator.start(); } else { - lastException = result.getException(); + log.info("LeaseCoordinator is already running. 
No need to start it."); } + isDone = true; } catch (LeasingException e) { log.error("Caught exception when initializing LeaseCoordinator", e); lastException = e; @@ -591,7 +614,7 @@ public class Scheduler implements Runnable { ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, checkpoint); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, - streamName, + shardInfo.streamName(), leaseCoordinator, executorService, cache, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java new file mode 100644 index 00000000..7092ceaa --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/CompositeLeaseKey.java @@ -0,0 +1,46 @@ +package software.amazon.kinesis.leases; + +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; + +import java.util.Optional; + +public class CompositeLeaseKey { + +// private static final String LEASE_TOKEN_SEPERATOR = ":"; +// +// private String streamName; +// +// @Getter +// private String shardId; +// +// public CompositeLeaseKey(String shardId) { +// this(null, shardId); +// } +// +// public CompositeLeaseKey(String streamName, String shardId) { +// this.streamName = streamName; +// this.shardId = shardId; +// } +// +// public Optional getStreamName() { +// return Optional.ofNullable(streamName); +// } +// +// public String getLeaseKey(boolean isMultiStreamingEnabled) { +// Validate.isTrue(!(isMultiStreamingEnabled && StringUtils.isEmpty(streamName)), +// "Empty stream name found while multiStreaming is enabled"); +// return isMultiStreamingEnabled ? 
StringUtils.joinWith(LEASE_TOKEN_SEPERATOR, streamName, shardId) : shardId; +// } +// +// public static CompositeLeaseKey getLeaseKey(String leaseKey) { +// Validate.notNull(leaseKey); +// String leaseTokens[] = leaseKey.split(LEASE_TOKEN_SEPERATOR); +// Validate.inclusiveBetween(1, 2, leaseTokens.length); +// return leaseTokens.length == 2 ? +// new CompositeLeaseKey(leaseTokens[0], leaseTokens[1]) : +// new CompositeLeaseKey(leaseTokens[0]); +// } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 20e0aa8f..3c71934b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -16,6 +16,8 @@ package software.amazon.kinesis.leases; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -27,6 +29,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; +import org.apache.commons.lang3.Validate; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -36,6 +39,7 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.processor.MultiStreamTracker; /** * Used by the KCL to configure lease management. 
@@ -71,7 +75,7 @@ public class LeaseManagementConfig { * Name of the Kinesis Data Stream to read records from. */ @NonNull - private final String streamName; + private String streamName; /** * Used to distinguish different workers/processes of a KCL application. * @@ -116,7 +120,7 @@ public class LeaseManagementConfig { * *

<p>Default value: {@link Integer#MAX_VALUE}</p>

*/ - private int maxLeasesForWorker = Integer.MAX_VALUE;; + private int maxLeasesForWorker = Integer.MAX_VALUE; /** * Max leases to steal from another worker at one time (for load balancing). @@ -182,6 +186,24 @@ public class LeaseManagementConfig { private MetricsFactory metricsFactory = new NullMetricsFactory(); + @Deprecated + public LeaseManagementConfig(String tableName, DynamoDbAsyncClient dynamoDBClient, KinesisAsyncClient kinesisClient, + String streamName, String workerIdentifier) { + this.tableName = tableName; + this.dynamoDBClient = dynamoDBClient; + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.workerIdentifier = workerIdentifier; + } + + public LeaseManagementConfig(String tableName, DynamoDbAsyncClient dynamoDBClient, KinesisAsyncClient kinesisClient, + String workerIdentifier) { + this.tableName = tableName; + this.dynamoDBClient = dynamoDBClient; + this.kinesisClient = kinesisClient; + this.workerIdentifier = workerIdentifier; + } + /** * Returns the metrics factory. 
* @@ -244,9 +266,10 @@ public class LeaseManagementConfig { private LeaseManagementFactory leaseManagementFactory; + @Deprecated public LeaseManagementFactory leaseManagementFactory() { - if (leaseManagementFactory == null) { - leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), + Validate.notEmpty(streamName(), "Stream name is empty"); + return new DynamoDBLeaseManagementFactory(kinesisClient(), streamName(), dynamoDBClient(), tableName(), @@ -271,8 +294,43 @@ public class LeaseManagementConfig { initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); - } - return leaseManagementFactory; } + public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) { + return new DynamoDBLeaseManagementFactory(kinesisClient(), + dynamoDBClient(), + tableName(), + workerIdentifier(), + executorService(), + failoverTimeMillis(), + epsilonMillis(), + maxLeasesForWorker(), + maxLeasesToStealAtOneTime(), + maxLeaseRenewalThreads(), + cleanupLeasesUponShardCompletion(), + ignoreUnexpectedChildShards(), + shardSyncIntervalMillis(), + consistentReads(), + listShardsBackoffTimeInMillis(), + maxListShardsRetryAttempts(), + maxCacheMissesBeforeReload(), + listShardsCacheAllowedAgeInSeconds(), + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(), + tableCreatorCallback(), + dynamoDbRequestTimeout(), + billingMode(), + leaseSerializer); + } + +// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) { +// return multiStreamTracker() == null ? +// initialPositionInStream() : +// multiStreamTracker().initialPositionInStreamExtended(streamName) == null ? 
+// initialPositionInStream() : +// multiStreamTracker().initialPositionInStreamExtended(streamName); +// } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java index 72f48fea..37f66258 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.metrics.MetricsFactory; @@ -26,7 +27,16 @@ public interface LeaseManagementFactory { ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory); + default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { + throw new UnsupportedOperationException(); + } + DynamoDBLeaseRefresher createLeaseRefresher(); ShardDetector createShardDetector(); + + default ShardDetector createShardDetector(StreamConfig streamConfig) { + throw new UnsupportedOperationException(); + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java index b8aa0339..95b98399 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java @@ -23,7 +23,6 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue; import 
software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; -import software.amazon.kinesis.leases.Lease; /** * Utility class that manages the mapping of Lease objects/operations to records in DynamoDB. @@ -46,6 +45,11 @@ public interface LeaseSerializer { */ Lease fromDynamoRecord(Map dynamoRecord); + + default Lease fromDynamoRecord(Map dynamoRecord, Lease leaseToUpdate) { + throw new UnsupportedOperationException(); + } + /** * @param lease * @return the attribute value map representing a Lease's hash key given a Lease object. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java new file mode 100644 index 00000000..9878e32c --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java @@ -0,0 +1,91 @@ +package software.amazon.kinesis.leases; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.Validate; + +import java.util.Objects; + +import static com.google.common.base.Verify.verifyNotNull; + +@Setter +@NoArgsConstructor +@Getter +@Accessors(fluent = true) +public class MultiStreamLease extends Lease { + + @NonNull private String streamName; + @NonNull private String shardId; + + public MultiStreamLease(Lease other) { + super(other); + MultiStreamLease casted = validateAndCast(other); + streamName(casted.streamName); + shardId(casted.shardId); + } + + @Override + public void update(Lease other) { + MultiStreamLease casted = validateAndCast(other); + super.update(casted); + streamName(casted.streamName); + shardId(casted.shardId); + } + + public static String getLeaseKey(String streamName, String shardId) { + verifyNotNull(streamName, "streamName should not be null"); + verifyNotNull(shardId, "shardId should not be null"); + return streamName + ":" + 
shardId; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), streamName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof MultiStreamLease)) { + return false; + } + MultiStreamLease other = (MultiStreamLease) obj; + if (streamName == null) { + if (other.streamName != null) { + return false; + } + } else if (!streamName.equals(other.streamName)) { + return false; + } + return true; + } + + /** + * Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics. + * + * @return A deep copy of this object. + */ + @Override + public MultiStreamLease copy() { + return new MultiStreamLease(this); + } + + /** + * Validate and cast the lease to MultiStream lease + * @param lease + * @return MultiStreamLease + */ + public static MultiStreamLease validateAndCast(Lease lease) { + Validate.isInstanceOf(MultiStreamLease.class, lease); + return (MultiStreamLease) lease; + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index 89b8f94a..75070d8c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -36,6 +36,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @ToString public class ShardInfo { + private final String streamName; private final String shardId; private final String concurrencyToken; // Sorted list of parent shardIds. 
@@ -54,11 +55,18 @@ public class ShardInfo { * @param checkpoint * the latest checkpoint from lease */ - // TODO: check what values can be null public ShardInfo(@NonNull final String shardId, final String concurrencyToken, final Collection parentShardIds, final ExtendedSequenceNumber checkpoint) { + this(shardId, concurrencyToken, parentShardIds, checkpoint, null); + } + + public ShardInfo(@NonNull final String shardId, + final String concurrencyToken, + final Collection parentShardIds, + final ExtendedSequenceNumber checkpoint, + final String streamName) { this.shardId = shardId; this.concurrencyToken = concurrencyToken; this.parentShardIds = new LinkedList<>(); @@ -69,6 +77,7 @@ public class ShardInfo { // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); this.checkpoint = checkpoint; + this.streamName = streamName; } /** @@ -94,7 +103,8 @@ public class ShardInfo { */ @Override public int hashCode() { - return new HashCodeBuilder().append(concurrencyToken).append(parentShardIds).append(shardId).toHashCode(); + return new HashCodeBuilder() + .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName).toHashCode(); } /** @@ -118,7 +128,8 @@ public class ShardInfo { } ShardInfo other = (ShardInfo) obj; return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) - .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId).isEquals(); + .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) + .append(streamName, other.streamName).isEquals(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index fe31d996..01cd6683 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -33,11 +33,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.leases.CompositeLeaseKey; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRenewer; import software.amazon.kinesis.leases.LeaseTaker; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -377,9 +379,22 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList()); } + // TODO : Halo : Reenable for backward compatibility +// public static ShardInfo convertLeaseToAssignment(final Lease lease) { +// return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), +// lease.checkpoint()); +// } + + // TODO : Support Shard public static ShardInfo convertLeaseToAssignment(final Lease lease) { - return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), - lease.checkpoint()); + if (lease instanceof MultiStreamLease) { + return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(), + lease.checkpoint(), ((MultiStreamLease) lease).streamName()); + } else { + return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), + lease.checkpoint()); + } + } /** diff --git 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index c2ade429..124d28ca 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -16,6 +16,9 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import lombok.Data; @@ -23,13 +26,17 @@ import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; +import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.metrics.MetricsFactory; @@ -44,8 +51,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private final KinesisAsyncClient kinesisClient; @NonNull - private final String 
streamName; - @NonNull private final DynamoDbAsyncClient dynamoDBClient; @NonNull private final String tableName; @@ -54,9 +59,11 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private final ExecutorService executorService; @NonNull - private final InitialPositionInStreamExtended initialPositionInStream; - @NonNull private final HierarchicalShardSyncer hierarchicalShardSyncer; + @NonNull + private final LeaseSerializer leaseSerializer; + @NonNull + private StreamConfig streamConfig; private final long failoverTimeMillis; private final long epsilonMillis; @@ -309,6 +316,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param dynamoDbRequestTimeout * @param billingMode */ + @Deprecated public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, @@ -321,13 +329,83 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode) { + + this(kinesisClient, new StreamConfig(streamName, initialPositionInStream), dynamoDBClient, tableName, + workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, 
+ hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamConfig + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, + final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration 
dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { + this(kinesisClient, dynamoDBClient, tableName, + workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer); + this.streamConfig = streamConfig; + } + + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, + final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { this.kinesisClient = kinesisClient; - this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; this.workerIdentifier = workerIdentifier; this.executorService = executorService; - 
this.initialPositionInStream = initialPositionInStream; this.failoverTimeMillis = failoverTimeMillis; this.epsilonMillis = epsilonMillis; this.maxLeasesForWorker = maxLeasesForWorker; @@ -348,6 +426,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; + this.leaseSerializer = leaseSerializer; } @Override @@ -364,11 +443,24 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { metricsFactory); } - @Override + @Override @Deprecated public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFactory metricsFactory) { return new ShardSyncTaskManager(this.createShardDetector(), this.createLeaseRefresher(), - initialPositionInStream, + streamConfig.initialPositionInStreamExtended(), + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, + shardSyncIntervalMillis, + executorService, + hierarchicalShardSyncer, + metricsFactory); + } + + @Override + public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { + return new ShardSyncTaskManager(this.createShardDetector(streamConfig), + this.createLeaseRefresher(), + streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, @@ -379,13 +471,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { - return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads, + return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode); } - @Override + @Override @Deprecated public ShardDetector createShardDetector() { - return new 
KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis, + return new KinesisShardDetector(kinesisClient, streamConfig.streamName(), + listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, + maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout); + } + + @Override + public ShardDetector createShardDetector(StreamConfig streamConfig) { + return new KinesisShardDetector(kinesisClient, streamConfig.streamName(), listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index b97738ca..a02e2a6e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -80,28 +80,32 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { @Override public Lease fromDynamoRecord(final Map dynamoRecord) { - Lease result = new Lease(); - result.leaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY)); - result.leaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY)); - result.leaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY)); + final Lease result = new Lease(); + return fromDynamoRecord(dynamoRecord, result); + } - result.ownerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY)); - result.checkpoint( + @Override + public Lease fromDynamoRecord(Map dynamoRecord, Lease leaseToUpdate) { + leaseToUpdate.leaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY)); + 
leaseToUpdate.leaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY)); + leaseToUpdate.leaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY)); + + leaseToUpdate.ownerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY)); + leaseToUpdate.checkpoint( new ExtendedSequenceNumber( DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_SEQUENCE_NUMBER_KEY), DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY)) ); - result.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); + leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) { - result.pendingCheckpoint( + leaseToUpdate.pendingCheckpoint( new ExtendedSequenceNumber( DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY), DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY)) ); } - - return result; + return leaseToUpdate; } @Override @@ -198,7 +202,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { return result; } - private AttributeValueUpdate putUpdate(AttributeValue attributeValue) { + protected AttributeValueUpdate putUpdate(AttributeValue attributeValue) { return AttributeValueUpdate.builder().value(attributeValue).action(AttributeAction.PUT).build(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java new file mode 100644 index 00000000..7525273f --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java @@ -0,0 +1,47 @@ +package software.amazon.kinesis.leases.dynamodb; + +import lombok.NoArgsConstructor; +import 
software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; +import software.amazon.kinesis.leases.DynamoUtils; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.MultiStreamLease; + +import java.util.Map; + +import static software.amazon.kinesis.leases.MultiStreamLease.validateAndCast; + +@NoArgsConstructor +public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer { + + private static final String STREAM_NAME_KEY = "streamName"; + private static final String SHARD_ID_KEY = "shardId"; + + @Override + public Map toDynamoRecord(Lease lease) { + final MultiStreamLease multiStreamLease = validateAndCast(lease); + final Map result = super.toDynamoRecord(multiStreamLease); + result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamName())); + result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId())); + return result; + } + + @Override + public MultiStreamLease fromDynamoRecord(Map dynamoRecord) { + final MultiStreamLease multiStreamLease = (MultiStreamLease) super + .fromDynamoRecord(dynamoRecord, new MultiStreamLease()); + multiStreamLease.streamName(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY)); + multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY)); + return multiStreamLease; + } + + + @Override + public Map getDynamoUpdateLeaseUpdate(Lease lease) { + final MultiStreamLease multiStreamLease = validateAndCast(lease); + final Map result = super.getDynamoUpdateLeaseUpdate(multiStreamLease); + result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamName()))); + result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId()))); + return result; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index d3ecebc1..70cdd608 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -27,50 +27,50 @@ public interface Checkpointer { * Record a checkpoint for a shard (e.g. sequence and subsequence numbers of last record processed * by application). Upon failover, record processing is resumed from this point. * - * @param shardId Checkpoint is specified for this shard. + * @param leaseKey Checkpoint is specified for this shard. * @param checkpointValue Value of the checkpoint (e.g. Kinesis sequence number and subsequence number) * @param concurrencyToken Used with conditional writes to prevent stale updates * (e.g. if there was a fail over to a different record processor, we don't want to * overwrite it's checkpoint) * @throws KinesisClientLibException Thrown if we were unable to save the checkpoint */ - void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken) + void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) throws KinesisClientLibException; /** * Get the current checkpoint stored for the specified shard. Useful for checking that the parent shard * has been completely processed before we start processing the child shard. * - * @param shardId Current checkpoint for this shard is fetched + * @param leaseKey Current checkpoint for this shard is fetched * @return Current checkpoint for this shard, null if there is no record for this shard. 
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint */ - ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException; + ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException; /** * Get the current checkpoint stored for the specified shard, which holds the sequence numbers for the checkpoint * and pending checkpoint. Useful for checking that the parent shard has been completely processed before we start * processing the child shard. * - * @param shardId Current checkpoint for this shard is fetched + * @param leaseKey Current checkpoint for this shard is fetched * @return Current checkpoint object for this shard, null if there is no record for this shard. * @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint */ - Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException; + Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException; /** * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpointValue will be passed to the new * ShardRecordProcessor's initialize() method. * - * @param shardId Checkpoint is specified for this shard. + * @param leaseKey Checkpoint is specified for this shard. * @param pendingCheckpoint Value of the pending checkpoint (e.g. Kinesis sequence number and subsequence number) * @param concurrencyToken Used with conditional writes to prevent stale updates * (e.g. 
if there was a fail over to a different record processor, we don't want to * overwrite it's checkpoint) * @throws KinesisClientLibException Thrown if we were unable to save the checkpoint */ - void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) + void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException; void operation(String operation); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java new file mode 100644 index 00000000..ac9e053b --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -0,0 +1,27 @@ +package software.amazon.kinesis.processor; + +import software.amazon.kinesis.common.InitialPositionInStreamExtended; + +import java.util.List; + +/** + * Interface for stream trackers. This is useful for KCL Workers that need + * to consume data from multiple streams. + */ +public interface MultiStreamTracker { + + /** + * Returns the list of streams that the Worker should consume data from. + * + * @return List of stream names + */ + List listStreamsToProcess(); + + /** + * Returns the initial position in stream to read from, for the given stream. 
+ * @param streamName + * @return Initial position to read from, for the given stream + */ + InitialPositionInStreamExtended initialPositionInStreamExtended(String streamName); + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java index b0559120..973b0393 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java @@ -24,5 +24,15 @@ public interface ShardRecordProcessorFactory { * * @return */ - ShardRecordProcessor shardRecordProcessor(); + // TODO : Halo : Reenable +// ShardRecordProcessor shardRecordProcessor(); + + /** + * Returns a new instance of the ShardRecordProcessor for a stream + * @param streamName + * @return ShardRecordProcessor + */ + default ShardRecordProcessor shardRecordProcessor(String streamName) { + throw new UnsupportedOperationException(); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index cfd19654..10b23641 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -21,6 +21,7 @@ import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; /** @@ -51,6 +52,11 @@ public class RetrievalConfig { @NonNull private final 
String applicationName; + /** + * StreamTracker for multi streaming support + */ + private MultiStreamTracker multiStreamTracker; + /** * Backoff time between consecutive ListShards calls. * diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index b8de6a1b..ebe933b9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -39,14 +39,14 @@ public class InMemoryCheckpointer implements Checkpointer { * {@inheritDoc} */ @Override - public void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken) + public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) throws KinesisClientLibException { - checkpoints.put(shardId, checkpointValue); - flushpoints.put(shardId, checkpointValue); - pendingCheckpoints.remove(shardId); + checkpoints.put(leaseKey, checkpointValue); + flushpoints.put(leaseKey, checkpointValue); + pendingCheckpoints.remove(leaseKey); if (log.isDebugEnabled()) { - log.debug("shardId: {} checkpoint: {}", shardId, checkpointValue); + log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue); } } @@ -55,25 +55,25 @@ public class InMemoryCheckpointer implements Checkpointer { * {@inheritDoc} */ @Override - public ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException { - ExtendedSequenceNumber checkpoint = flushpoints.get(shardId); - log.debug("checkpoint shardId: {} checkpoint: {}", shardId, checkpoint); + public ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException { + ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); + log.debug("checkpoint shardId: {} checkpoint: {}", 
leaseKey, checkpoint); return checkpoint; } @Override - public void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) throws KinesisClientLibException { - pendingCheckpoints.put(shardId, pendingCheckpoint); + pendingCheckpoints.put(leaseKey, pendingCheckpoint); } @Override - public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException { - ExtendedSequenceNumber checkpoint = flushpoints.get(shardId); - ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(shardId); + public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { + ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); + ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); Checkpoint checkpointObj = new Checkpoint(checkpoint, pendingCheckpoint); - log.debug("getCheckpointObject shardId: {}, {}", shardId, checkpointObj); + log.debug("getCheckpointObject shardId: {}, {}", leaseKey, checkpointObj); return checkpointObj; }