Merging multistreaming dynamic stream syncer, logging changes, and periodic lease sync modifications

This commit is contained in:
Ashwin Giridharan 2020-04-02 17:07:15 -07:00
commit 33cd0f52b2
21 changed files with 503 additions and 218 deletions

View file

@ -28,11 +28,11 @@ public class DiagnosticUtils {
/**
* Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action.
* @param shardId of the shard that is having delayed delivery
* @param resourceIdentifier of the shard that is having delayed delivery
* @param enqueueTimestamp of the event submitted to the executor service
* @param log Slf4j Logger from RecordPublisher to log the events
*/
public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) {
public static void takeDelayedDeliveryActionIfRequired(String resourceIdentifier, Instant enqueueTimestamp, Logger log) {
final long durationBetweenEnqueueAndAckInMillis = Duration
.between(enqueueTimestamp, Instant.now()).toMillis();
if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) {
@ -41,9 +41,9 @@ public class DiagnosticUtils {
"{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs"
+ " to see the state of the executor service. Also check if the RecordProcessor's processing "
+ "time is high. ",
shardId, durationBetweenEnqueueAndAckInMillis);
resourceIdentifier, durationBetweenEnqueueAndAckInMillis);
} else if (log.isDebugEnabled()) {
log.debug("{}: Record delivery time to shard consumer is {} millis", shardId,
log.debug("{}: Record delivery time to shard consumer is {} millis", resourceIdentifier,
durationBetweenEnqueueAndAckInMillis);
}
}

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.util.Date;
@ -22,7 +23,7 @@ import java.util.Date;
* Class that houses the entities needed to specify the position in the stream from where a new application should
* start.
*/
@ToString
@ToString @EqualsAndHashCode
public class InitialPositionInStreamExtended {
private final InitialPositionInStream position;

View file

@ -18,6 +18,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
@ -30,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
* The top level orchestrator for coordinating the periodic shard sync related
@ -44,21 +46,24 @@ class PeriodicShardSyncManager {
private final String workerId;
private final LeaderDecider leaderDecider;
private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final ScheduledExecutorService shardSyncThreadPool;
private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap) {
this(workerId, leaderDecider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor());
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider) {
this(workerId, leaderDecider, currentStreamConfigMap, shardSyncTaskManagerProvider, Executors.newSingleThreadScheduledExecutor());
}
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap,
ScheduledExecutorService shardSyncThreadPool) {
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, ScheduledExecutorService shardSyncThreadPool) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId;
this.leaderDecider = leaderDecider;
this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap;
this.currentStreamConfigMap = currentStreamConfigMap;
this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
this.shardSyncThreadPool = shardSyncThreadPool;
}
@ -85,8 +90,13 @@ class PeriodicShardSyncManager {
* @return the result of the task
*/
public synchronized void syncShardsOnce() throws Exception {
for (Map.Entry<StreamIdentifier, ShardSyncTaskManager> mapEntry : streamToShardSyncTaskManagerMap.entrySet()) {
final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue();
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
for(Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue();
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig);
final TaskResult taskResult = shardSyncTaskManager.executeShardSyncTask();
if (taskResult.getException() != null) {
throw taskResult.getException();
@ -106,8 +116,8 @@ class PeriodicShardSyncManager {
private void runShardSync() {
if (leaderDecider.isLeader(workerId)) {
for (Map.Entry<StreamIdentifier, ShardSyncTaskManager> mapEntry : streamToShardSyncTaskManagerMap.entrySet()) {
final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue();
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue());
if (!shardSyncTaskManager.syncShardAndLeaseInfo()) {
log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());

View file

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -34,6 +35,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
@ -44,8 +46,8 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
@ -53,10 +55,13 @@ import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
@ -70,10 +75,13 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
@ -97,6 +105,8 @@ public class Scheduler implements Runnable {
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
@ -119,8 +129,8 @@ public class Scheduler implements Runnable {
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final Function<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamIdentifier, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion;
@ -131,11 +141,13 @@ public class Scheduler implements Runnable {
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final boolean isMultiStreamMode;
// TODO : halo : make sure we generate streamConfig if entry not present.
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private MultiStreamTracker multiStreamTracker;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
private final Function<StreamIdentifier, ShardDetector> shardDetectorProvider;
private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@ -152,6 +164,9 @@ public class Scheduler implements Runnable {
private final Object lock = new Object();
private Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false;
/**
* Used to ensure that only one requestedShutdown is in progress at a time.
*/
@ -195,9 +210,11 @@ public class Scheduler implements Runnable {
this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> true, streamConfig -> false);
this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker ->
multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)),
multiStreamTracker -> {
this.multiStreamTracker = multiStreamTracker;
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
streamConfig ->
Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
@ -227,9 +244,9 @@ public class Scheduler implements Runnable {
this.diagnosticEventHandler = new DiagnosticEventLogger();
// TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigList()
this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier));
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -254,14 +271,15 @@ public class Scheduler implements Runnable {
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetectorProvider = streamIdentifier -> createOrGetShardSyncTaskManager(streamIdentifier).shardDetector();
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, streamToShardSyncTaskManagerMap);
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, currentStreamConfigMap,
shardSyncTaskManagerProvider);
}
/**
@ -303,16 +321,10 @@ public class Scheduler implements Runnable {
log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
createOrGetShardSyncTaskManager(streamIdentifier);
log.info("Creating shard sync task for " + streamIdentifier);
}
leaderElectedPeriodicShardSyncManager.syncShardsOnce();
}
} else {
@ -332,6 +344,7 @@ public class Scheduler implements Runnable {
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
// TODO: Determine if waitUntilHashRangeCovered() is needed.
//waitUntilHashRangeCovered();
streamSyncWatch.start();
isDone = true;
} catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
@ -404,14 +417,20 @@ public class Scheduler implements Runnable {
for (ShardInfo completedShard : completedShards) {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
final StreamConfig streamConfig = currentStreamConfigMap
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
if (createOrGetShardSyncTaskManager(streamConfig).syncShardAndLeaseInfo()) {
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
streamIdentifier.serialize(), completedShard.toString());
}
}
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);
// check for new streams and sync with the scheduler state
checkAndSyncStreamShardsAndLeases();
logExecutorState();
slog.info("Sleeping ...");
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
@ -427,6 +446,82 @@ public class Scheduler implements Runnable {
slog.resetInfoLogging();
}
/**
* Note: This method has package level access solely for testing purposes.
* Sync all streams method.
* @return streams that are being synced by this worker
*/
@VisibleForTesting
Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
if (shouldSyncStreamsNow()) {
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
// Making an immutable copy
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
// This is done to ensure that we clean up the stale streams lingering in the lease table.
syncStreamsFromLeaseTableOnAppInit();
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo();
currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
streamsSynced.add(streamIdentifier);
} else {
if (log.isDebugEnabled()) {
log.debug(streamIdentifier + " is already being processed - skipping shard sync.");
}
}
}
// TODO: Remove assumption that each Worker gets the full list of streams
Iterator<StreamIdentifier> currentStreamConfigIter = currentStreamConfigMap.keySet().iterator();
while (currentStreamConfigIter.hasNext()) {
StreamIdentifier streamIdentifier = currentStreamConfigIter.next();
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found old/deleted stream: " + streamIdentifier + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo();
currentStreamConfigIter.remove();
streamsSynced.add(streamIdentifier);
}
}
streamSyncWatch.reset().start();
}
return streamsSynced;
}
@VisibleForTesting
boolean shouldSyncStreamsNow() {
return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
}
private void syncStreamsFromLeaseTableOnAppInit()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
final Set<StreamIdentifier> streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream()
.map(lease -> StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()))
.collect(Collectors.toSet());
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
}
}
leasesSyncedOnAppInit = true;
}
}
// When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end.
private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) {
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
}
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
@ -629,7 +724,8 @@ public class Scheduler implements Runnable {
if (!firstItem) {
builder.append(", ");
}
builder.append(shardInfo.shardId());
builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId()));
firstItem = false;
}
slog.info("Current stream shard assignments: " + builder.toString());
@ -665,8 +761,8 @@ public class Scheduler implements Runnable {
return consumer;
}
private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamIdentifier streamIdentifier) {
return streamToShardSyncTaskManagerMap.computeIfAbsent(streamIdentifier, s -> shardSyncTaskManagerProvider.apply(s));
private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamConfig streamConfig) {
return streamToShardSyncTaskManagerMap.computeIfAbsent(streamConfig, s -> shardSyncTaskManagerProvider.apply(s));
}
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
@ -678,8 +774,10 @@ public class Scheduler implements Runnable {
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty");
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
Validate.notNull(streamConfig, "StreamConfig should not be null");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamConfig.streamIdentifier(),
leaseCoordinator,
@ -698,7 +796,7 @@ public class Scheduler implements Runnable {
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig.streamIdentifier()),
shardDetectorProvider.apply(streamConfig),
aggregatorUtil,
hierarchicalShardSyncer,
metricsFactory);
@ -723,7 +821,8 @@ public class Scheduler implements Runnable {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shard);
if (consumer.leaseLost()) {
shardInfoShardConsumerMap.remove(shard);
log.debug("Removed consumer for {} as lease has been lost", shard.shardId());
log.debug("Removed consumer for {} as lease has been lost",
shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId()));
} else {
consumer.executeLifecycle();
}

View file

@ -68,6 +68,8 @@ public class HierarchicalShardSyncer {
private final boolean isMultiStreamMode;
private String streamIdentifier = "";
public HierarchicalShardSyncer() {
isMultiStreamMode = false;
}
@ -104,6 +106,7 @@ public class HierarchicalShardSyncer {
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
@ -117,10 +120,11 @@ public class HierarchicalShardSyncer {
final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
log.debug("Num shards: {}", latestShards.size());
log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size());
}
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards);
@ -138,7 +142,7 @@ public class HierarchicalShardSyncer {
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
boolean success = false;
@ -231,7 +235,7 @@ public class HierarchicalShardSyncer {
for (String shardId : shardIdsOfClosedShards) {
final Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
log.info("Shard {} is not present in Kinesis anymore.", shardId);
log.info("{} : Shard {} is not present in Kinesis anymore.", streamIdentifier, shardId);
continue;
}
@ -426,7 +430,7 @@ public class HierarchicalShardSyncer {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
final Map<String, Lease> shardIdToLeaseMapOfNewShards, final Map<String, Boolean> memoizationContext,
final MultiStreamArgs multiStreamArgs) {
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
final Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
@ -451,9 +455,9 @@ public class HierarchicalShardSyncer {
memoizationContext, multiStreamArgs)) {
isDescendant = true;
descendantParentShardIds.add(parentShardId);
log.debug("Parent shard {} is a descendant.", parentShardId);
log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId);
} else {
log.debug("Parent shard {} is NOT a descendant.", parentShardId);
log.debug("{} : Parent shard {} is NOT a descendant.", streamIdentifier, parentShardId);
}
}
@ -461,7 +465,7 @@ public class HierarchicalShardSyncer {
if (isDescendant) {
for (String parentShardId : parentShardIds) {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
log.debug("Need to create a lease for shardId {}", parentShardId);
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId);
Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
if (lease == null) {
lease = multiStreamArgs.isMultiStreamMode() ?
@ -555,6 +559,7 @@ public class HierarchicalShardSyncer {
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException,
DependencyException, InvalidStateException, ProvisionedThroughputException {
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
final Set<String> kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet());
// Check if there are leases for non-existent shards
@ -562,14 +567,15 @@ public class HierarchicalShardSyncer {
.filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList());
if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
log.info("Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", garbageLeases.size());
log.info("{} : Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", streamIdentifier, garbageLeases.size());
final Set<String> currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId)
.collect(Collectors.toSet());
for (Lease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) {
log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey());
log.info("{} : Deleting lease for shard {} as it is not present in Kinesis stream.",
streamIdentifier, lease.leaseKey());
leaseRefresher.deleteLease(lease);
}
}
@ -589,14 +595,16 @@ public class HierarchicalShardSyncer {
static boolean isCandidateForCleanup(final Lease lease, final Set<String> currentKinesisShardIds,
final MultiStreamArgs multiStreamArgs)
throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true;
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
boolean isCandidateForCleanup = true;
final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs);
if (currentKinesisShardIds.contains(shardId)) {
isCandidateForCleanup = false;
} else {
log.info("Found lease for non-existent shard: {}. Checking its parent shards", shardId);
log.info("{} : Found lease for non-existent shard: {}. Checking its parent shards", streamIdentifier, shardId);
final Set<String> parentShardIds = lease.parentShardIds();
for (String parentShardId : parentShardIds) {
@ -605,7 +613,7 @@ public class HierarchicalShardSyncer {
if (currentKinesisShardIds.contains(parentShardId)) {
final String message = String.format("Parent shard %s exists but not the child shard %s",
parentShardId, shardId);
log.info(message);
log.info("{} : {}", streamIdentifier, message);
throw new KinesisClientLibIOException(message);
}
}
@ -693,8 +701,8 @@ public class HierarchicalShardSyncer {
}
if (okayToDelete) {
log.info("Deleting lease for shard {} as it has been completely processed and processing of child "
+ "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
log.info("{} : Deleting lease for shard {} as it has been completely processed and processing of child "
+ "shards has begun.", streamIdentifier, shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
leaseRefresher.deleteLease(leaseForClosedShard);
}
}
@ -757,9 +765,9 @@ public class HierarchicalShardSyncer {
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
static List<Shard> getOpenShards(final List<Shard> allShards) {
static List<Shard> getOpenShards(final List<Shard> allShards, final String streamIdentifier) {
return allShards.stream().filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null)
.peek(shard -> log.debug("Found open shard: {}", shard.shardId())).collect(Collectors.toList());
.peek(shard -> log.debug("{} : Found open shard: {}", streamIdentifier, shard.shardId())).collect(Collectors.toList());
}
private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
@ -775,7 +783,12 @@ public class HierarchicalShardSyncer {
return checkpoint;
}
private static String getStreamIdentifier(MultiStreamArgs multiStreamArgs) {
return Optional.ofNullable(multiStreamArgs.streamIdentifier())
.map(streamId -> streamId.serialize()).orElse("single_stream_mode");
}
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/
@ -868,13 +881,15 @@ public class HierarchicalShardSyncer {
@Override
public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier())
.map(streamId -> streamId.serialize()).orElse("");
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
currentLeases.stream().peek(lease -> log.debug("Existing lease: {}", lease))
currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs);
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator =
@ -889,7 +904,7 @@ public class HierarchicalShardSyncer {
* reaching SHARD_END.
*/
private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition,
List<Shard> shards, MultiStreamArgs multiStreamArgs) {
List<Shard> shards, MultiStreamArgs multiStreamArgs, String streamId) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
for (Shard shard : shards) {
@ -898,7 +913,7 @@ public class HierarchicalShardSyncer {
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard);
lease.checkpoint(convertToCheckpoint(initialPosition));
log.debug("Need to create a lease for shard with shardId {}", shardId);
log.debug("{} : Need to create a lease for shard with shardId {}", streamId, shardId);
shardIdToNewLeaseMap.put(shardId, lease);
}
@ -961,29 +976,31 @@ public class HierarchicalShardSyncer {
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
@Override
public synchronized List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
public synchronized List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds,
final MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier())
.map(streamId -> streamId.serialize()).orElse("");
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final List<Shard> openShards = getOpenShards(shards, streamIdentifier);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
log.debug("{} : Evaluating leases for open shard {} and its ancestors.", streamIdentifier, shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
log.debug("{} : Lease for shardId {} already exists. Not creating a lease", streamIdentifier, shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
@ -1020,13 +1037,12 @@ public class HierarchicalShardSyncer {
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);

View file

@ -54,7 +54,8 @@ public class BlockOnParentShardTask implements ConsumerTask {
@Override
public TaskResult call() {
Exception exception = null;
final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId());
try {
boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) {
@ -62,20 +63,20 @@ public class BlockOnParentShardTask implements ConsumerTask {
if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.checkpoint();
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint);
log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardInfoId, checkpoint);
blockedOnParentShard = true;
exception = new BlockedOnParentShardException("Parent shard not yet done");
break;
} else {
log.debug("Shard {} has been completely processed.", shardId);
log.debug("Shard {} has been completely processed.", shardInfoId);
}
} else {
log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId);
log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardInfoId);
}
}
if (!blockedOnParentShard) {
log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfo.shardId());
log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfoId);
return new TaskResult(null);
}
} catch (Exception e) {
@ -85,7 +86,7 @@ public class BlockOnParentShardTask implements ConsumerTask {
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.shardId(), e);
log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfoId, e);
}
return new TaskResult(exception);

View file

@ -60,6 +60,7 @@ public class ProcessTask implements ConsumerTask {
private final ProcessRecordsInput processRecordsInput;
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
private final String shardInfoId;
public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor,
@ -74,6 +75,8 @@ public class ProcessTask implements ConsumerTask {
@NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) {
this.shardInfo = shardInfo;
this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId())
.orElse(shardInfo.shardId());
this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis;
@ -121,7 +124,7 @@ public class ProcessTask implements ConsumerTask {
}
if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) {
log.info("Reached end of shard {} and have no records to process", shardInfo.shardId());
log.info("Reached end of shard {} and have no records to process", shardInfoId);
return new TaskResult(null, true);
}
@ -142,13 +145,13 @@ public class ProcessTask implements ConsumerTask {
}
success = true;
} catch (RuntimeException e) {
log.error("ShardId {}: Caught exception: ", shardInfo.shardId(), e);
log.error("ShardId {}: Caught exception: ", shardInfoId, e);
exception = e;
backoff();
}
if (processRecordsInput.isAtShardEnd()) {
log.info("Reached end of shard {}, and processed {} records", shardInfo.shardId(), processRecordsInput.records().size());
log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size());
return new TaskResult(null, true);
}
return new TaskResult(exception);
@ -174,7 +177,7 @@ public class ProcessTask implements ConsumerTask {
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("{}: Sleep was interrupted", shardInfo.shardId(), ie);
log.debug("{}: Sleep was interrupted", shardInfoId, ie);
}
}
@ -188,7 +191,7 @@ public class ProcessTask implements ConsumerTask {
*/
private void callProcessRecords(ProcessRecordsInput input, List<KinesisClientRecord> records) {
log.debug("Calling application processRecords() with {} records from {}", records.size(),
shardInfo.shardId());
shardInfoId);
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
@ -200,8 +203,8 @@ public class ProcessTask implements ConsumerTask {
shardRecordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
log.error("ShardId {}: Application processRecords() threw an exception when processing shard ",
shardInfo.shardId(), e);
log.error("ShardId {}: Skipping over the following data records: {}", shardInfo.shardId(), records);
shardInfoId, e);
log.error("ShardId {}: Skipping over the following data records: {}", shardInfoId, records);
} finally {
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, startTime, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope);
@ -226,17 +229,17 @@ public class ProcessTask implements ConsumerTask {
* the time when the task started
*/
private void handleNoRecords(long startTimeMillis) {
log.debug("Kinesis didn't return any records for shard {}", shardInfo.shardId());
log.debug("Kinesis didn't return any records for shard {}", shardInfoId);
long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds);
try {
log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis,
shardInfo.shardId());
shardInfoId);
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
log.debug("ShardId {}: Sleep was interrupted", shardInfo.shardId());
log.debug("ShardId {}: Sleep was interrupted", shardInfoId);
}
}
}
@ -273,8 +276,8 @@ public class ProcessTask implements ConsumerTask {
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
recordIterator.remove();
log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", extendedSequenceNumber,
lastCheckpointValue);
log.debug("{} : removing record with ESN {} because the ESN is <= checkpoint ({})", shardInfoId,
extendedSequenceNumber, lastCheckpointValue);
continue;
}

View file

@ -62,6 +62,7 @@ public class ShardConsumer {
private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator;
private final int bufferSize;
private final TaskExecutionListener taskExecutionListener;
private final String streamIdentifier;
private ConsumerTask currentTask;
private TaskOutcome taskOutcome;
@ -124,6 +125,7 @@ public class ShardConsumer {
this.recordsPublisher = recordsPublisher;
this.executorService = executorService;
this.shardInfo = shardInfo;
this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode");
this.shardConsumerArgument = shardConsumerArgument;
this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis;
this.taskExecutionListener = taskExecutionListener;
@ -208,8 +210,8 @@ public class ShardConsumer {
}
Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
if (dispatchFailure != null) {
log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped",
dispatchFailure);
log.warn("{} : Exception occurred while dispatching incoming data. The incoming data has been skipped",
streamIdentifier, dispatchFailure);
return dispatchFailure;
}
@ -238,7 +240,7 @@ public class ShardConsumer {
Instant now = Instant.now();
Duration timeSince = Duration.between(subscriber.lastDataArrival(), now);
if (timeSince.toMillis() > value) {
log.warn("Last time data arrived: {} ({})", lastDataArrival, timeSince);
log.warn("{} : Last time data arrived: {} ({})", streamIdentifier, lastDataArrival, timeSince);
}
}
});
@ -250,11 +252,11 @@ public class ShardConsumer {
if (taken != null) {
String message = longRunningTaskMessage(taken);
if (log.isDebugEnabled()) {
log.debug("{} Not submitting new task.", message);
log.debug("{} : {} Not submitting new task.", streamIdentifier, message);
}
logWarningForTaskAfterMillis.ifPresent(value -> {
if (taken.toMillis() > value) {
log.warn(message);
log.warn("{} : {}", streamIdentifier, message);
}
});
}
@ -358,7 +360,7 @@ public class ShardConsumer {
nextState = currentState.failureTransition();
break;
default:
log.error("No handler for outcome of {}", outcome.name());
log.error("{} : No handler for outcome of {}", streamIdentifier, outcome.name());
nextState = currentState.failureTransition();
break;
}
@ -382,9 +384,9 @@ public class ShardConsumer {
Exception taskException = taskResult.getException();
if (taskException instanceof BlockedOnParentShardException) {
// No need to log the stack trace for this exception (it is very specific).
log.debug("Shard {} is blocked on completion of parent shard.", shardInfo.shardId());
log.debug("{} : Shard {} is blocked on completion of parent shard.", streamIdentifier, shardInfo.shardId());
} else {
log.debug("Caught exception running {} task: ", currentTask.taskType(), taskResult.getException());
log.debug("{} : Caught exception running {} task: ", streamIdentifier, currentTask.taskType(), taskResult.getException());
}
}
}
@ -411,10 +413,10 @@ public class ShardConsumer {
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
public boolean leaseLost() {
log.debug("Shutdown({}): Lease lost triggered.", shardInfo.shardId());
log.debug("{} : Shutdown({}): Lease lost triggered.", streamIdentifier, shardInfo.shardId());
if (subscriber != null) {
subscriber.cancel();
log.debug("Shutdown({}): Subscriber cancelled.", shardInfo.shardId());
log.debug("{} : Shutdown({}): Subscriber cancelled.", streamIdentifier, shardInfo.shardId());
}
markForShutdown(ShutdownReason.LEASE_LOST);
return isShutdown();

View file

@ -40,8 +40,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
private final int bufferSize;
private final ShardConsumer shardConsumer;
private final int readTimeoutsToIgnoreBeforeWarning;
private final String shardInfoId;
private volatile int readTimeoutSinceLastRead = 0;
@VisibleForTesting
final Object lockObject = new Object();
// This holds the last time an attempt of request to upstream service was made including the first try to
@ -70,6 +70,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer;
this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt()
.map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId());
}
@ -107,7 +109,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
if (retrievalFailure != null) {
synchronized (lockObject) {
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
shardConsumer.shardInfo().shardId());
shardInfoId);
if (retrievalFailure instanceof RetryableRetrievalException) {
log.debug(logMessage, retrievalFailure.getCause());
} else {
@ -130,7 +132,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
shardInfoId, lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
cancel();
// Start the subscription again which will update the lastRequestTime as well.
@ -157,7 +159,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
subscription);
} catch (Throwable t) {
log.warn("{}: Caught exception from handleInput", shardConsumer.shardInfo().shardId(), t);
log.warn("{}: Caught exception from handleInput", shardInfoId, t);
synchronized (lockObject) {
dispatchFailure = t;
}
@ -193,7 +195,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
log.warn(
"{}: onError(). Cancelling subscription, and marking self as failed. KCL will "
+ "recreate the subscription as neccessary to continue processing.",
shardConsumer.shardInfo().shardId(), t);
shardInfoId, t);
}
protected void logOnErrorReadTimeoutWarning(Throwable t) {
@ -202,13 +204,13 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
+ "are seeing this warning frequently consider increasing the SDK timeouts "
+ "by providing an OverrideConfiguration to the kinesis client. Alternatively you"
+ "can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress"
+ "intermittant ReadTimeout warnings.", shardConsumer.shardInfo().shardId(), t);
+ "intermittant ReadTimeout warnings.", shardInfoId, t);
}
@Override
public void onComplete() {
log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally",
shardConsumer.shardInfo().shardId());
shardInfoId);
}
public void cancel() {

View file

@ -41,6 +41,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.List;
import java.util.function.Function;
/**
* Task for invoking the ShardRecordProcessor shutdown() callback.
@ -66,7 +67,7 @@ public class ShutdownTask implements ConsumerTask {
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final boolean garbageCollectLeases = false;
private final boolean isLeaseTableEmpty= false;
private final boolean isLeaseTableEmpty = false;
private final boolean ignoreUnexpectedChildShards;
@NonNull
private final LeaseCoordinator leaseCoordinator;
@ -80,6 +81,8 @@ public class ShutdownTask implements ConsumerTask {
private final TaskType taskType = TaskType.SHUTDOWN;
private static final Function<ShardInfo, String> shardInfoIdProvider = shardInfo -> shardInfo
.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId());
/*
* Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc)
@ -110,7 +113,7 @@ public class ShutdownTask implements ConsumerTask {
if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
localReason = ShutdownReason.LEASE_LOST;
dropLease();
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoIdProvider.apply(shardInfo));
}
}
@ -122,7 +125,7 @@ public class ShutdownTask implements ConsumerTask {
}
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfo.shardId(), shardInfo.concurrencyToken(), localReason);
shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), localReason);
final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason)
.checkpointer(recordProcessorCheckpointer).build();
final long startTime = System.currentTimeMillis();
@ -133,7 +136,7 @@ public class ShutdownTask implements ConsumerTask {
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.shardId() + ". Application must checkpoint upon shard end. " +
+ shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
} else {
@ -141,7 +144,7 @@ public class ShutdownTask implements ConsumerTask {
}
log.debug("Shutting down retrieval strategy.");
recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfo.shardId());
log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo));
} catch (Exception e) {
applicationException = true;
throw e;
@ -150,12 +153,12 @@ public class ShutdownTask implements ConsumerTask {
}
if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
log.debug("Looking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo));
// create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
isLeaseTableEmpty);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
log.debug("Finished checking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo));
}
return new TaskResult(null);

View file

@ -105,7 +105,7 @@ public class RetrievalConfig {
this.applicationName = applicationName;
}
public void initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
final StreamConfig[] streamConfig = new StreamConfig[1];
this.appStreamTracker.apply(multiStreamTracker -> {
throw new IllegalArgumentException(
@ -113,6 +113,7 @@ public class RetrievalConfig {
}, sc -> streamConfig[0] = sc);
this.appStreamTracker = Either
.right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended));
return this;
}
public RetrievalFactory retrievalFactory() {

View file

@ -51,7 +51,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -60,7 +59,6 @@ import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
@RequiredArgsConstructor
@Slf4j
@KinesisClientInternalApi
public class FanOutRecordsPublisher implements RecordsPublisher {
@ -73,7 +71,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis;
private final String shardId;
private final String consumerArn;
private final String streamAndShardId;
private final Object lockObject = new Object();
private final AtomicInteger subscribeToShardId = new AtomicInteger(0);
@ -91,11 +89,25 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
this.streamAndShardId = shardId;
}
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn, String streamIdentifierSer) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
this.streamAndShardId = streamIdentifierSer + ":" + shardId;
}
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
synchronized (lockObject) {
log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", shardId,
log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", streamAndShardId,
extendedSequenceNumber, initialPositionInStreamExtended);
this.initialPositionInStreamExtended = initialPositionInStreamExtended;
this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
@ -174,7 +186,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// It is now safe to remove the element
recordsDeliveryQueue.poll();
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
@ -190,13 +202,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
.equals(flow.getSubscribeToShardId())) {
log.error(
"{}: Received unexpected ack for the active subscription {}. Throwing. ",
shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
"{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
throw new IllegalStateException("Unexpected ack for the active subscription");
}
// Otherwise publisher received a stale ack.
else {
log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", shardId,
log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", streamAndShardId,
recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
}
}
@ -219,10 +230,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
throw e;
} catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
log.error("{}: Unable to deliver event to the shard consumer.", streamAndShardId, t);
throw t;
}
}
@ -290,7 +301,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
String instanceId = shardId + "-" + subscribeInvocationId;
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard",
shardId, connectionStart, instanceId);
streamAndShardId, connectionStart, instanceId);
flow = new RecordFlow(this, connectionStart, instanceId);
kinesis.subscribeToShard(request, flow);
}
@ -303,12 +314,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if(hasValidFlow()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
" Last successful request details -- {}", shardId, flow.connectionStartedAt,
" Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt,
flow.subscribeToShardId, lastSuccessfulRequestDetails);
} else {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
" Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails);
}
return;
}
@ -320,8 +331,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful request details -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
" Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
@ -339,13 +349,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
flow.cancel();
}
log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace);
log.debug("{}: availableQueueSpace zeroing from {}", streamAndShardId, availableQueueSpace);
availableQueueSpace = 0;
try {
handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable);
log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}",
streamAndShardId, lastSuccessfulRequestDetails, innerThrowable);
}
subscriber = null;
flow = null;
@ -353,7 +364,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (triggeringFlow != null) {
log.debug(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
category.throwableTypeString);
triggeringFlow.cancel();
}
@ -367,7 +378,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
+ "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails);
+ "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails);
recordsDeliveryQueue.clear();
}
}
@ -383,7 +394,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (t.getCause() instanceof ResourceNotFoundException) {
log.debug(
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
shardId);
streamAndShardId);
// The ack received for this onNext event will be ignored by the publisher as the global flow object should
// be either null or renewed when the ack's flow identifier is evaluated.
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
@ -452,7 +463,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!hasValidSubscriber()) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Subscriber is null.",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
triggeringFlow.cancel();
if (flow != null) {
flow.cancel();
@ -462,7 +473,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!isActiveFlow(triggeringFlow)) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Received records for an inactive flow.",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
@ -478,7 +489,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) {
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
" Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails);
errorOccurred(triggeringFlow, t);
}
}
@ -488,7 +499,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (availableQueueSpace <= 0) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
} else {
availableQueueSpace--;
if (availableQueueSpace > 0) {
@ -503,12 +514,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void onComplete(RecordFlow triggeringFlow) {
synchronized (lockObject) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", streamAndShardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
triggeringFlow.cancel();
if (!hasValidSubscriber()) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}",
streamAndShardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
@ -516,15 +528,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!isActiveFlow(triggeringFlow)) {
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
if (currentSequenceNumber != null) {
log.debug("{}: Shard hasn't ended. Resubscribing.", shardId);
log.debug("{}: Shard hasn't ended. Resubscribing.", streamAndShardId);
subscribeToShard(currentSequenceNumber);
} else {
log.debug("{}: Shard has ended completing subscriber.", shardId);
log.debug("{}: Shard has ended completing subscriber.", streamAndShardId);
subscriber.onComplete();
}
}
@ -536,7 +548,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (subscriber != null) {
log.error(
"{}: A subscribe occurred while there was an active subscriber. Sending error to current subscriber",
shardId);
streamAndShardId);
MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException();
//
@ -575,7 +587,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
shardId, n, lastSuccessfulRequestDetails);
streamAndShardId, n, lastSuccessfulRequestDetails);
return;
}
if (flow == null) {
@ -584,7 +596,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
//
log.debug(
"{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.",
shardId);
streamAndShardId);
errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow."));
return;
}
@ -602,19 +614,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails);
streamAndShardId, lastSuccessfulRequestDetails);
return;
}
if (!hasValidSubscriber()) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails);
streamAndShardId, lastSuccessfulRequestDetails);
}
subscriber = null;
if (flow != null) {
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
}
@ -703,12 +715,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- Subscribe",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
if (!parent.isActiveFlow(this)) {
this.isDisposed = true;
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- parent is disposed",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
parent.rejectSubscription(publisher);
return;
}
@ -716,7 +728,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
try {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- creating record subscription",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
subscription = new RecordSubscription(parent, this, connectionStartedAt, subscribeToShardId);
publisher.subscribe(subscription);
@ -727,7 +739,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (Throwable t) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- throwable during record subscription: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage());
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage());
parent.errorOccurred(this, t);
}
}
@ -736,7 +748,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override
public void responseReceived(SubscribeToShardResponse response) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Request id - {}",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId());
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId());
final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString());
parent.setLastSuccessfulRequestDetails(requestDetails);
@ -759,12 +771,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
if (this.isDisposed) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
this.isErrorDispatched = true;
}
@ -775,7 +787,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} else {
log.debug(
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
}
}
@ -802,7 +814,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (Exception e) {
log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.streamAndShardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
}
@ -810,7 +822,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void executeComplete() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
if (isCancelled) {
//
@ -820,13 +832,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// subscription, which was cancelled for a reason (usually queue overflow).
//
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}",
parent.shardId, parent.lastSuccessfulRequestDetails);
parent.streamAndShardId, parent.lastSuccessfulRequestDetails);
return;
}
if (this.isDisposed) {
log.warn(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
return;
}
@ -844,7 +856,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} catch (Throwable t) {
log.error(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t);
}
}
}
@ -885,14 +897,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void cancel() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- Cancel called",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
flow.isCancelled = true;
if (subscription != null) {
subscription.cancel();
} else {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- SDK subscription is null",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
}
}
@ -906,21 +918,21 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow.isCancelled) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
if (flow.isDisposed) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow requires cancelling",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
cancel();
}
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace);
if (parent.availableQueueSpace > 0) {
request(1);
}
@ -933,7 +945,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow.shouldSubscriptionCancel()) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
cancel();
return;
}
@ -948,7 +960,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override
public void onError(Throwable t) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId,
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.streamAndShardId,
connectionStartedAt, subscribeToShardId, t.getClass().getName(), t.getMessage());
//
@ -961,7 +973,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void onComplete() {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete",
parent.shardId, connectionStartedAt, subscribeToShardId);
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
}
}

View file

@ -53,10 +53,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
final String streamName;
if(streamIdentifierStr.isPresent()) {
streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName();
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply),
streamIdentifierStr.get());
} else {
streamName = defaultStreamName;
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
}
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
}
}

View file

@ -63,7 +63,7 @@ public class KinesisDataFetcher {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
@NonNull @Getter
private final StreamIdentifier streamIdentifier;
@NonNull
private final String shardId;
@ -71,6 +71,7 @@ public class KinesisDataFetcher {
@NonNull
private final MetricsFactory metricsFactory;
private final Duration maxFutureWait;
private final String streamAndShardId;
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
@ -93,6 +94,7 @@ public class KinesisDataFetcher {
this.maxRecords = maxRecords;
this.metricsFactory = metricsFactory;
this.maxFutureWait = maxFutureWait;
this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId;
}
/** Note: This method has package level access for testing purposes.
@ -120,7 +122,7 @@ public class KinesisDataFetcher {
try {
return new AdvancingResult(getRecords(nextIterator));
} catch (ResourceNotFoundException e) {
log.info("Caught ResourceNotFoundException when fetching records for shard {}", shardId);
log.info("Caught ResourceNotFoundException when fetching records for shard {}", streamAndShardId);
return TERMINAL_RESULT;
}
} else {
@ -182,14 +184,14 @@ public class KinesisDataFetcher {
*/
public void initialize(final String initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", shardId, initialCheckpoint);
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint);
advanceIteratorTo(initialCheckpoint, initialPositionInStream);
isInitialized = true;
}
public void initialize(final ExtendedSequenceNumber initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", shardId, initialCheckpoint.sequenceNumber());
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber());
advanceIteratorTo(initialCheckpoint.sequenceNumber(), initialPositionInStream);
isInitialized = true;
}
@ -234,7 +236,7 @@ public class KinesisDataFetcher {
throw new RetryableRetrievalException(e.getMessage(), e);
}
} catch (ResourceNotFoundException e) {
log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", shardId, e);
log.info("Caught ResourceNotFoundException when getting an iterator for shard {}", streamAndShardId, e);
nextIterator = null;
} finally {
MetricsUtil.addSuccessAndLatency(metricsScope, String.format("%s.%s", METRICS_PREFIX, "getShardIterator"),
@ -285,7 +287,7 @@ public class KinesisDataFetcher {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
log.debug("Interrupt called on metod, shutdown initiated");
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RetryableRetrievalException(e.getMessage(), e);

View file

@ -91,7 +91,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private boolean started = false;
private final String operation;
private final String shardId;
private final String streamAndShardId;
private Subscriber<? super RecordsRetrieved> subscriber;
@VisibleForTesting @Getter
private final PublisherSession publisherSession;
@ -135,11 +135,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
// Handle records delivery ack and execute nextEventDispatchAction.
// This method is not thread-safe and needs to be called after acquiring a monitor.
void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String shardId, Runnable nextEventDispatchAction) {
void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String streamAndShardId, Runnable nextEventDispatchAction) {
final PrefetchRecordsRetrieved recordsToCheck = peekNextRecord();
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) {
evictPublishedRecordAndUpdateDemand(shardId);
evictPublishedRecordAndUpdateDemand(streamAndShardId);
nextEventDispatchAction.run();
} else {
// Log and ignore any other ack received. As long as an ack is received for head of the queue
@ -148,21 +148,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
final BatchUniqueIdentifier peekedBatchUniqueIdentifier =
recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.",
shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
}
}
// Evict the published record from the prefetch queue.
// This method is not thread-safe and needs to be called after acquiring a monitor.
@VisibleForTesting
RecordsRetrieved evictPublishedRecordAndUpdateDemand(String shardId) {
RecordsRetrieved evictPublishedRecordAndUpdateDemand(String streamAndShardId) {
final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll();
if (result != null) {
updateDemandTrackersOnPublish(result);
} else {
log.info(
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
+ "was reset.", streamAndShardId);
}
return result;
}
@ -222,7 +222,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.shardId = shardId;
this.streamAndShardId =
this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
}
@Override
@ -234,7 +235,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
if (!started) {
log.info("{} : Starting prefetching thread.", shardId);
log.info("{} : Starting prefetching thread.", streamAndShardId);
executorService.execute(defaultGetRecordsCacheDaemon);
}
started = true;
@ -304,9 +305,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
@Override
public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, shardId, () -> drainQueueForRequests());
publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, streamAndShardId, () -> drainQueueForRequests());
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log);
takeDelayedDeliveryActionIfRequired(streamAndShardId, lastEventDeliveryTime, log);
}
// Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue.
@ -403,7 +404,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
public void run() {
while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) {
log.warn("{} : Prefetch thread was interrupted.", shardId);
log.warn("{} : Prefetch thread was interrupted.", streamAndShardId);
break;
}
@ -411,7 +412,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
try {
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("{} : Position was reset while attempting to add item to queue.", shardId);
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
} finally {
resetLock.readLock().unlock();
}
@ -447,23 +448,23 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId);
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId);
} catch (InterruptedException e) {
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId);
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
} catch (ExpiredIteratorException e) {
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", shardId, e);
+ " after greatest seqNum passed to customer", streamAndShardId, e);
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
publisherSession.dataFetcher().restartIterator();
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
} catch (Throwable e) {
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", shardId, e);
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
} finally {
MetricsUtil.endScope(scope);
}
@ -475,7 +476,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
"Shutdown has probably been started", streamAndShardId);
}
}
}
@ -522,14 +523,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
public synchronized void waitForConsumer() throws InterruptedException {
if (!shouldGetNewRecords()) {
log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls);
log.debug("{} : Queue is full waiting for consumer for {} ms", streamAndShardId, idleMillisBetweenCalls);
this.wait(idleMillisBetweenCalls);
}
}
public synchronized boolean shouldGetNewRecords() {
if (log.isDebugEnabled()) {
log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString());
log.debug("{} : Current Prefetch Counter States: {}", streamAndShardId, this.toString());
}
return size < maxRecordsCount && byteSize < maxByteSize;
}

View file

@ -40,14 +40,20 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.RequiredArgsConstructor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -57,6 +63,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
@ -75,6 +83,7 @@ import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@ -178,6 +187,7 @@ public class SchedulerTest {
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
@ -380,6 +390,115 @@ public class SchedulerTest {
}
@Test
public final void testMultiStreamNoStreamsAreSyncedWhenStreamsAreNotRefreshed()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Assert.assertTrue("SyncedStreams should be empty", syncedStreams.isEmpty());
Assert.assertEquals(new HashSet(streamConfigList1), new HashSet(scheduler.currentStreamConfigMap().values()));
}
@Test
public final void testMultiStreamOnlyNewStreamsAreSynced()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(HashSet::new));
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
}
@Test
public final void testMultiStreamOnlyStaleStreamsAreSynced()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    // Tracker first reports streams 1-4, then shrinks to streams 3-4 on the next
    // refresh, making streams 1 and 2 stale; those are the ones the scheduler must sync.
    final List<StreamConfig> initialConfigs = new LinkedList<>();
    for (int id = 1; id < 5; id++) {
        initialConfigs.add(new StreamConfig(
                StreamIdentifier.multiStreamInstance(
                        Joiner.on(":").join(id * 111111111, "multiStreamTest-" + id, id * 12345)),
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)));
    }
    final List<StreamConfig> refreshedConfigs = new LinkedList<>();
    for (int id = 3; id < 5; id++) {
        refreshedConfigs.add(new StreamConfig(
                StreamIdentifier.multiStreamInstance(
                        Joiner.on(":").join(id * 111111111, "multiStreamTest-" + id, id * 12345)),
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)));
    }
    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    // Consecutive stubbing: first call sees the initial list, second sees the refreshed one.
    when(multiStreamTracker.streamConfigList()).thenReturn(initialConfigs, refreshedConfigs);
    scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig));
    when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
    final Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
    // Only the stale streams (1 and 2) should have been synced.
    final Set<StreamIdentifier> expectedSyncedStreams = new HashSet<>();
    for (int id = 1; id < 3; id++) {
        expectedSyncedStreams.add(StreamIdentifier.multiStreamInstance(
                Joiner.on(":").join(id * 111111111, "multiStreamTest-" + id, id * 12345)));
    }
    Assert.assertEquals(expectedSyncedStreams, syncedStreams);
    // The scheduler's live config map must now mirror the refreshed tracker list.
    Assert.assertEquals(Sets.newHashSet(refreshedConfigs),
            Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
}
@Test
public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    // Tracker first reports streams 1-4, then streams 3-6 on the next refresh:
    // streams 1-2 become stale and streams 5-6 are new; both groups must be synced,
    // while the overlapping streams 3-4 are left untouched.
    final List<StreamConfig> initialConfigs = new LinkedList<>();
    for (int id = 1; id < 5; id++) {
        initialConfigs.add(new StreamConfig(
                StreamIdentifier.multiStreamInstance(
                        Joiner.on(":").join(id * 111111111, "multiStreamTest-" + id, id * 12345)),
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)));
    }
    final List<StreamConfig> refreshedConfigs = new LinkedList<>();
    for (int id = 3; id < 7; id++) {
        refreshedConfigs.add(new StreamConfig(
                StreamIdentifier.multiStreamInstance(
                        Joiner.on(":").join(id * 111111111, "multiStreamTest-" + id, id * 12345)),
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)));
    }
    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    // Consecutive stubbing: first call sees the initial list, second sees the refreshed one.
    when(multiStreamTracker.streamConfigList()).thenReturn(initialConfigs, refreshedConfigs);
    scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig));
    when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
    final Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
    // Stale streams (1, 2) plus new streams (5, 6) should have been synced.
    final Set<StreamIdentifier> expectedSyncedStreams = new HashSet<>();
    for (final int id : new int[] {1, 2, 5, 6}) {
        expectedSyncedStreams.add(StreamIdentifier.multiStreamInstance(
                Joiner.on(":").join(id * 111111111, "multiStreamTest-" + id, id * 12345)));
    }
    Assert.assertEquals(expectedSyncedStreams, syncedStreams);
    // The scheduler's live config map must now mirror the refreshed tracker list.
    Assert.assertEquals(Sets.newHashSet(refreshedConfigs),
            Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
}
@Test
public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception {
final int maxInitializationAttempts = 1;
@ -685,6 +804,7 @@ public class SchedulerTest {
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards())

View file

@ -103,6 +103,7 @@ public class HierarchicalShardSyncerTest {
@Before
public void setup() {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
}
private void setupMultiStream() {
@ -1196,16 +1197,16 @@ public class HierarchicalShardSyncerTest {
}
}
// /**
// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
// * Shard structure (each level depicts a stream segment):
// * 0 1 2 3 4 5- shards till epoch 102
// * \ / \ / | |
// * 6 7 4 5- shards from epoch 103 - 205
// * \ / | /\
// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
// * Current leases: (4, 5, 7)
// */
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (4, 5, 7)
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatest2() {
final List<Shard> shards = constructShardListForGraphA();

View file

@ -119,10 +119,10 @@ public class ConsumerStatesTest {
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
argument, taskExecutionListener, 0));
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
}

View file

@ -76,6 +76,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -120,7 +121,7 @@ public class PrefetchRecordsPublisherTest {
@Before
public void setup() {
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher);
when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream"));
executorService = spy(Executors.newFixedThreadPool(1));
getRecordsCache = new PrefetchRecordsPublisher(
MAX_SIZE,

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.polling;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Ignore;
@ -23,6 +24,7 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.DataFetchingStrategy;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -37,11 +39,15 @@ public class RecordsFetcherFactoryTest {
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private MetricsFactory metricsFactory;
@Mock
private KinesisDataFetcher kinesisDataFetcher;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory();
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher);
when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
}
@Test

View file

@ -33,7 +33,7 @@
</scm>
<properties>
<awssdk.version>2.10.66</awssdk.version>
<awssdk.version>2.11.8-SNAPSHOT</awssdk.version>
</properties>
<licenses>