Checkpointing and retrieval fix

This commit is contained in:
Ashwin Giridharan 2020-02-20 01:55:31 -08:00
parent 9ad65ee486
commit 097559eca2
8 changed files with 40 additions and 23 deletions

View file

@ -60,7 +60,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
public synchronized void checkpoint() public synchronized void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Checkpointing {}, token {} at largest permitted value {}", shardInfo.shardId(), log.debug("Checkpointing {}, token {} at largest permitted value {}", ShardInfo.getLeaseKey(shardInfo),
shardInfo.concurrencyToken(), this.largestPermittedCheckpointValue); shardInfo.concurrencyToken(), this.largestPermittedCheckpointValue);
} }
advancePosition(this.largestPermittedCheckpointValue); advancePosition(this.largestPermittedCheckpointValue);
@ -116,7 +116,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
&& newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) { && newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Checkpointing {}, token {} at specific extended sequence number {}", shardInfo.shardId(), log.debug("Checkpointing {}, token {} at specific extended sequence number {}", ShardInfo.getLeaseKey(shardInfo),
shardInfo.concurrencyToken(), newCheckpoint); shardInfo.concurrencyToken(), newCheckpoint);
} }
this.advancePosition(newCheckpoint); this.advancePosition(newCheckpoint);
@ -189,7 +189,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}", log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
shardInfo.shardId(), shardInfo.concurrencyToken(), pendingCheckpoint); ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint);
} }
return doPrepareCheckpoint(pendingCheckpoint); return doPrepareCheckpoint(pendingCheckpoint);
} else { } else {
@ -252,10 +252,10 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) { if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Setting {}, token {} checkpoint to {}", shardInfo.shardId(), log.debug("Setting {}, token {} checkpoint to {}", ShardInfo.getLeaseKey(shardInfo),
shardInfo.concurrencyToken(), checkpointToRecord); shardInfo.concurrencyToken(), checkpointToRecord);
} }
checkpointer.setCheckpoint(shardInfo.shardId(), checkpointToRecord, shardInfo.concurrencyToken()); checkpointer.setCheckpoint(ShardInfo.getLeaseKey(shardInfo), checkpointToRecord, shardInfo.concurrencyToken());
lastCheckpointValue = checkpointToRecord; lastCheckpointValue = checkpointToRecord;
} catch (ThrottlingException | ShutdownException | InvalidStateException } catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) { | KinesisClientLibDependencyException e) {
@ -308,7 +308,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
} }
try { try {
checkpointer.prepareCheckpoint(shardInfo.shardId(), newPrepareCheckpoint, shardInfo.concurrencyToken()); checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken());
} catch (ThrottlingException | ShutdownException | InvalidStateException } catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) { | KinesisClientLibDependencyException e) {
throw e; throw e;

View file

@ -367,8 +367,8 @@ public class Scheduler implements Runnable {
for (ShardInfo completedShard : completedShards) { for (ShardInfo completedShard : completedShards) {
final String streamName = completedShard.streamName() final String streamName = completedShard.streamName()
.orElseGet(() -> appStreamTracker.map(mst -> null, sc -> sc.streamName())); .orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName()));
Validate.notEmpty(streamName, "Stream name should not be null"); Validate.notEmpty(streamName, "Stream name should not be empty");
if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) { if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
} }
@ -640,7 +640,7 @@ public class Scheduler implements Runnable {
checkpoint); checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case, // The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application. // get the default stream name for the single stream application.
final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> null, sc -> sc.streamName())); final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName()));
Validate.notEmpty(streamName, "StreamName should not be empty"); Validate.notEmpty(streamName, "StreamName should not be empty");
// Irrespective of single stream app or multi stream app, streamConfig should always be available. // Irrespective of single stream app or multi stream app, streamConfig should always be available.
// TODO: Halo : if not available, construct a default config ? // TODO: Halo : if not available, construct a default config ?

View file

@ -134,4 +134,15 @@ public class ShardInfo {
} }
/**
 * Resolves the lease key for the given shard.
 *
 * <p>When the shard info carries a stream name, the key is built by
 * {@link MultiStreamLease#getLeaseKey} from that stream name and the shard id;
 * when no stream name is present, the shard id itself is returned.
 *
 * @param shardInfo shard whose lease key is needed; must not be {@code null}
 * @return the multi-stream lease key if a stream name is present, otherwise the shard id
 */
public static String getLeaseKey(ShardInfo shardInfo) {
    return shardInfo.streamName()
            .map(streamName -> MultiStreamLease.getLeaseKey(streamName, shardInfo.shardId()))
            // Fall back to the plain shard id when the shard is not tied to a named stream.
            .orElseGet(shardInfo::shardId);
}
} }

View file

@ -21,6 +21,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
@ -75,9 +76,10 @@ public class InitializeTask implements ConsumerTask {
try { try {
log.debug("Initializing ShardId {}", shardInfo); log.debug("Initializing ShardId {}", shardInfo);
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId()); final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(leaseKey);
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint(); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint();
log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", shardInfo.shardId(), initialCheckpoint, log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", leaseKey, initialCheckpoint,
initialPositionInStream); initialPositionInStream);
cache.start(initialCheckpoint, initialPositionInStream); cache.start(initialCheckpoint, initialPositionInStream);

View file

@ -82,15 +82,15 @@ public class FanOutConfig implements RetrievalSpecificConfig {
@Override @Override
public RetrievalFactory retrievalFactory() { public RetrievalFactory retrievalFactory() {
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()); return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn);
} }
private String getOrCreateConsumerArn() { private String getOrCreateConsumerArn(String streamName) {
if (consumerArn != null) { if (consumerArn != null) {
return consumerArn; return consumerArn;
} }
FanOutConsumerRegistration registration = createConsumerRegistration(); FanOutConsumerRegistration registration = createConsumerRegistration(streamName);
try { try {
return registration.getOrCreateStreamConsumerArn(); return registration.getOrCreateStreamConsumerArn();
} catch (DependencyException e) { } catch (DependencyException e) {
@ -98,10 +98,10 @@ public class FanOutConfig implements RetrievalSpecificConfig {
} }
} }
private FanOutConsumerRegistration createConsumerRegistration() { private FanOutConsumerRegistration createConsumerRegistration(String streamName) {
String consumerToCreate = ObjectUtils.firstNonNull(consumerName(), applicationName()); String consumerToCreate = ObjectUtils.firstNonNull(consumerName(), applicationName());
return createConsumerRegistration(kinesisClient(), return createConsumerRegistration(kinesisClient(),
Preconditions.checkNotNull(streamName(), "streamName must be set for consumer creation"), Preconditions.checkNotNull(streamName, "streamName must be set for consumer creation"),
Preconditions.checkNotNull(consumerToCreate, Preconditions.checkNotNull(consumerToCreate,
"applicationName or consumerName must be set for consumer creation")); "applicationName or consumerName must be set for consumer creation"));

View file

@ -25,12 +25,15 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.util.function.Function;
@RequiredArgsConstructor @RequiredArgsConstructor
@KinesisClientInternalApi @KinesisClientInternalApi
public class FanOutRetrievalFactory implements RetrievalFactory { public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;
private final String consumerArn; private final String defaultStreamName;
private final Function<String, String> consumerArnProvider;
@Override @Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
@ -41,6 +44,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override @Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) { final MetricsFactory metricsFactory) {
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn); final String streamName = shardInfo.streamName().orElse(defaultStreamName);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArnProvider.apply(streamName));
} }
} }

View file

@ -63,7 +63,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) { @NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy( return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
} }
@Override @Override

View file

@ -65,11 +65,11 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
this.maxFutureWait = maxFutureWait; this.maxFutureWait = maxFutureWait;
} }
@Override @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) { @NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy(new KinesisDataFetcher(kinesisClient, streamName, return new SynchronousGetRecordsRetrievalStrategy(
shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait));
} }
@Override @Override