Checkpointing and retrieval fix
This commit is contained in:
parent
25436b8122
commit
0580b9b3f7
8 changed files with 40 additions and 23 deletions
|
|
@ -60,7 +60,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
|||
public synchronized void checkpoint()
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Checkpointing {}, token {} at largest permitted value {}", shardInfo.shardId(),
|
||||
log.debug("Checkpointing {}, token {} at largest permitted value {}", ShardInfo.getLeaseKey(shardInfo),
|
||||
shardInfo.concurrencyToken(), this.largestPermittedCheckpointValue);
|
||||
}
|
||||
advancePosition(this.largestPermittedCheckpointValue);
|
||||
|
|
@ -116,7 +116,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
|||
&& newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Checkpointing {}, token {} at specific extended sequence number {}", shardInfo.shardId(),
|
||||
log.debug("Checkpointing {}, token {} at specific extended sequence number {}", ShardInfo.getLeaseKey(shardInfo),
|
||||
shardInfo.concurrencyToken(), newCheckpoint);
|
||||
}
|
||||
this.advancePosition(newCheckpoint);
|
||||
|
|
@ -189,7 +189,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
|||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
|
||||
shardInfo.shardId(), shardInfo.concurrencyToken(), pendingCheckpoint);
|
||||
ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), pendingCheckpoint);
|
||||
}
|
||||
return doPrepareCheckpoint(pendingCheckpoint);
|
||||
} else {
|
||||
|
|
@ -252,10 +252,10 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
|||
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
|
||||
try {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Setting {}, token {} checkpoint to {}", shardInfo.shardId(),
|
||||
log.debug("Setting {}, token {} checkpoint to {}", ShardInfo.getLeaseKey(shardInfo),
|
||||
shardInfo.concurrencyToken(), checkpointToRecord);
|
||||
}
|
||||
checkpointer.setCheckpoint(shardInfo.shardId(), checkpointToRecord, shardInfo.concurrencyToken());
|
||||
checkpointer.setCheckpoint(ShardInfo.getLeaseKey(shardInfo), checkpointToRecord, shardInfo.concurrencyToken());
|
||||
lastCheckpointValue = checkpointToRecord;
|
||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||
| KinesisClientLibDependencyException e) {
|
||||
|
|
@ -308,7 +308,7 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
|
|||
}
|
||||
|
||||
try {
|
||||
checkpointer.prepareCheckpoint(shardInfo.shardId(), newPrepareCheckpoint, shardInfo.concurrencyToken());
|
||||
checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), newPrepareCheckpoint, shardInfo.concurrencyToken());
|
||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||
| KinesisClientLibDependencyException e) {
|
||||
throw e;
|
||||
|
|
|
|||
|
|
@ -367,8 +367,8 @@ public class Scheduler implements Runnable {
|
|||
|
||||
for (ShardInfo completedShard : completedShards) {
|
||||
final String streamName = completedShard.streamName()
|
||||
.orElseGet(() -> appStreamTracker.map(mst -> null, sc -> sc.streamName()));
|
||||
Validate.notEmpty(streamName, "Stream name should not be null");
|
||||
.orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName()));
|
||||
Validate.notEmpty(streamName, "Stream name should not be empty");
|
||||
if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) {
|
||||
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
|
||||
}
|
||||
|
|
@ -640,7 +640,7 @@ public class Scheduler implements Runnable {
|
|||
checkpoint);
|
||||
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
|
||||
// get the default stream name for the single stream application.
|
||||
final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> null, sc -> sc.streamName()));
|
||||
final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName()));
|
||||
Validate.notEmpty(streamName, "StreamName should not be empty");
|
||||
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
|
||||
// TODO: Halo : if not available, construct a default config ?
|
||||
|
|
|
|||
|
|
@ -134,4 +134,15 @@ public class ShardInfo {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param shardInfo
|
||||
* @return
|
||||
*/
|
||||
public static String getLeaseKey(ShardInfo shardInfo) {
|
||||
return shardInfo.streamName().isPresent() ?
|
||||
MultiStreamLease.getLeaseKey(shardInfo.streamName().get(), shardInfo.shardId()) :
|
||||
shardInfo.shardId();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
|||
import software.amazon.kinesis.checkpoint.Checkpoint;
|
||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.leases.MultiStreamLease;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.lifecycle.events.InitializationInput;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
|
|
@ -75,9 +76,10 @@ public class InitializeTask implements ConsumerTask {
|
|||
|
||||
try {
|
||||
log.debug("Initializing ShardId {}", shardInfo);
|
||||
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId());
|
||||
final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
|
||||
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(leaseKey);
|
||||
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint();
|
||||
log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", shardInfo.shardId(), initialCheckpoint,
|
||||
log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", leaseKey, initialCheckpoint,
|
||||
initialPositionInStream);
|
||||
|
||||
cache.start(initialCheckpoint, initialPositionInStream);
|
||||
|
|
|
|||
|
|
@ -82,15 +82,15 @@ public class FanOutConfig implements RetrievalSpecificConfig {
|
|||
|
||||
@Override
|
||||
public RetrievalFactory retrievalFactory() {
|
||||
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn());
|
||||
return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn);
|
||||
}
|
||||
|
||||
private String getOrCreateConsumerArn() {
|
||||
private String getOrCreateConsumerArn(String streamName) {
|
||||
if (consumerArn != null) {
|
||||
return consumerArn;
|
||||
}
|
||||
|
||||
FanOutConsumerRegistration registration = createConsumerRegistration();
|
||||
FanOutConsumerRegistration registration = createConsumerRegistration(streamName);
|
||||
try {
|
||||
return registration.getOrCreateStreamConsumerArn();
|
||||
} catch (DependencyException e) {
|
||||
|
|
@ -98,10 +98,10 @@ public class FanOutConfig implements RetrievalSpecificConfig {
|
|||
}
|
||||
}
|
||||
|
||||
private FanOutConsumerRegistration createConsumerRegistration() {
|
||||
private FanOutConsumerRegistration createConsumerRegistration(String streamName) {
|
||||
String consumerToCreate = ObjectUtils.firstNonNull(consumerName(), applicationName());
|
||||
return createConsumerRegistration(kinesisClient(),
|
||||
Preconditions.checkNotNull(streamName(), "streamName must be set for consumer creation"),
|
||||
Preconditions.checkNotNull(streamName, "streamName must be set for consumer creation"),
|
||||
Preconditions.checkNotNull(consumerToCreate,
|
||||
"applicationName or consumerName must be set for consumer creation"));
|
||||
|
||||
|
|
|
|||
|
|
@ -25,12 +25,15 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
|||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@KinesisClientInternalApi
|
||||
public class FanOutRetrievalFactory implements RetrievalFactory {
|
||||
|
||||
private final KinesisAsyncClient kinesisClient;
|
||||
private final String consumerArn;
|
||||
private final String defaultStreamName;
|
||||
private final Function<String, String> consumerArnProvider;
|
||||
|
||||
@Override
|
||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
|
||||
|
|
@ -41,6 +44,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
|
|||
@Override
|
||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
||||
final MetricsFactory metricsFactory) {
|
||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
|
||||
final String streamName = shardInfo.streamName().orElse(defaultStreamName);
|
||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArnProvider.apply(streamName));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
return new SynchronousGetRecordsRetrievalStrategy(
|
||||
new KinesisDataFetcher(kinesisClient, streamName, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
|
||||
new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -65,11 +65,11 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
|
|||
this.maxFutureWait = maxFutureWait;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
return new SynchronousGetRecordsRetrievalStrategy(new KinesisDataFetcher(kinesisClient, streamName,
|
||||
shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait));
|
||||
return new SynchronousGetRecordsRetrievalStrategy(
|
||||
new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(),
|
||||
maxRecords, metricsFactory, maxFutureWait));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in a new issue