Merge pull request #55 from ashwing/ltr_1_periodic_auditor_metrics_configs
Ltr 1 periodic auditor metrics configs
This commit is contained in:
commit
562face833
7 changed files with 108 additions and 98 deletions
|
|
@ -23,6 +23,7 @@ import lombok.Value;
|
|||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
||||
import software.amazon.awssdk.services.kinesis.model.Shard;
|
||||
import software.amazon.awssdk.utils.CollectionUtils;
|
||||
import software.amazon.kinesis.common.HashKeyRangeForLease;
|
||||
|
|
@ -38,6 +39,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
|
|||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import software.amazon.kinesis.lifecycle.TaskResult;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||
import software.amazon.kinesis.metrics.MetricsScope;
|
||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.math.BigInteger;
|
||||
|
|
@ -47,7 +52,6 @@ import java.util.Comparator;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
@ -67,13 +71,11 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan
|
|||
@Slf4j
|
||||
class PeriodicShardSyncManager {
|
||||
private static final long INITIAL_DELAY = 60 * 1000L;
|
||||
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
|
||||
@VisibleForTesting
|
||||
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
|
||||
@VisibleForTesting
|
||||
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
|
||||
@VisibleForTesting
|
||||
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
|
||||
static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager";
|
||||
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
|
||||
|
||||
private final String workerId;
|
||||
|
|
@ -83,19 +85,29 @@ class PeriodicShardSyncManager {
|
|||
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
|
||||
private final ScheduledExecutorService shardSyncThreadPool;
|
||||
private final boolean isMultiStreamingMode;
|
||||
private final MetricsFactory metricsFactory;
|
||||
private final long leasesRecoveryAuditorExecutionFrequencyMillis;
|
||||
private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
private boolean isRunning;
|
||||
|
||||
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
|
||||
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
|
||||
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode) {
|
||||
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode,
|
||||
MetricsFactory metricsFactory,
|
||||
long leasesRecoveryAuditorExecutionFrequencyMillis,
|
||||
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
|
||||
this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
|
||||
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode);
|
||||
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory,
|
||||
leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold);
|
||||
}
|
||||
|
||||
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
|
||||
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
|
||||
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
|
||||
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) {
|
||||
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode,
|
||||
MetricsFactory metricsFactory,
|
||||
long leasesRecoveryAuditorExecutionFrequencyMillis,
|
||||
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
|
||||
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
|
||||
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
|
||||
this.workerId = workerId;
|
||||
|
|
@ -105,6 +117,9 @@ class PeriodicShardSyncManager {
|
|||
this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
|
||||
this.shardSyncThreadPool = shardSyncThreadPool;
|
||||
this.isMultiStreamingMode = isMultiStreamingMode;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
|
||||
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
}
|
||||
|
||||
public synchronized TaskResult start() {
|
||||
|
|
@ -116,7 +131,7 @@ class PeriodicShardSyncManager {
|
|||
log.error("Error during runShardSync.", t);
|
||||
}
|
||||
};
|
||||
shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
|
||||
shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, leasesRecoveryAuditorExecutionFrequencyMillis,
|
||||
TimeUnit.MILLISECONDS);
|
||||
isRunning = true;
|
||||
|
||||
|
|
@ -157,6 +172,14 @@ class PeriodicShardSyncManager {
|
|||
private void runShardSync() {
|
||||
if (leaderDecider.isLeader(workerId)) {
|
||||
log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId));
|
||||
|
||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,
|
||||
PERIODIC_SHARD_SYNC_MANAGER);
|
||||
int numStreamsWithPartialLeases = 0;
|
||||
int numStreamsToSync = 0;
|
||||
boolean isRunSuccess = false;
|
||||
final long runStartMillis = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
// Construct the stream to leases map to be used in the lease sync
|
||||
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
|
||||
|
|
@ -166,6 +189,10 @@ class PeriodicShardSyncManager {
|
|||
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
|
||||
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
|
||||
streamToLeasesMap.get(streamConfigEntry.getKey()));
|
||||
|
||||
numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0;
|
||||
numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0;
|
||||
|
||||
if (shardSyncResponse.shouldDoShardSync()) {
|
||||
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
|
||||
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
|
||||
|
|
@ -181,8 +208,14 @@ class PeriodicShardSyncManager {
|
|||
shardSyncResponse.reasonForDecision());
|
||||
}
|
||||
}
|
||||
isRunSuccess = true;
|
||||
} catch (Exception e) {
|
||||
log.error("Caught exception while running periodic shard syncer.", e);
|
||||
} finally {
|
||||
scope.addData("NumStreamsWithPartialLeases", numStreamsWithPartialLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
scope.addData("NumStreamsToSync", numStreamsToSync, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
MetricsUtil.addSuccessAndLatency(scope, isRunSuccess, runStartMillis, MetricsLevel.SUMMARY);
|
||||
scope.end();
|
||||
}
|
||||
} else {
|
||||
log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
|
||||
|
|
@ -214,7 +247,7 @@ class PeriodicShardSyncManager {
|
|||
if (CollectionUtils.isNullOrEmpty(leases)) {
|
||||
// If the leases is null or empty then we need to do shard sync
|
||||
log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier);
|
||||
return new ShardSyncResponse(true, "No leases found for " + streamIdentifier);
|
||||
return new ShardSyncResponse(true, false, "No leases found for " + streamIdentifier);
|
||||
}
|
||||
// Check if there are any holes in the leases and return the first hole if present.
|
||||
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
|
||||
|
|
@ -227,15 +260,15 @@ class PeriodicShardSyncManager {
|
|||
.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker());
|
||||
final boolean hasHoleWithHighConfidence = hashRangeHoleTracker
|
||||
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
|
||||
return new ShardSyncResponse(hasHoleWithHighConfidence,
|
||||
return new ShardSyncResponse(hasHoleWithHighConfidence, true,
|
||||
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
|
||||
+ " times. Shard sync will be initiated when threshold reaches "
|
||||
+ CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
|
||||
+ leasesRecoveryAuditorInconsistencyConfidenceThreshold);
|
||||
|
||||
} else {
|
||||
// If hole is not present, clear any previous tracking for this stream and return false;
|
||||
hashRangeHoleTrackerMap.remove(streamIdentifier);
|
||||
return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier);
|
||||
return new ShardSyncResponse(false, false, "Hash Ranges are complete for " + streamIdentifier);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -244,6 +277,7 @@ class PeriodicShardSyncManager {
|
|||
@VisibleForTesting
|
||||
static class ShardSyncResponse {
|
||||
private final boolean shouldDoShardSync;
|
||||
private final boolean isHoleDetected;
|
||||
private final String reasonForDecision;
|
||||
}
|
||||
|
||||
|
|
@ -365,7 +399,7 @@ class PeriodicShardSyncManager {
|
|||
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
|
||||
}
|
||||
|
||||
private static class HashRangeHoleTracker {
|
||||
private class HashRangeHoleTracker {
|
||||
private HashRangeHole hashRangeHole;
|
||||
@Getter
|
||||
private Integer numConsecutiveHoles;
|
||||
|
|
@ -377,7 +411,7 @@ class PeriodicShardSyncManager {
|
|||
this.hashRangeHole = hashRangeHole;
|
||||
this.numConsecutiveHoles = 1;
|
||||
}
|
||||
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
|
||||
return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -113,6 +113,7 @@ public class Scheduler implements Runnable {
|
|||
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
|
||||
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
|
||||
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
|
||||
private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;
|
||||
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
|
||||
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
|
||||
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
|
||||
|
|
@ -290,7 +291,9 @@ public class Scheduler implements Runnable {
|
|||
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
|
||||
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
|
||||
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
|
||||
shardSyncTaskManagerProvider, isMultiStreamMode);
|
||||
shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory,
|
||||
leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
|
||||
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
|
||||
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
|
||||
.createLeaseCleanupManager(metricsFactory);
|
||||
}
|
||||
|
|
@ -489,10 +492,9 @@ public class Scheduler implements Runnable {
|
|||
}
|
||||
};
|
||||
|
||||
if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) {
|
||||
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
|
||||
// We do lease sync for old streams, before leaving to the deletion strategy to delete leases for
|
||||
// strategy detected leases. Also, for deleted streams we expect the shard sync to remove the
|
||||
// leases.
|
||||
// strategy detected leases.
|
||||
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
|
||||
while (currentSetOfStreamsIter.hasNext()) {
|
||||
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
|
||||
|
|
@ -531,13 +533,13 @@ public class Scheduler implements Runnable {
|
|||
currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier));
|
||||
|
||||
} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
|
||||
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent(
|
||||
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
|
||||
streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
|
||||
}
|
||||
|
||||
// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
|
||||
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
|
||||
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
|
||||
// the streamIdentifiers are not present in the latest snapshot.
|
||||
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
|
||||
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
|
||||
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
|
||||
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
|
||||
|
|
|
|||
|
|
@ -52,6 +52,9 @@ public class LeaseManagementConfig {
|
|||
public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
|
||||
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
|
||||
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
|
||||
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
|
||||
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
|
||||
|
||||
|
||||
public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder()
|
||||
.leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS)
|
||||
|
|
@ -195,6 +198,20 @@ public class LeaseManagementConfig {
|
|||
|
||||
private BillingMode billingMode = BillingMode.PROVISIONED;
|
||||
|
||||
/**
|
||||
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
|
||||
* If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on
|
||||
* {@link #leasesRecoveryAuditorInconsistencyConfidenceThreshold}
|
||||
*/
|
||||
private long leasesRecoveryAuditorExecutionFrequencyMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
|
||||
|
||||
/**
|
||||
* Confidence threshold for the periodic auditor job to determine if leases for a stream in the lease table
|
||||
* is inconsistent. If the auditor finds same set of inconsistencies consecutively for a stream for this many times,
|
||||
* then it would trigger a shard sync.
|
||||
*/
|
||||
private int leasesRecoveryAuditorInconsistencyConfidenceThreshold = DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY;
|
||||
|
||||
/**
|
||||
* The initial position for getting records from Kinesis streams.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
* StreamIdentifiers for which leases needs to be cleaned up in the lease table.
|
||||
* @return
|
||||
*/
|
||||
List<StreamIdentifier> streamIdentifiers();
|
||||
List<StreamIdentifier> streamIdentifiersForLeaseCleanup();
|
||||
|
||||
/**
|
||||
* Duration to wait before deleting the leases for this stream.
|
||||
|
|
@ -43,12 +43,6 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
*/
|
||||
StreamsLeasesDeletionType leaseDeletionType();
|
||||
|
||||
/**
|
||||
* Should the leases be cleaned up for deleted streams
|
||||
* @return true if leases be cleaned up for deleted streams; false otherwise.
|
||||
*/
|
||||
boolean shouldCleanupLeasesForDeletedStreams();
|
||||
|
||||
/**
|
||||
* StreamsLeasesDeletionType identifying the different lease cleanup strategies.
|
||||
*/
|
||||
|
|
@ -64,7 +58,7 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
final class NoLeaseDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
|
||||
|
||||
@Override
|
||||
public final List<StreamIdentifier> streamIdentifiers() {
|
||||
public final List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
throw new UnsupportedOperationException("StreamIdentifiers not required");
|
||||
}
|
||||
|
||||
|
|
@ -77,37 +71,6 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
public final StreamsLeasesDeletionType leaseDeletionType() {
|
||||
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean shouldCleanupLeasesForDeletedStreams() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Strategy for not cleaning up leases for former streams.
|
||||
*/
|
||||
final class OnlyDeletedStreamsLeasesCleanupStrategy implements FormerStreamsLeasesDeletionStrategy {
|
||||
|
||||
@Override
|
||||
public final List<StreamIdentifier> streamIdentifiers() {
|
||||
throw new UnsupportedOperationException("StreamIdentifiers not required");
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Duration waitPeriodToDeleteFormerStreams() {
|
||||
return Duration.ZERO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final StreamsLeasesDeletionType leaseDeletionType() {
|
||||
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean shouldCleanupLeasesForDeletedStreams() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -117,7 +80,7 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
abstract class AutoDetectionAndDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
|
||||
|
||||
@Override
|
||||
public final List<StreamIdentifier> streamIdentifiers() {
|
||||
public final List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
throw new UnsupportedOperationException("StreamIdentifiers not required");
|
||||
}
|
||||
|
||||
|
|
@ -125,15 +88,10 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
public final StreamsLeasesDeletionType leaseDeletionType() {
|
||||
return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCleanupLeasesForDeletedStreams() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Strategy to detect the streams for deletion through {@link #streamIdentifiers()} provided by customer at runtime
|
||||
* Strategy to detect the streams for deletion through {@link #streamIdentifiersForLeaseCleanup()} provided by customer at runtime
|
||||
* and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()}
|
||||
*/
|
||||
abstract class ProvidedStreamsDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
|
||||
|
|
@ -142,11 +100,6 @@ public interface FormerStreamsLeasesDeletionStrategy {
|
|||
public final StreamsLeasesDeletionType leaseDeletionType() {
|
||||
return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCleanupLeasesForDeletedStreams() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,9 +38,11 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
|
|||
|
||||
private final KinesisAsyncClient kinesisClient;
|
||||
private final String defaultStreamName;
|
||||
private final String defaultConsumerName;
|
||||
private final String defaultConsumerArn;
|
||||
private final Function<String, String> consumerArnCreator;
|
||||
|
||||
private Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
|
||||
final MetricsFactory metricsFactory) {
|
||||
|
|
@ -52,15 +54,15 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
|
|||
final StreamConfig streamConfig,
|
||||
final MetricsFactory metricsFactory) {
|
||||
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
|
||||
final String streamName;
|
||||
if(streamIdentifierStr.isPresent()) {
|
||||
streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName();
|
||||
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
|
||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
||||
getOrCreateConsumerArn(streamName, streamConfig.consumerArn()),
|
||||
getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),
|
||||
streamIdentifierStr.get());
|
||||
} else {
|
||||
final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName);
|
||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
||||
getOrCreateConsumerArn(defaultStreamName, defaultConsumerName));
|
||||
getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -69,7 +71,8 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
|
|||
throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info");
|
||||
}
|
||||
|
||||
private String getOrCreateConsumerArn(String streamName, String consumerArn) {
|
||||
return consumerArn != null ? consumerArn : consumerArnCreator.apply(streamName);
|
||||
private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
|
||||
return consumerArn != null ? consumerArn : implicitConsumerArnTracker
|
||||
.computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName()));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import software.amazon.kinesis.leases.LeaseRefresher;
|
|||
import software.amazon.kinesis.leases.MultiStreamLease;
|
||||
import software.amazon.kinesis.leases.ShardDetector;
|
||||
import software.amazon.kinesis.leases.ShardSyncTaskManager;
|
||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import java.math.BigInteger;
|
||||
|
|
@ -49,9 +50,9 @@ import static org.mockito.Matchers.any;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize;
|
||||
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
|
||||
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY;
|
||||
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY;
|
||||
import static software.amazon.kinesis.leases.LeaseManagementConfig.DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
|
||||
|
|
@ -72,7 +73,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
public void setup() {
|
||||
streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456");
|
||||
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
|
||||
shardSyncTaskManagerProvider, true);
|
||||
shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -173,7 +174,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
}
|
||||
|
||||
|
|
@ -191,7 +192,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -214,7 +215,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}
|
||||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -233,7 +234,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
|
||||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -252,7 +253,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
|
||||
add(deserialize(MIN_HASH_KEY.toString(), "1"));
|
||||
|
|
@ -267,7 +268,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
// Resetting the holes
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -286,7 +287,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
|
||||
add(deserialize(MIN_HASH_KEY.toString(), "1"));
|
||||
|
|
@ -301,10 +302,10 @@ public class PeriodicShardSyncManagerTest {
|
|||
return lease;
|
||||
}).collect(Collectors.toList());
|
||||
// Resetting the holes
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
|
||||
// Resetting the holes
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -347,7 +348,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
// Assert that shard sync should never trigger
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
|
||||
|
||||
|
|
@ -395,7 +396,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
// Assert that shard sync should never trigger
|
||||
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
|
||||
IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
|
||||
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
|
||||
|
||||
|
|
|
|||
|
|
@ -482,7 +482,7 @@ public class SchedulerTest {
|
|||
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiers() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -497,7 +497,7 @@ public class SchedulerTest {
|
|||
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy2()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiers() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||
Collectors.toCollection(ArrayList::new));
|
||||
|
|
@ -555,7 +555,7 @@ public class SchedulerTest {
|
|||
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiers() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -575,7 +575,7 @@ public class SchedulerTest {
|
|||
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy2()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiers() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||
Collectors.toCollection(ArrayList::new));
|
||||
|
|
@ -639,7 +639,7 @@ public class SchedulerTest {
|
|||
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiers() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -654,7 +654,7 @@ public class SchedulerTest {
|
|||
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy2()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiers() {
|
||||
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
|
||||
return IntStream.range(1, 3)
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
|
|
|
|||
Loading…
Reference in a new issue