From 0580b9b3f74ff2db36cb5ae2f5c48a6fc56b400b Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 20 Feb 2020 01:55:31 -0800 Subject: [PATCH] Checkpointing and retrieval fix --- .../checkpoint/ShardRecordProcessorCheckpointer.java | 12 ++++++------ .../amazon/kinesis/coordinator/Scheduler.java | 6 +++--- .../software/amazon/kinesis/leases/ShardInfo.java | 11 +++++++++++ .../amazon/kinesis/lifecycle/InitializeTask.java | 6 ++++-- .../kinesis/retrieval/fanout/FanOutConfig.java | 10 +++++----- .../retrieval/fanout/FanOutRetrievalFactory.java | 8 ++++++-- .../polling/SynchronousBlockingRetrievalFactory.java | 2 +- .../SynchronousPrefetchingRetrievalFactory.java | 8 ++++---- 8 files changed, 40 insertions(+), 23 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index 4705d564..7d504bbb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -60,7 +60,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi public synchronized void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { if (log.isDebugEnabled()) { - log.debug("Checkpointing {}, token {} at largest permitted value {}", shardInfo.shardId(), + log.debug("Checkpointing {}, token {} at largest permitted value {}", ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), this.largestPermittedCheckpointValue); } advancePosition(this.largestPermittedCheckpointValue); @@ -116,7 +116,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi && newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) { if (log.isDebugEnabled()) { - log.debug("Checkpointing {}, token {} at specific extended sequence number {}", shardInfo.shardId(), + log.debug("Checkpointing {}, token {} at specific extended sequence number {}", ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), newCheckpoint); } this.advancePosition(newCheckpoint); @@ -189,7 +189,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi if (log.isDebugEnabled()) { log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}", - shardInfo.shardId(), shardInfo.concurrencyToken(), pendingCheckpoint); + ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint); } return doPrepareCheckpoint(pendingCheckpoint); } else { @@ -252,10 +252,10 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { try { if (log.isDebugEnabled()) { - log.debug("Setting {}, token {} checkpoint to {}", shardInfo.shardId(), + log.debug("Setting {}, token {} checkpoint to {}", ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), checkpointToRecord); } - checkpointer.setCheckpoint(shardInfo.shardId(), checkpointToRecord, shardInfo.concurrencyToken()); + checkpointer.setCheckpoint(ShardInfo.getLeaseKey(shardInfo), checkpointToRecord, shardInfo.concurrencyToken()); lastCheckpointValue = checkpointToRecord; } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { @@ -308,7 +308,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi } try { - checkpointer.prepareCheckpoint(shardInfo.shardId(), newPrepareCheckpoint, shardInfo.concurrencyToken()); + checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken()); } catch (ThrottlingException | ShutdownException | InvalidStateException | KinesisClientLibDependencyException e) { throw e; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 0ba40d81..547feebb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -367,8 +367,8 @@ public class Scheduler implements Runnable { for (ShardInfo completedShard : completedShards) { final String streamName = completedShard.streamName() - .orElseGet(() -> appStreamTracker.map(mst -> null, sc -> sc.streamName())); - Validate.notEmpty(streamName, "Stream name should not be null"); + .orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName())); + Validate.notEmpty(streamName, "Stream name should not be empty"); if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) { log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); } @@ -640,7 +640,7 @@ public class Scheduler implements Runnable { checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. In this case, // get the default stream name for the single stream application. - final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> null, sc -> sc.streamName())); + final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName())); Validate.notEmpty(streamName, "StreamName should not be empty"); // Irrespective of single stream app or multi stream app, streamConfig should always be available. // TODO: Halo : if not available, construct a default config ? diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index 2d51c0bc..0f86efb2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -134,4 +134,15 @@ public class ShardInfo { } + /** + * + * @param shardInfo + * @return + */ + public static String getLeaseKey(ShardInfo shardInfo) { + return shardInfo.streamName().isPresent() ? + MultiStreamLease.getLeaseKey(shardInfo.streamName().get(), shardInfo.shardId()) : + shardInfo.shardId(); + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index fdb0e947..e11eebfa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -21,6 +21,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.metrics.MetricsFactory; @@ -75,9 +76,10 @@ public class InitializeTask implements ConsumerTask { try { log.debug("Initializing ShardId {}", shardInfo); - Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId()); + final String leaseKey = ShardInfo.getLeaseKey(shardInfo); + Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(leaseKey); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint(); - log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", shardInfo.shardId(), initialCheckpoint, + log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", leaseKey, initialCheckpoint, initialPositionInStream); cache.start(initialCheckpoint, initialPositionInStream); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java index 661c2841..45679089 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java @@ -82,15 +82,15 @@ public class FanOutConfig implements RetrievalSpecificConfig { @Override public RetrievalFactory retrievalFactory() { - return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()); + return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn); } - private String getOrCreateConsumerArn() { + private String getOrCreateConsumerArn(String streamName) { if (consumerArn != null) { return consumerArn; } - FanOutConsumerRegistration registration = createConsumerRegistration(); + FanOutConsumerRegistration registration = createConsumerRegistration(streamName); try { return registration.getOrCreateStreamConsumerArn(); } catch (DependencyException e) { @@ -98,10 +98,10 @@ public class FanOutConfig implements RetrievalSpecificConfig { } } - private FanOutConsumerRegistration createConsumerRegistration() { + private FanOutConsumerRegistration createConsumerRegistration(String streamName) { String consumerToCreate = ObjectUtils.firstNonNull(consumerName(), applicationName()); return createConsumerRegistration(kinesisClient(), - Preconditions.checkNotNull(streamName(), "streamName must be set for consumer creation"), + Preconditions.checkNotNull(streamName, "streamName must be set for consumer creation"), Preconditions.checkNotNull(consumerToCreate, "applicationName or consumerName must be set for consumer creation")); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index 4add0dab..e712c6db 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -25,12 +25,15 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import java.util.function.Function; + @RequiredArgsConstructor @KinesisClientInternalApi public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; - private final String consumerArn; + private final String defaultStreamName; + private final Function consumerArnProvider; @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, @@ -41,6 +44,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn); + final String streamName = shardInfo.streamName().orElse(defaultStreamName); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArnProvider.apply(streamName)); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index ac40c7d2..7405730e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -63,7 +63,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); + new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 320fe4dd..8b669893 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -65,11 +65,11 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory this.maxFutureWait = maxFutureWait; } - @Override - public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, + @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { - return new SynchronousGetRecordsRetrievalStrategy(new KinesisDataFetcher(kinesisClient, streamName, - shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); + return new SynchronousGetRecordsRetrievalStrategy( + new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), + maxRecords, metricsFactory, maxFutureWait)); } @Override