Added logging w.r.t. StreamConfig handling. (#1024)

This commit is contained in:
stair 2023-01-30 17:41:11 -05:00 committed by GitHub
parent 0e86089123
commit 4411a3dc77
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 45 additions and 41 deletions

View file

@ -21,7 +21,7 @@
<parent>
<artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId>
<version>2.4.5</version>
<version>2.4.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View file

@ -22,7 +22,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.4.5</version>
<version>2.4.6-SNAPSHOT</version>
</parent>
<artifactId>amazon-kinesis-client</artifactId>

View file

@ -41,7 +41,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -54,7 +53,6 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@ -75,7 +73,6 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@ -83,7 +80,6 @@ import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -113,16 +109,16 @@ public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 60_000L;
private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private SchedulerLog slog = new SchedulerLog();
private final SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
private final CoordinatorConfig coordinatorConfig;
@ -175,7 +171,7 @@ public class Scheduler implements Runnable {
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();
private final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();
private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;
@ -183,7 +179,7 @@ public class Scheduler implements Runnable {
private final Object lock = new Object();
private Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false;
/**
@ -236,8 +232,13 @@ public class Scheduler implements Runnable {
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
streamConfig ->
Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
streamConfig -> {
// use a concrete, non-singleton map to allow computeIfAbsent(...)
// without forcing behavioral differences for multi-stream support
final Map<StreamIdentifier, StreamConfig> map = new HashMap<>();
map.put(streamConfig.streamIdentifier(), streamConfig);
return map;
});
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
@ -348,7 +349,6 @@ public class Scheduler implements Runnable {
log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
@ -450,7 +450,6 @@ public class Scheduler implements Runnable {
return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier());
}
/**
* Note: This method has package level access solely for testing purposes.
* Sync all streams method.
@ -484,10 +483,11 @@ public class Scheduler implements Runnable {
// For new streams discovered, do a shard sync and update the currentStreamConfigMap
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier);
log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig);
shardSyncTaskManager.submitShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier);
} else {
if (log.isDebugEnabled()) {
@ -521,12 +521,10 @@ public class Scheduler implements Runnable {
// In order to give workers with stale stream info, sufficient time to learn about the new streams
// before attempting to delete it, we will be deferring the leases deletion based on the
// defer time period.
currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier));
currentStreamConfigMap.keySet().forEach(enqueueStreamLeaseDeletionOperation);
} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
streamIdentifiers -> streamIdentifiers.forEach(enqueueStreamLeaseDeletionOperation));
} else {
// Remove the old/stale streams identified through the new and existing streams list, without
// cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag.
@ -614,28 +612,27 @@ public class Scheduler implements Runnable {
private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
if (streamIdentifiers.isEmpty()) {
return Collections.emptySet();
}
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
List<MultiStreamLease> leases = null;
Map<String, List<MultiStreamLease>> streamIdToShardsMap = null;
for(StreamIdentifier streamIdentifier : streamIdentifiers) {
if (leases == null) {
// Lazy Load once and use many times for this iteration.
leases = fetchMultiStreamLeases();
}
if (streamIdToShardsMap == null) {
// Lazy load once and use many times for this iteration.
streamIdToShardsMap = leases.stream().collect(Collectors
.groupingBy(MultiStreamLease::streamIdentifier,
Collectors.toCollection(ArrayList::new)));
}
log.warn("Found old/deleted stream: " + streamIdentifier + ". Directly deleting leases of this stream.");
final List<MultiStreamLease> leases = fetchMultiStreamLeases();
final Map<String, List<MultiStreamLease>> streamIdToShardsMap = leases.stream().collect(
Collectors.groupingBy(MultiStreamLease::streamIdentifier, Collectors.toCollection(ArrayList::new)));
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
// Deleting leases will cause the workers to shutdown the record processors for these shards.
if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) {
log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier);
currentStreamConfigMap.remove(streamIdentifier);
staleStreamDeletionMap.remove(streamIdentifier);
streamsSynced.add(streamIdentifier);
}
}
if (!streamsSynced.isEmpty()) {
// map keys are StreamIdentifiers, which are members of StreamConfig, and therefore redundant
log.info("Streams retained post-deletion: {}", currentStreamConfigMap.values());
}
return streamsSynced;
}
@ -655,9 +652,15 @@ public class Scheduler implements Runnable {
return true;
}
// Generate default StreamConfig for an "orphaned" stream that is in the lease table but not tracked
/**
* Generates default StreamConfig for an "orphaned" stream that is in the lease table but not tracked.
*
* @param streamIdentifier stream for which an orphan config should be generated
*/
private StreamConfig getOrphanedStreamConfig(StreamIdentifier streamIdentifier) {
return new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream);
final StreamConfig orphanConfig = new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream);
log.info("Identified as orphan: {}", orphanConfig);
return orphanConfig;
}
/**
@ -919,7 +922,7 @@ public class Scheduler implements Runnable {
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getOrphanedStreamConfig(streamIdentifier));
final StreamConfig streamConfig = currentStreamConfigMap.computeIfAbsent(streamIdentifier, this::getOrphanedStreamConfig);
Validate.notNull(streamConfig, "StreamConfig should not be null");
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,

View file

@ -26,6 +26,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientLibraryPackage;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker;
@ -46,7 +47,7 @@ public class RetrievalConfig {
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.5";
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.6-SNAPSHOT";
/**
* Client used to make calls to Kinesis for records retrieval

View file

@ -22,7 +22,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name>
<version>2.4.5</version>
<version>2.4.6-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>