checkpointing
This commit is contained in:
parent
d6e2e0b324
commit
fd0e96a5d1
4 changed files with 45 additions and 42 deletions
|
|
@ -16,8 +16,11 @@
|
||||||
package software.amazon.kinesis.coordinator;
|
package software.amazon.kinesis.coordinator;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
@ -26,6 +29,8 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
|
@ -39,9 +44,11 @@ import lombok.experimental.Accessors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
|
import software.amazon.awssdk.utils.Either;
|
||||||
import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
||||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.StreamConfig;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||||
|
|
@ -107,22 +114,21 @@ public class Scheduler implements Runnable {
|
||||||
private final DiagnosticEventHandler diagnosticEventHandler;
|
private final DiagnosticEventHandler diagnosticEventHandler;
|
||||||
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
private final LeaseCoordinator leaseCoordinator;
|
private final LeaseCoordinator leaseCoordinator;
|
||||||
// private final ShardSyncTaskManager shardSyncTaskManager;
|
private final Function<String, ShardSyncTaskManager> shardSyncTaskManagerProvider;
|
||||||
|
private final Map<String, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
|
||||||
private final ShardPrioritization shardPrioritization;
|
private final ShardPrioritization shardPrioritization;
|
||||||
private final boolean cleanupLeasesUponShardCompletion;
|
private final boolean cleanupLeasesUponShardCompletion;
|
||||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
private final GracefulShutdownCoordinator gracefulShutdownCoordinator;
|
private final GracefulShutdownCoordinator gracefulShutdownCoordinator;
|
||||||
private final WorkerStateChangeListener workerStateChangeListener;
|
private final WorkerStateChangeListener workerStateChangeListener;
|
||||||
private final InitialPositionInStreamExtended initialPosition;
|
|
||||||
private final MetricsFactory metricsFactory;
|
private final MetricsFactory metricsFactory;
|
||||||
private final long failoverTimeMillis;
|
private final long failoverTimeMillis;
|
||||||
private final long taskBackoffTimeMillis;
|
private final long taskBackoffTimeMillis;
|
||||||
private final List<String> listOfStreams;
|
private final Either<MultiStreamTracker, StreamConfig> applicationStreamTracker;
|
||||||
private final MultiStreamTracker multiStreamTracker;
|
|
||||||
private final long listShardsBackoffTimeMillis;
|
private final long listShardsBackoffTimeMillis;
|
||||||
private final int maxListShardsRetryAttempts;
|
private final int maxListShardsRetryAttempts;
|
||||||
private final LeaseRefresher leaseRefresher;
|
private final LeaseRefresher leaseRefresher;
|
||||||
// private final ShardDetector shardDetector;
|
private final Function<String, ShardDetector> shardDetectorProvider;
|
||||||
private final boolean ignoreUnexpetedChildShards;
|
private final boolean ignoreUnexpetedChildShards;
|
||||||
private final AggregatorUtil aggregatorUtil;
|
private final AggregatorUtil aggregatorUtil;
|
||||||
private final HierarchicalShardSyncer hierarchicalShardSyncer;
|
private final HierarchicalShardSyncer hierarchicalShardSyncer;
|
||||||
|
|
@ -178,15 +184,17 @@ public class Scheduler implements Runnable {
|
||||||
this.retrievalConfig = retrievalConfig;
|
this.retrievalConfig = retrievalConfig;
|
||||||
|
|
||||||
this.applicationName = this.coordinatorConfig.applicationName();
|
this.applicationName = this.coordinatorConfig.applicationName();
|
||||||
this.multiStreamTracker = this.retrievalConfig.multiStreamTracker();
|
final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker();
|
||||||
this.listOfStreams = this.multiStreamTracker == null ?
|
if(multiStreamTracker == null) {
|
||||||
ImmutableList.of(this.retrievalConfig.streamName()) :
|
this.applicationStreamTracker = Either.right(new StreamConfig(this.retrievalConfig.streamName(),
|
||||||
this.multiStreamTracker.listStreamsToProcess();
|
this.retrievalConfig.initialPositionInStreamExtended()));
|
||||||
Validate.isTrue(!CollectionUtils.isNullOrEmpty(this.listOfStreams), "No stream configured to process.");
|
} else {
|
||||||
|
this.applicationStreamTracker = Either.left(multiStreamTracker);
|
||||||
|
}
|
||||||
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
|
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
|
||||||
this.metricsFactory = this.metricsConfig.metricsFactory();
|
this.metricsFactory = this.metricsConfig.metricsFactory();
|
||||||
// Determine leaseSerializer based on MultiStreamTracker
|
// Determine leaseSerializer based on availability of MultiStreamTracker.
|
||||||
final LeaseSerializer leaseSerializer = this.multiStreamTracker == null ?
|
final LeaseSerializer leaseSerializer = this.applicationStreamTracker.map(mst -> true, sc -> false) ?
|
||||||
new DynamoDBMultiStreamLeaseSerializer() :
|
new DynamoDBMultiStreamLeaseSerializer() :
|
||||||
new DynamoDBLeaseSerializer();
|
new DynamoDBLeaseSerializer();
|
||||||
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer)
|
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer)
|
||||||
|
|
@ -207,9 +215,11 @@ public class Scheduler implements Runnable {
|
||||||
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
|
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
|
||||||
this.diagnosticEventFactory = diagnosticEventFactory;
|
this.diagnosticEventFactory = diagnosticEventFactory;
|
||||||
this.diagnosticEventHandler = new DiagnosticEventLogger();
|
this.diagnosticEventHandler = new DiagnosticEventLogger();
|
||||||
|
// TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName.
|
||||||
// this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory()
|
// TODO : Pass the immutable map here instead of using mst.streamConfigMap()
|
||||||
// .createShardSyncTaskManager(this.metricsFactory);
|
this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig
|
||||||
|
.leaseManagementFactory(leaseSerializer).createShardSyncTaskManager(this.metricsFactory,
|
||||||
|
applicationStreamTracker.map(mst -> mst.streamConfigMap().get(streamName), sc -> sc));
|
||||||
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
|
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
|
||||||
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
|
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
|
||||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
|
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
|
||||||
|
|
@ -226,14 +236,13 @@ public class Scheduler implements Runnable {
|
||||||
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
|
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
|
||||||
.createWorkerStateChangeListener();
|
.createWorkerStateChangeListener();
|
||||||
}
|
}
|
||||||
this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
|
|
||||||
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
|
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
|
||||||
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
|
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
|
||||||
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
|
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
|
||||||
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
|
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
|
||||||
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
|
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
|
||||||
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
|
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
|
||||||
// this.shardDetector = this.shardSyncTaskManager.shardDetector();
|
this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector();
|
||||||
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
|
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
|
||||||
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
|
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
|
||||||
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
|
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
|
||||||
|
|
@ -284,10 +293,14 @@ public class Scheduler implements Runnable {
|
||||||
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
|
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
|
||||||
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
|
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
|
||||||
// TODO: for already synced streams
|
// TODO: for already synced streams
|
||||||
for(String streamName : this.listOfStreams) {
|
final Map<String, StreamConfig> streamConfigMap = applicationStreamTracker
|
||||||
|
.map(mst -> mst.streamConfigMap(), sc -> Collections.singletonMap(sc.streamName(), sc));
|
||||||
|
for(String streamName : streamConfigMap.keySet().stream().collect(Collectors.toList())) {
|
||||||
log.info("Syncing Kinesis shard info");
|
log.info("Syncing Kinesis shard info");
|
||||||
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher,
|
final StreamConfig streamConfig = streamConfigMap.get(streamName);
|
||||||
initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
|
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName),
|
||||||
|
leaseRefresher, streamConfig.initialPositionInStreamExtended(),
|
||||||
|
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
|
||||||
hierarchicalShardSyncer, metricsFactory);
|
hierarchicalShardSyncer, metricsFactory);
|
||||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
||||||
// Throwing the exception, to prevent further syncs for other stream.
|
// Throwing the exception, to prevent further syncs for other stream.
|
||||||
|
|
@ -608,6 +621,10 @@ public class Scheduler implements Runnable {
|
||||||
return consumer;
|
return consumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ShardSyncTaskManager createOrGetShardSyncTaskManager(String streamName) {
|
||||||
|
return streamToShardSyncTaskManagerMap.computeIfAbsent(streamName, s -> shardSyncTaskManagerProvider.apply(s));
|
||||||
|
}
|
||||||
|
|
||||||
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
|
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
|
||||||
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
|
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
|
||||||
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory);
|
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory);
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
|
|
@ -36,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
@ToString
|
@ToString
|
||||||
public class ShardInfo {
|
public class ShardInfo {
|
||||||
|
|
||||||
private final String streamName;
|
private final Optional<String> streamName;
|
||||||
private final String shardId;
|
private final String shardId;
|
||||||
private final String concurrencyToken;
|
private final String concurrencyToken;
|
||||||
// Sorted list of parent shardIds.
|
// Sorted list of parent shardIds.
|
||||||
|
|
@ -77,7 +78,7 @@ public class ShardInfo {
|
||||||
// This makes it easy to check for equality in ShardInfo.equals method.
|
// This makes it easy to check for equality in ShardInfo.equals method.
|
||||||
Collections.sort(this.parentShardIds);
|
Collections.sort(this.parentShardIds);
|
||||||
this.checkpoint = checkpoint;
|
this.checkpoint = checkpoint;
|
||||||
this.streamName = streamName;
|
this.streamName = Optional.ofNullable(streamName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -379,13 +379,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
|
return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO : Halo : Reenable for backward compatibility
|
// TODO : Halo : Check for better way
|
||||||
// public static ShardInfo convertLeaseToAssignment(final Lease lease) {
|
|
||||||
// return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
|
|
||||||
// lease.checkpoint());
|
|
||||||
// }
|
|
||||||
|
|
||||||
// TODO : Support Shard
|
|
||||||
public static ShardInfo convertLeaseToAssignment(final Lease lease) {
|
public static ShardInfo convertLeaseToAssignment(final Lease lease) {
|
||||||
if (lease instanceof MultiStreamLease) {
|
if (lease instanceof MultiStreamLease) {
|
||||||
return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),
|
return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),
|
||||||
|
|
@ -394,7 +388,6 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
|
return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
|
||||||
lease.checkpoint());
|
lease.checkpoint());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
package software.amazon.kinesis.processor;
|
package software.amazon.kinesis.processor;
|
||||||
|
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.StreamConfig;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for stream trackers. This is useful for KCL Workers that need
|
* Interface for stream trackers. This is useful for KCL Workers that need
|
||||||
|
|
@ -11,17 +11,9 @@ import java.util.List;
|
||||||
public interface MultiStreamTracker {
|
public interface MultiStreamTracker {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the list of streams that the Worker should consume data from.
|
* Returns a map from each stream name to its associated stream-specific config.
|
||||||
*
|
*
|
||||||
* @return List of stream names
|
* @return Map of stream name to its {@link StreamConfig}
|
||||||
*/
|
*/
|
||||||
List<String> listStreamsToProcess();
|
Map<String, StreamConfig> streamConfigMap();
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the initial position in stream to read from, for the given stream.
|
|
||||||
* @param streamName
|
|
||||||
* @return Initial position to read from, for the given stream
|
|
||||||
*/
|
|
||||||
InitialPositionInStreamExtended initialPositionInStreamExtended(String streamName);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue