MultiStream Sync and logging changes

This commit is contained in:
Ashwin Giridharan 2020-03-20 17:19:04 -07:00
parent 0285789a24
commit c00c943a79
14 changed files with 352 additions and 170 deletions

View file

@ -28,11 +28,11 @@ public class DiagnosticUtils {
/**
* Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action.
* @param shardId of the shard that is having delayed delivery
* @param resourceIdentifier of the shard that is having delayed delivery
* @param enqueueTimestamp of the event submitted to the executor service
* @param log Slf4j Logger from RecordPublisher to log the events
*/
public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) {
public static void takeDelayedDeliveryActionIfRequired(String resourceIdentifier, Instant enqueueTimestamp, Logger log) {
final long durationBetweenEnqueueAndAckInMillis = Duration
.between(enqueueTimestamp, Instant.now()).toMillis();
if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) {
@ -41,9 +41,9 @@ public class DiagnosticUtils {
"{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs"
+ " to see the state of the executor service. Also check if the RecordProcessor's processing "
+ "time is high. ",
shardId, durationBetweenEnqueueAndAckInMillis);
resourceIdentifier, durationBetweenEnqueueAndAckInMillis);
} else if (log.isDebugEnabled()) {
log.debug("{}: Record delivery time to shard consumer is {} millis", shardId,
log.debug("{}: Record delivery time to shard consumer is {} millis", resourceIdentifier,
durationBetweenEnqueueAndAckInMillis);
}
}

View file

@ -6,6 +6,7 @@ import lombok.experimental.Accessors;
/**
 * Immutable (Lombok {@code @Value}) pairing of a stream's identity with the initial
 * position from which that stream should be consumed. {@code @Accessors(fluent = true)}
 * generates fluent accessors: {@code streamIdentifier()} and
 * {@code initialPositionInStreamExtended()}.
 */
@Value
@Accessors(fluent = true)
public class StreamConfig {
    // TODO: Consider having streamIdentifier as the unique identifier of this class.
    StreamIdentifier streamIdentifier;
    // Starting position (e.g. LATEST) used when beginning to consume this stream.
    InitialPositionInStreamExtended initialPositionInStreamExtended;
}

View file

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -35,6 +36,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
@ -45,6 +47,8 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
@ -52,6 +56,7 @@ import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
@ -61,7 +66,10 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
@ -74,6 +82,7 @@ import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
@ -89,6 +98,8 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
@Slf4j
public class Scheduler implements Runnable {
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 10000L;
private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
@ -111,8 +122,8 @@ public class Scheduler implements Runnable {
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final Function<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -122,11 +133,13 @@ public class Scheduler implements Runnable {
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final boolean isMultiStreamMode;
// TODO : halo : make sure we generate streamConfig if entry not present.
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final MultiStreamTracker multiStreamTracker;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
private final Function<StreamIdentifier, ShardDetector> shardDetectorProvider;
private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@ -142,6 +155,9 @@ public class Scheduler implements Runnable {
private final Object lock = new Object();
private Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false;
/**
* Used to ensure that only one requestedShutdown is in progress at a time.
*/
@ -190,6 +206,9 @@ public class Scheduler implements Runnable {
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)),
streamConfig ->
Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
this.multiStreamTracker = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> multiStreamTracker,
streamConfig -> null);
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
@ -217,9 +236,9 @@ public class Scheduler implements Runnable {
this.diagnosticEventHandler = new DiagnosticEventLogger();
// TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigList()
this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier));
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -242,7 +261,7 @@ public class Scheduler implements Runnable {
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetectorProvider = streamIdentifier -> createOrGetShardSyncTaskManager(streamIdentifier).shardDetector();
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream.
@ -298,7 +317,7 @@ public class Scheduler implements Runnable {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue();
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier),
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamConfig),
leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
hierarchicalShardSyncer, metricsFactory);
@ -321,6 +340,7 @@ public class Scheduler implements Runnable {
} else {
log.info("LeaseCoordinator is already running. No need to start it.");
}
streamSyncWatch.start();
isDone = true;
} catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
@ -364,14 +384,20 @@ public class Scheduler implements Runnable {
for (ShardInfo completedShard : completedShards) {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
final StreamConfig streamConfig = currentStreamConfigMap
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) {
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
streamIdentifier.serialize(), completedShard.toString());
}
}
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);
// check for new streams and sync with the scheduler state
checkAndSyncStreamShardsAndLeases();
logExecutorState();
slog.info("Sleeping ...");
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
@ -387,6 +413,77 @@ public class Scheduler implements Runnable {
slog.resetInfoLogging();
}
/**
 * Detects streams newly added to or removed from the {@link MultiStreamTracker} and
 * kicks off a shard/lease sync for each such stream. Runs only in multi-stream mode,
 * and at most once every NEW_STREAM_CHECK_INTERVAL_MILLIS (gated by streamSyncWatch,
 * which is restarted after each pass).
 *
 * NOTE(review): the javadoc previously claimed package-level access "solely for
 * testing purposes", but the method is declared private — confirm intended visibility.
 *
 * @return identifiers of the streams that were synced by this invocation
 * @throws DependencyException propagated from lease-table access
 * @throws ProvisionedThroughputException propagated from lease-table access
 * @throws InvalidStateException propagated from lease-table access
 */
private Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final Set<StreamIdentifier> streamsSynced = new HashSet<>();
    if (isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS)) {
        // Private snapshot of the tracker's latest stream configs, keyed by identifier.
        // NOTE(review): previous comment said "immutable copy", but HashMap is mutable —
        // this is only a defensive copy of the tracker's list.
        final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
        newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
                .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
        // Seed currentStreamConfigMap from the lease table (one-time) so stale streams
        // lingering there are visible to the cleanup pass below.
        syncStreamsFromLeaseTableOnAppInit();
        // Pass 1: streams in the tracker but not tracked locally are new — sync their shards.
        for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
            if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
                log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
                ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
                shardSyncTaskManager.syncShardAndLeaseInfo();
                currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
                streamsSynced.add(streamIdentifier);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(streamIdentifier + " is already being processed - skipping shard sync.");
                }
            }
        }
        // Pass 2: locally tracked streams absent from the tracker are old/deleted —
        // sync their shards once more and stop tracking them.
        // TODO: Remove assumption that each Worker gets the full list of streams
        Iterator<StreamIdentifier> currentStreamConfigIter = currentStreamConfigMap.keySet().iterator();
        while (currentStreamConfigIter.hasNext()) {
            StreamIdentifier streamIdentifier = currentStreamConfigIter.next();
            if (!newStreamConfigMap.containsKey(streamIdentifier)) {
                log.info("Found old/deleted stream: " + streamIdentifier + ". Syncing shards of that stream.");
                ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier));
                shardSyncTaskManager.syncShardAndLeaseInfo();
                // Iterator.remove avoids ConcurrentModificationException while pruning the map.
                currentStreamConfigIter.remove();
                streamsSynced.add(streamIdentifier);
            }
        }
        streamSyncWatch.reset().start();
    }
    return streamsSynced;
}
/**
 * One-time (per scheduler run) seeding of currentStreamConfigMap from the lease table:
 * any stream that has leases but no tracked config gets a default LATEST config via
 * getDefaultStreamConfig, so its leases can still be processed to completion.
 * No-op after the first successful pass, or outside multi-stream mode.
 *
 * NOTE(review): despite the Set return type, this method always returns an empty set —
 * confirm whether callers are expected to receive the discovered identifiers.
 *
 * @return always an empty set (see note above)
 * @throws DependencyException propagated from listing leases
 * @throws ProvisionedThroughputException propagated from listing leases
 * @throws InvalidStateException propagated from listing leases
 */
private Set<StreamIdentifier> syncStreamsFromLeaseTableOnAppInit()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    if (!leasesSyncedOnAppInit && isMultiStreamMode) {
        // Unchecked cast assumes every lease is a MultiStreamLease in multi-stream
        // mode — TODO confirm this invariant holds for all lease-table entries.
        final Set<StreamIdentifier> streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream()
                .map(lease -> StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()))
                .collect(Collectors.toSet());
        for (StreamIdentifier streamIdentifier : streamIdentifiers) {
            if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
                currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
            }
        }
        leasesSyncedOnAppInit = true;
    }
    return Collections.emptySet();
}
/**
 * Builds a fallback {@link StreamConfig} for a stream that is no longer (or not yet)
 * tracked by the application: the stream is positioned at LATEST so its remaining
 * shards reach shard end as quickly as possible.
 */
private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) {
    final InitialPositionInStreamExtended latestPosition =
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
    return new StreamConfig(streamIdentifier, latestPosition);
}
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
@ -588,7 +685,8 @@ public class Scheduler implements Runnable {
if (!firstItem) {
builder.append(", ");
}
builder.append(shardInfo.shardId());
builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId()));
firstItem = false;
}
slog.info("Current stream shard assignments: " + builder.toString());
@ -624,8 +722,8 @@ public class Scheduler implements Runnable {
return consumer;
}
private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamIdentifier streamIdentifier) {
return streamToShardSyncTaskManagerMap.computeIfAbsent(streamIdentifier, s -> shardSyncTaskManagerProvider.apply(s));
private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamConfig streamConfig) {
return streamToShardSyncTaskManagerMap.computeIfAbsent(streamConfig, s -> shardSyncTaskManagerProvider.apply(s));
}
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
@ -637,8 +735,10 @@ public class Scheduler implements Runnable {
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty");
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
Validate.notNull(streamConfig, "StreamConfig should not be null");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamConfig.streamIdentifier(),
leaseCoordinator,
@ -657,7 +757,7 @@ public class Scheduler implements Runnable {
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig.streamIdentifier()),
shardDetectorProvider.apply(streamConfig),
aggregatorUtil,
hierarchicalShardSyncer,
metricsFactory);
@ -682,7 +782,8 @@ public class Scheduler implements Runnable {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shard);
if (consumer.leaseLost()) {
shardInfoShardConsumerMap.remove(shard);
log.debug("Removed consumer for {} as lease has been lost", shard.shardId());
log.debug("Removed consumer for {} as lease has been lost",
shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId()));
} else {
consumer.executeLifecycle();
}

View file

@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -64,6 +65,8 @@ public class HierarchicalShardSyncer {
private final boolean isMultiStreamMode;
private String streamIdentifier = "";
public HierarchicalShardSyncer() {
isMultiStreamMode = false;
}
@ -99,6 +102,7 @@ public class HierarchicalShardSyncer {
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
final List<Shard> latestShards = getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope, latestShards);
@ -110,8 +114,9 @@ public class HierarchicalShardSyncer {
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)
throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
log.debug("Num shards: {}", latestShards.size());
log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size());
}
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards);
@ -127,7 +132,7 @@ public class HierarchicalShardSyncer {
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition,
inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
boolean success = false;
@ -218,7 +223,7 @@ public class HierarchicalShardSyncer {
for (String shardId : shardIdsOfClosedShards) {
final Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
log.info("Shard {} is not present in Kinesis anymore.", shardId);
log.info("{} : Shard {} is not present in Kinesis anymore.", streamIdentifier, shardId);
continue;
}
@ -360,25 +365,26 @@ public class HierarchicalShardSyncer {
final MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier())
.map(streamId -> streamId.serialize()).orElse("");
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final List<Shard> openShards = getOpenShards(shards, streamIdentifier);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
log.debug("{} : Evaluating leases for open shard {} and its ancestors.", streamIdentifier, shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
log.debug("{} : Lease for shardId {} already exists. Not creating a lease", streamIdentifier, shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
@ -415,7 +421,7 @@ public class HierarchicalShardSyncer {
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
@ -464,7 +470,7 @@ public class HierarchicalShardSyncer {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext,
final MultiStreamArgs multiStreamArgs) {
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
final Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
@ -489,9 +495,9 @@ public class HierarchicalShardSyncer {
memoizationContext, multiStreamArgs)) {
isDescendant = true;
descendantParentShardIds.add(parentShardId);
log.debug("Parent shard {} is a descendant.", parentShardId);
log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId);
} else {
log.debug("Parent shard {} is NOT a descendant.", parentShardId);
log.debug("{} : Parent shard {} is NOT a descendant.", streamIdentifier, parentShardId);
}
}
@ -499,7 +505,7 @@ public class HierarchicalShardSyncer {
if (isDescendant) {
for (String parentShardId : parentShardIds) {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
log.debug("Need to create a lease for shardId {}", parentShardId);
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId);
Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
if (lease == null) {
lease = multiStreamArgs.isMultiStreamMode() ?
@ -593,6 +599,7 @@ public class HierarchicalShardSyncer {
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException,
DependencyException, InvalidStateException, ProvisionedThroughputException {
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
final Set<String> kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet());
// Check if there are leases for non-existent shards
@ -600,14 +607,15 @@ public class HierarchicalShardSyncer {
.filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList());
if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
log.info("Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", garbageLeases.size());
log.info("{} : Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", streamIdentifier, garbageLeases.size());
final Set<String> currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId)
.collect(Collectors.toSet());
for (Lease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) {
log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey());
log.info("{} : Deleting lease for shard {} as it is not present in Kinesis stream.",
streamIdentifier, lease.leaseKey());
leaseRefresher.deleteLease(lease);
}
}
@ -627,14 +635,16 @@ public class HierarchicalShardSyncer {
static boolean isCandidateForCleanup(final Lease lease, final Set<String> currentKinesisShardIds,
final MultiStreamArgs multiStreamArgs)
throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true;
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
boolean isCandidateForCleanup = true;
final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs);
if (currentKinesisShardIds.contains(shardId)) {
isCandidateForCleanup = false;
} else {
log.info("Found lease for non-existent shard: {}. Checking its parent shards", shardId);
log.info("{} : Found lease for non-existent shard: {}. Checking its parent shards", streamIdentifier, shardId);
final Set<String> parentShardIds = lease.parentShardIds();
for (String parentShardId : parentShardIds) {
@ -643,7 +653,7 @@ public class HierarchicalShardSyncer {
if (currentKinesisShardIds.contains(parentShardId)) {
final String message = String.format("Parent shard %s exists but not the child shard %s",
parentShardId, shardId);
log.info(message);
log.info("{} : {}", streamIdentifier, message);
throw new KinesisClientLibIOException(message);
}
}
@ -730,8 +740,8 @@ public class HierarchicalShardSyncer {
}
if (okayToDelete) {
log.info("Deleting lease for shard {} as it has been completely processed and processing of child "
+ "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
log.info("{} : Deleting lease for shard {} as it has been completely processed and processing of child "
+ "shards has begun.", streamIdentifier, shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
leaseRefresher.deleteLease(leaseForClosedShard);
}
}
@ -794,9 +804,9 @@ public class HierarchicalShardSyncer {
* @param allShards All shards returned via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
static List<Shard> getOpenShards(final List<Shard> allShards) {
static List<Shard> getOpenShards(final List<Shard> allShards, final String streamIdentifier) {
return allShards.stream().filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null)
.peek(shard -> log.debug("Found open shard: {}", shard.shardId())).collect(Collectors.toList());
.peek(shard -> log.debug("{} : Found open shard: {}", streamIdentifier, shard.shardId())).collect(Collectors.toList());
}
private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
@ -812,6 +822,11 @@ public class HierarchicalShardSyncer {
return checkpoint;
}
/**
 * Derives a loggable stream identifier from the given args, falling back to the
 * literal "single_stream_mode" when no stream identifier is present.
 */
private static String getStreamIdentifier(MultiStreamArgs multiStreamArgs) {
    if (multiStreamArgs.streamIdentifier() == null) {
        return "single_stream_mode";
    }
    return multiStreamArgs.streamIdentifier().serialize();
}
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*

View file

@ -54,7 +54,8 @@ public class BlockOnParentShardTask implements ConsumerTask {
@Override
public TaskResult call() {
Exception exception = null;
final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId());
try {
boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) {
@ -62,20 +63,20 @@ public class BlockOnParentShardTask implements ConsumerTask {
if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.checkpoint();
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint);
log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardInfoId, checkpoint);
blockedOnParentShard = true;
exception = new BlockedOnParentShardException("Parent shard not yet done");
break;
} else {
log.debug("Shard {} has been completely processed.", shardId);
log.debug("Shard {} has been completely processed.", shardInfoId);
}
} else {
log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId);
log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardInfoId);
}
}
if (!blockedOnParentShard) {
log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfo.shardId());
log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfoId);
return new TaskResult(null);
}
} catch (Exception e) {
@ -85,7 +86,7 @@ public class BlockOnParentShardTask implements ConsumerTask {
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.shardId(), e);
log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfoId, e);
}
return new TaskResult(exception);

View file

@ -60,6 +60,7 @@ public class ProcessTask implements ConsumerTask {
private final ProcessRecordsInput processRecordsInput;
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
private final String shardInfoId;
public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor,
@ -74,6 +75,8 @@ public class ProcessTask implements ConsumerTask {
@NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) {
this.shardInfo = shardInfo;
this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId());
this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis;
@ -121,7 +124,7 @@ public class ProcessTask implements ConsumerTask {
}
if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) {
log.info("Reached end of shard {} and have no records to process", shardInfo.shardId());
log.info("Reached end of shard {} and have no records to process", shardInfoId);
return new TaskResult(null, true);
}
@ -142,13 +145,13 @@ public class ProcessTask implements ConsumerTask {
}
success = true;
} catch (RuntimeException e) {
log.error("ShardId {}: Caught exception: ", shardInfo.shardId(), e);
log.error("ShardId {}: Caught exception: ", shardInfoId, e);
exception = e;
backoff();
}
if (processRecordsInput.isAtShardEnd()) {
log.info("Reached end of shard {}, and processed {} records", shardInfo.shardId(), processRecordsInput.records().size());
log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size());
return new TaskResult(null, true);
}
return new TaskResult(exception);
@ -174,7 +177,7 @@ public class ProcessTask implements ConsumerTask {
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("{}: Sleep was interrupted", shardInfo.shardId(), ie);
log.debug("{}: Sleep was interrupted", shardInfoId, ie);
}
}
@ -188,7 +191,7 @@ public class ProcessTask implements ConsumerTask {
*/
private void callProcessRecords(ProcessRecordsInput input, List<KinesisClientRecord> records) {
log.debug("Calling application processRecords() with {} records from {}", records.size(),
shardInfo.shardId());
shardInfoId);
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
@ -200,8 +203,8 @@ public class ProcessTask implements ConsumerTask {
shardRecordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
log.error("ShardId {}: Application processRecords() threw an exception when processing shard ",
shardInfo.shardId(), e);
log.error("ShardId {}: Skipping over the following data records: {}", shardInfo.shardId(), records);
shardInfoId, e);
log.error("ShardId {}: Skipping over the following data records: {}", shardInfoId, records);
} finally {
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, startTime, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope);
@ -226,17 +229,17 @@ public class ProcessTask implements ConsumerTask {
* the time when the task started
*/
private void handleNoRecords(long startTimeMillis) {
log.debug("Kinesis didn't return any records for shard {}", shardInfo.shardId());
log.debug("Kinesis didn't return any records for shard {}", shardInfoId);
long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds);
try {
log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis,
shardInfo.shardId());
shardInfoId);
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
log.debug("ShardId {}: Sleep was interrupted", shardInfo.shardId());
log.debug("ShardId {}: Sleep was interrupted", shardInfoId);
}
}
}
@ -273,8 +276,8 @@ public class ProcessTask implements ConsumerTask {
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
recordIterator.remove();
log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", extendedSequenceNumber,
lastCheckpointValue);
log.debug("{} : removing record with ESN {} because the ESN is <= checkpoint ({})", shardInfoId,
extendedSequenceNumber, lastCheckpointValue);
continue;
}

View file

@ -62,6 +62,7 @@ public class ShardConsumer {
private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator;
private final int bufferSize;
private final TaskExecutionListener taskExecutionListener;
private final String streamIdentifier;
private ConsumerTask currentTask;
private TaskOutcome taskOutcome;
@ -124,6 +125,7 @@ public class ShardConsumer {
this.recordsPublisher = recordsPublisher;
this.executorService = executorService;
this.shardInfo = shardInfo;
this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode");
this.shardConsumerArgument = shardConsumerArgument;
this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis;
this.taskExecutionListener = taskExecutionListener;
@ -208,8 +210,8 @@ public class ShardConsumer {
}
Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
if (dispatchFailure != null) {
log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped",
dispatchFailure);
log.warn("{} : Exception occurred while dispatching incoming data. The incoming data has been skipped",
streamIdentifier, dispatchFailure);
return dispatchFailure;
}
@ -238,7 +240,7 @@ public class ShardConsumer {
Instant now = Instant.now();
Duration timeSince = Duration.between(subscriber.lastDataArrival(), now);
if (timeSince.toMillis() > value) {
log.warn("Last time data arrived: {} ({})", lastDataArrival, timeSince);
log.warn("{} : Last time data arrived: {} ({})", streamIdentifier, lastDataArrival, timeSince);
}
}
});
@ -250,11 +252,11 @@ public class ShardConsumer {
if (taken != null) {
String message = longRunningTaskMessage(taken);
if (log.isDebugEnabled()) {
log.debug("{} Not submitting new task.", message);
log.debug("{} : {} Not submitting new task.", streamIdentifier, message);
}
logWarningForTaskAfterMillis.ifPresent(value -> {
if (taken.toMillis() > value) {
log.warn(message);
log.warn("{} : {}", streamIdentifier, message);
}
});
}
@ -358,7 +360,7 @@ public class ShardConsumer {
nextState = currentState.failureTransition();
break;
default:
log.error("No handler for outcome of {}", outcome.name());
log.error("{} : No handler for outcome of {}", streamIdentifier, outcome.name());
nextState = currentState.failureTransition();
break;
}
@ -382,9 +384,9 @@ public class ShardConsumer {
Exception taskException = taskResult.getException();
if (taskException instanceof BlockedOnParentShardException) {
// No need to log the stack trace for this exception (it is very specific).
log.debug("Shard {} is blocked on completion of parent shard.", shardInfo.shardId());
log.debug("{} : Shard {} is blocked on completion of parent shard.", streamIdentifier, shardInfo.shardId());
} else {
log.debug("Caught exception running {} task: ", currentTask.taskType(), taskResult.getException());
log.debug("{} : Caught exception running {} task: ", streamIdentifier, currentTask.taskType(), taskResult.getException());
}
}
}
@ -411,10 +413,10 @@ public class ShardConsumer {
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
public boolean leaseLost() {
log.debug("Shutdown({}): Lease lost triggered.", shardInfo.shardId());
log.debug("{} : Shutdown({}): Lease lost triggered.", streamIdentifier, shardInfo.shardId());
if (subscriber != null) {
subscriber.cancel();
log.debug("Shutdown({}): Subscriber cancelled.", shardInfo.shardId());
log.debug("{} : Shutdown({}): Subscriber cancelled.", streamIdentifier, shardInfo.shardId());
}
markForShutdown(ShutdownReason.LEASE_LOST);
return isShutdown();

View file

@ -40,8 +40,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
private final int bufferSize;
private final ShardConsumer shardConsumer;
private final int readTimeoutsToIgnoreBeforeWarning;
private final String shardInfoId;
private volatile int readTimeoutSinceLastRead = 0;
@VisibleForTesting
final Object lockObject = new Object();
// This holds the last time an attempt of request to upstream service was made including the first try to
@ -70,6 +70,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer;
this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt()
.map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId());
}
@ -107,7 +109,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
if (retrievalFailure != null) {
synchronized (lockObject) {
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
shardConsumer.shardInfo().shardId());
shardInfoId);
if (retrievalFailure instanceof RetryableRetrievalException) {
log.debug(logMessage, retrievalFailure.getCause());
} else {
@ -130,7 +132,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
shardInfoId, lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
cancel();
// Start the subscription again which will update the lastRequestTime as well.
@ -157,7 +159,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
subscription);
} catch (Throwable t) {
log.warn("{}: Caught exception from handleInput", shardConsumer.shardInfo().shardId(), t);
log.warn("{}: Caught exception from handleInput", shardInfoId, t);
synchronized (lockObject) {
dispatchFailure = t;
}
@ -193,7 +195,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
log.warn(
"{}: onError(). Cancelling subscription, and marking self as failed. KCL will "
+ "recreate the subscription as neccessary to continue processing.",
shardConsumer.shardInfo().shardId(), t);
shardInfoId, t);
}
protected void logOnErrorReadTimeoutWarning(Throwable t) {
@ -202,13 +204,13 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
+ "are seeing this warning frequently consider increasing the SDK timeouts "
+ "by providing an OverrideConfiguration to the kinesis client. Alternatively you"
+ "can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress"
+ "intermittant ReadTimeout warnings.", shardConsumer.shardInfo().shardId(), t);
+ "intermittant ReadTimeout warnings.", shardInfoId, t);
}
@Override
public void onComplete() {
log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally",
shardConsumer.shardInfo().shardId());
shardInfoId);
}
public void cancel() {

View file

@ -82,6 +82,8 @@ public class ShutdownTask implements ConsumerTask {
private final TaskType taskType = TaskType.SHUTDOWN;
private String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId());
/*
* Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc)
@ -112,7 +114,7 @@ public class ShutdownTask implements ConsumerTask {
if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
localReason = ShutdownReason.LEASE_LOST;
dropLease();
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoId);
}
}
@ -124,7 +126,7 @@ public class ShutdownTask implements ConsumerTask {
}
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfo.shardId(), shardInfo.concurrencyToken(), localReason);
shardInfoId, shardInfo.concurrencyToken(), localReason);
final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason)
.checkpointer(recordProcessorCheckpointer).build();
final long startTime = System.currentTimeMillis();
@ -135,7 +137,7 @@ public class ShutdownTask implements ConsumerTask {
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.shardId() + ". Application must checkpoint upon shard end. " +
+ shardInfoId + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
} else {
@ -143,7 +145,7 @@ public class ShutdownTask implements ConsumerTask {
}
log.debug("Shutting down retrieval strategy.");
recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfo.shardId());
log.debug("Record processor completed shutdown() for shard {}", shardInfoId);
} catch (Exception e) {
applicationException = true;
throw e;
@ -152,11 +154,11 @@ public class ShutdownTask implements ConsumerTask {
}
if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
log.debug("Looking for child shards of shard {}", shardInfoId);
// create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
log.debug("Finished checking for child shards of shard {}", shardInfoId);
}
return new TaskResult(null);

View file

@ -105,7 +105,7 @@ public class RetrievalConfig {
this.applicationName = applicationName;
}
public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
final StreamConfig[] streamConfig = new StreamConfig[1];
this.appStreamTracker.apply(multiStreamTracker -> {
throw new IllegalArgumentException(
@ -113,6 +113,7 @@ public class RetrievalConfig {
}, sc -> streamConfig[0] = sc);
this.appStreamTracker = Either
.right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended));
return this;
}
public RetrievalFactory retrievalFactory() {

View file

@ -51,7 +51,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -60,7 +59,6 @@ import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
@RequiredArgsConstructor
@Slf4j
@KinesisClientInternalApi
public class FanOutRecordsPublisher implements RecordsPublisher {
@ -73,7 +71,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis;
private final String shardId;
private final String consumerArn;
private final String streamAndShardId;
private final Object lockObject = new Object();
private final AtomicInteger subscribeToShardId = new AtomicInteger(0);
@ -91,11 +89,25 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
this.streamAndShardId = shardId;
}
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn, String streamIdentifierSer) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
this.streamAndShardId = streamIdentifierSer + ":" + shardId;
}
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
synchronized (lockObject) {
log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", shardId,
log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", streamAndShardId,
extendedSequenceNumber, initialPositionInStreamExtended);
this.initialPositionInStreamExtended = initialPositionInStreamExtended;
this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
@ -174,7 +186,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// It is now safe to remove the element
recordsDeliveryQueue.poll();
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
@ -190,13 +202,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
.equals(flow.getSubscribeToShardId())) {
log.error(
"{}: Received unexpected ack for the active subscription {}. Throwing. ",
shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
"{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
throw new IllegalStateException("Unexpected ack for the active subscription");
}
// Otherwise publisher received a stale ack.
else {
log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", shardId,
log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", streamAndShardId,
recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
}
}
@ -219,10 +230,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
throw e;
} catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
log.error("{}: Unable to deliver event to the shard consumer.", streamAndShardId, t);
throw t;
}
}
@ -290,7 +301,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
String instanceId = shardId + "-" + subscribeInvocationId;
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard",
shardId, connectionStart, instanceId);
streamAndShardId, connectionStart, instanceId);
flow = new RecordFlow(this, connectionStart, instanceId);
kinesis.subscribeToShard(request, flow);
}
@ -303,12 +314,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if(hasValidFlow()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
" Last successful request details -- {}", shardId, flow.connectionStartedAt,
" Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt,
flow.subscribeToShardId, lastSuccessfulRequestDetails);
} else {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
" Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails);
}
return;
}
@ -320,8 +331,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful request details -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
" Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
@ -339,13 +349,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
flow.cancel();
}
log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace);
log.debug("{}: availableQueueSpace zeroing from {}", streamAndShardId, availableQueueSpace);
availableQueueSpace = 0;
try {
handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable);
log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}",
streamAndShardId, lastSuccessfulRequestDetails, innerThrowable);
}
subscriber = null;
flow = null;
@ -353,7 +364,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (triggeringFlow != null) {
log.debug(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
category.throwableTypeString);
triggeringFlow.cancel();
}
@ -367,7 +378,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
+ "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails);
+ "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails);
recordsDeliveryQueue.clear();
}
}
@ -383,7 +394,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (t.getCause() instanceof ResourceNotFoundException) {
log.debug(
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
shardId);
streamAndShardId);
// The ack received for this onNext event will be ignored by the publisher as the global flow object should
// be either null or renewed when the ack's flow identifier is evaluated.
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
@ -452,7 +463,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!hasValidSubscriber()) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Subscriber is null.",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
triggeringFlow.cancel();
if (flow != null) {
flow.cancel();
@ -462,7 +473,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!isActiveFlow(triggeringFlow)) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Received records for an inactive flow.",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
@ -478,7 +489,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) {
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
" Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails);
errorOccurred(triggeringFlow, t);
}
}
@ -488,7 +499,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (availableQueueSpace <= 0) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
} else {
availableQueueSpace--;
if (availableQueueSpace > 0) {
@ -503,12 +514,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void onComplete(RecordFlow triggeringFlow) {
synchronized (lockObject) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", streamAndShardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
triggeringFlow.cancel();
if (!hasValidSubscriber()) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}",
streamAndShardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
@ -516,15 +528,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!isActiveFlow(triggeringFlow)) {
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
if (currentSequenceNumber != null) {
log.debug("{}: Shard hasn't ended. Resubscribing.", shardId);
log.debug("{}: Shard hasn't ended. Resubscribing.", streamAndShardId);
subscribeToShard(currentSequenceNumber);
} else {
log.debug("{}: Shard has ended completing subscriber.", shardId);
log.debug("{}: Shard has ended completing subscriber.", streamAndShardId);
subscriber.onComplete();
}
}
@ -536,7 +548,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (subscriber != null) {
log.error(
"{}: A subscribe occurred while there was an active subscriber. Sending error to current subscriber",
shardId);
streamAndShardId);
MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException();
//
@ -575,7 +587,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
shardId, n, lastSuccessfulRequestDetails);
streamAndShardId, n, lastSuccessfulRequestDetails);
return;
}
if (flow == null) {
@ -584,7 +596,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
//
log.debug(
"{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.",
shardId);
streamAndShardId);
errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow."));
return;
}
@ -602,19 +614,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails);
streamAndShardId, lastSuccessfulRequestDetails);
return;
}
if (!hasValidSubscriber()) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails);
streamAndShardId, lastSuccessfulRequestDetails);
}
subscriber = null;
if (flow != null) {
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
}
@ -703,12 +715,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- Subscribe",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
if (!parent.isActiveFlow(this)) {
this.isDisposed = true;
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- parent is disposed",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
parent.rejectSubscription(publisher);
return;
}
@ -716,7 +728,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
try {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- creating record subscription",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
subscription = new RecordSubscription(parent, this, connectionStartedAt, subscribeToShardId);
publisher.subscribe(subscription);
@ -727,7 +739,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (Throwable t) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- throwable during record subscription: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage());
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage());
parent.errorOccurred(this, t);
}
}
@ -736,7 +748,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override
public void responseReceived(SubscribeToShardResponse response) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Request id - {}",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId());
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId());
final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString());
parent.setLastSuccessfulRequestDetails(requestDetails);
@ -759,12 +771,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
if (this.isDisposed) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
this.isErrorDispatched = true;
}
@ -775,7 +787,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} else {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
}
}
@ -802,7 +814,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (Exception e) {
log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.streamAndShardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
}
@ -810,7 +822,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void executeComplete() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
if (isCancelled) {
//
@ -820,13 +832,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// subscription, which was cancelled for a reason (usually queue overflow).
//
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}",
parent.shardId, parent.lastSuccessfulRequestDetails);
parent.streamAndShardId, parent.lastSuccessfulRequestDetails);
return;
}
if (this.isDisposed) {
log.warn(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
return;
}
@ -844,7 +856,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (Throwable t) {
log.error(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t);
}
}
}
@ -885,14 +897,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void cancel() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- Cancel called",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
flow.isCancelled = true;
if (subscription != null) {
subscription.cancel();
} else {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- SDK subscription is null",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
}
}
@ -906,21 +918,21 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow.isCancelled) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
if (flow.isDisposed) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow requires cancelling",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
cancel();
}
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace);
if (parent.availableQueueSpace > 0) {
request(1);
}
@ -933,7 +945,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow.shouldSubscriptionCancel()) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
cancel();
return;
}
@ -948,7 +960,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override
public void onError(Throwable t) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId,
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.streamAndShardId,
connectionStartedAt, subscribeToShardId, t.getClass().getName(), t.getMessage());
//
@ -961,7 +973,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void onComplete() {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
}
}

View file

@ -63,7 +63,7 @@ public class KinesisDataFetcher {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
@NonNull @Getter
private final StreamIdentifier streamIdentifier;
@NonNull
private final String shardId;
@ -71,6 +71,7 @@ public class KinesisDataFetcher {
@NonNull
private final MetricsFactory metricsFactory;
private final Duration maxFutureWait;
private final String streamAndShardId;
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
@ -93,6 +94,7 @@ public class KinesisDataFetcher {
this.maxRecords = maxRecords;
this.metricsFactory = metricsFactory;
this.maxFutureWait = maxFutureWait;
this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId;
}
/** Note: This method has package level access for testing purposes.
@ -120,7 +122,7 @@ public class KinesisDataFetcher {
try {
return new AdvancingResult(getRecords(nextIterator));
} catch (ResourceNotFoundException e) {
log.info("Caught ResourceNotFoundException when fetching records for shard {}", shardId);
log.info("Caught ResourceNotFoundException when fetching records for shard {}", streamAndShardId);
return TERMINAL_RESULT;
}
} else {
@ -182,14 +184,14 @@ public class KinesisDataFetcher {
*/
public void initialize(final String initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", shardId, initialCheckpoint);
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint);
advanceIteratorTo(initialCheckpoint, initialPositionInStream);
isInitialized = true;
}
public void initialize(final ExtendedSequenceNumber initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", shardId, initialCheckpoint.sequenceNumber());
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber());
advanceIteratorTo(initialCheckpoint.sequenceNumber(), initialPositionInStream);
isInitialized = true;
}
@ -234,7 +236,7 @@ public class KinesisDataFetcher {
throw new RetryableRetrievalException(e.getMessage(), e);
}
} catch (ResourceNotFoundException e) {
log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e);
log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", streamAndShardId, e);
nextIterator = null;
} finally {
MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getShardIterator"),
@ -285,7 +287,7 @@ public class KinesisDataFetcher {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
log.debug("Interrupt called on metod, shutdown initiated");
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RetryableRetrievalException(e.getMessage(), e);

View file

@ -91,7 +91,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private boolean started = false;
private final String operation;
private final String shardId;
private final String streamAndShardId;
private Subscriber<? super RecordsRetrieved> subscriber;
@VisibleForTesting @Getter
private final PublisherSession publisherSession;
@ -135,11 +135,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
// Handle records delivery ack and execute nextEventDispatchAction.
// This method is not thread-safe and needs to be called after acquiring a monitor.
void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String shardId, Runnable nextEventDispatchAction) {
void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String streamAndShardId, Runnable nextEventDispatchAction) {
final PrefetchRecordsRetrieved recordsToCheck = peekNextRecord();
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) {
evictPublishedRecordAndUpdateDemand(shardId);
evictPublishedRecordAndUpdateDemand(streamAndShardId);
nextEventDispatchAction.run();
} else {
// Log and ignore any other ack received. As long as an ack is received for head of the queue
@ -148,21 +148,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
final BatchUniqueIdentifier peekedBatchUniqueIdentifier =
recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.",
shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
}
}
// Evict the published record from the prefetch queue.
// This method is not thread-safe and needs to be called after acquiring a monitor.
@VisibleForTesting
RecordsRetrieved evictPublishedRecordAndUpdateDemand(String shardId) {
RecordsRetrieved evictPublishedRecordAndUpdateDemand(String streamAndShardId) {
final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll();
if (result != null) {
updateDemandTrackersOnPublish(result);
} else {
log.info(
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
+ "was reset.", streamAndShardId);
}
return result;
}
@ -222,7 +222,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.shardId = shardId;
this.streamAndShardId =
this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
}
@Override
@ -234,7 +235,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
if (!started) {
log.info("{} : Starting prefetching thread.", shardId);
log.info("{} : Starting prefetching thread.", streamAndShardId);
executorService.execute(defaultGetRecordsCacheDaemon);
}
started = true;
@ -304,9 +305,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
@Override
public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, shardId, () -> drainQueueForRequests());
publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, streamAndShardId, () -> drainQueueForRequests());
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log);
takeDelayedDeliveryActionIfRequired(streamAndShardId, lastEventDeliveryTime, log);
}
// Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue.
@ -403,7 +404,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
public void run() {
while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) {
log.warn("{} : Prefetch thread was interrupted.", shardId);
log.warn("{} : Prefetch thread was interrupted.", streamAndShardId);
break;
}
@ -411,7 +412,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
try {
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("{} : Position was reset while attempting to add item to queue.", shardId);
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
} finally {
resetLock.readLock().unlock();
}
@ -447,23 +448,23 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId);
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId);
} catch (InterruptedException e) {
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId);
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
} catch (ExpiredIteratorException e) {
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", shardId, e);
+ " after greatest seqNum passed to customer", streamAndShardId, e);
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
publisherSession.dataFetcher().restartIterator();
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
} catch (Throwable e) {
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", shardId, e);
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
} finally {
MetricsUtil.endScope(scope);
}
@ -475,7 +476,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
"Shutdown has probably been started", streamAndShardId);
}
}
}
@ -522,14 +523,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
public synchronized void waitForConsumer() throws InterruptedException {
if (!shouldGetNewRecords()) {
log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls);
log.debug("{} : Queue is full waiting for consumer for {} ms", streamAndShardId, idleMillisBetweenCalls);
this.wait(idleMillisBetweenCalls);
}
}
public synchronized boolean shouldGetNewRecords() {
if (log.isDebugEnabled()) {
log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString());
log.debug("{} : Current Prefetch Counter States: {}", streamAndShardId, this.toString());
}
return size < maxRecordsCount && byteSize < maxByteSize;
}

View file

@ -1156,6 +1156,45 @@ public class HierarchicalShardSyncerTest {
// * Current leases: (4, 5, 7)
// */
/**
 * Documents lease-creation behavior for shard graph A (see the class-level shard
 * diagrams) when starting from an empty lease table with initial position LATEST.
 * Expected outcome per {@code determineNewLeasesToCreate}: leases for shards 8, 9
 * and 10 checkpointed at TRIM_HORIZON and a lease for shard 6 checkpointed at
 * LATEST. NOTE(review): the exact checkpoint assignment reflects current syncer
 * behavior for this graph — confirm against the shard-graph diagrams if it changes.
 */
@Test
public void understandLeaseBehavior() {
    final List<Shard> shards = constructShardListForGraphA();
    // Intentionally no pre-existing leases; an earlier variant seeded leases for
    // shards 4, 5 and 7, but this test exercises the empty-lease-table case.
    final List<Lease> currentLeases = Collections.emptyList();

    final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
            INITIAL_POSITION_LATEST);

    final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
    expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
    expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON);
    expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON);
    expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST);

    // Exactly the expected leases, each with the expected checkpoint.
    assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size()));
    for (Lease lease : newLeases) {
        assertThat("Unexpected lease: " + lease, expectedShardIdCheckpointMap.containsKey(lease.leaseKey()),
                equalTo(true));
        assertThat(lease.checkpoint(), equalTo(expectedShardIdCheckpointMap.get(lease.leaseKey())));
    }
}
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (4, 5, 7)
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatest2() {
final List<Shard> shards = constructShardListForGraphA();
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),