Changes for adding stream as dimension in multistream mode. Changes for deferred cleanup of stale stream leases to free up record processors.
This commit is contained in:
parent
3de44dc4eb
commit
f51657f6f7
7 changed files with 221 additions and 31 deletions
|
|
@ -19,6 +19,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Stopwatch;
|
||||
|
||||
import io.reactivex.plugins.RxJavaPlugins;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -40,6 +43,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.NonNull;
|
||||
|
|
@ -103,6 +107,7 @@ public class Scheduler implements Runnable {
|
|||
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
|
||||
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
|
||||
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
|
||||
private static final long OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS = 1 * 60 * 60 * 1000L;
|
||||
|
||||
private SchedulerLog slog = new SchedulerLog();
|
||||
|
||||
|
|
@ -138,7 +143,6 @@ public class Scheduler implements Runnable {
|
|||
private final long failoverTimeMillis;
|
||||
private final long taskBackoffTimeMillis;
|
||||
private final boolean isMultiStreamMode;
|
||||
// TODO : halo : make sure we generate streamConfig if entry not present.
|
||||
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
|
||||
private MultiStreamTracker multiStreamTracker;
|
||||
private final long listShardsBackoffTimeMillis;
|
||||
|
|
@ -150,6 +154,7 @@ public class Scheduler implements Runnable {
|
|||
private final HierarchicalShardSyncer hierarchicalShardSyncer;
|
||||
private final long schedulerInitializationBackoffTimeMillis;
|
||||
private final LeaderDecider leaderDecider;
|
||||
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
|
||||
|
||||
// Holds consumers for shards the worker is currently tracking. Key is shard
|
||||
// info, value is ShardConsumer.
|
||||
|
|
@ -239,8 +244,6 @@ public class Scheduler implements Runnable {
|
|||
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
|
||||
this.diagnosticEventFactory = diagnosticEventFactory;
|
||||
this.diagnosticEventHandler = new DiagnosticEventLogger();
|
||||
// TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName.
|
||||
// TODO : Pass the immutable map here instead of using mst.streamConfigList()
|
||||
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
|
||||
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
|
||||
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
|
||||
|
|
@ -461,9 +464,16 @@ public class Scheduler implements Runnable {
|
|||
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
|
||||
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
|
||||
|
||||
// This is done to ensure that we clean up the stale streams lingering in the lease table.
|
||||
syncStreamsFromLeaseTableOnAppInit();
|
||||
List<MultiStreamLease> leases;
|
||||
|
||||
// This is done to ensure that we clean up the stale streams lingering in the lease table.
|
||||
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
|
||||
leases = fetchMultiStreamLeases();
|
||||
syncStreamsFromLeaseTableOnAppInit(leases);
|
||||
leasesSyncedOnAppInit = true;
|
||||
}
|
||||
|
||||
// For new streams discovered, do a shard sync and update the currentStreamConfigMap
|
||||
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
|
||||
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
|
||||
|
|
@ -478,41 +488,117 @@ public class Scheduler implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Remove assumption that each Worker gets the full list of streams
|
||||
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
|
||||
// It is assumed that all the workers will always have the latest and consistent snapshot of streams
|
||||
// from the multiStreamTracker.
|
||||
//
|
||||
// The following streams transition state among two workers are NOT considered safe, where Worker 2, on
|
||||
// initialization learn about D from lease table and delete the leases for D, as it is not available
|
||||
// in its latest MultiStreamTracker.
|
||||
// Worker 1 : A,B,C -> A,B,C,D (latest)
|
||||
// Worker 2 : BOOTS_UP -> A,B,C (stale)
|
||||
//
|
||||
// The following streams transition state among two workers are NOT considered safe, where Worker 2 might
|
||||
// end up deleting the leases for A and D and loose progress made so far.
|
||||
// Worker 1 : A,B,C -> A,B,C,D (latest)
|
||||
// Worker 2 : A,B,C -> B,C (stale/partial)
|
||||
//
|
||||
// In order to give workers with stale stream info, sufficient time to learn about the new streams
|
||||
// before attempting to delete it, we will be deferring the leases deletion based on the
|
||||
// defer time period.
|
||||
|
||||
Iterator<StreamIdentifier> currentStreamConfigIter = currentStreamConfigMap.keySet().iterator();
|
||||
while (currentStreamConfigIter.hasNext()) {
|
||||
StreamIdentifier streamIdentifier = currentStreamConfigIter.next();
|
||||
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
log.info("Found old/deleted stream: " + streamIdentifier + ". Syncing shards of that stream.");
|
||||
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier));
|
||||
shardSyncTaskManager.syncShardAndLeaseInfo();
|
||||
currentStreamConfigIter.remove();
|
||||
streamsSynced.add(streamIdentifier);
|
||||
staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
|
||||
}
|
||||
}
|
||||
|
||||
// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
|
||||
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
|
||||
// the streamIdentifiers are not present in the latest snapshot.
|
||||
final Set<StreamIdentifier> streamIdsToBeDeleted = staleStreamDeletionMap.keySet().stream()
|
||||
.filter(streamIdentifier ->
|
||||
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
|
||||
>= getOldStreamDeferredDeletionPeriodMillis() &&
|
||||
!newStreamConfigMap.containsKey(streamIdentifier))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
streamsSynced.addAll(deleteMultiStreamLeases(streamIdsToBeDeleted));
|
||||
|
||||
streamSyncWatch.reset().start();
|
||||
}
|
||||
return streamsSynced;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getOldStreamDeferredDeletionPeriodMillis() {
|
||||
return OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean shouldSyncStreamsNow() {
|
||||
return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
|
||||
}
|
||||
|
||||
private void syncStreamsFromLeaseTableOnAppInit()
|
||||
private void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
|
||||
final Set<StreamIdentifier> streamIdentifiers = leases.stream()
|
||||
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
|
||||
.collect(Collectors.toSet());
|
||||
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
|
||||
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<MultiStreamLease> fetchMultiStreamLeases()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
|
||||
final Set<StreamIdentifier> streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream()
|
||||
.map(lease -> StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()))
|
||||
.collect(Collectors.toSet());
|
||||
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
|
||||
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
|
||||
return (List<MultiStreamLease>) ((List) leaseCoordinator.leaseRefresher().listLeases());
|
||||
}
|
||||
|
||||
private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
|
||||
List<MultiStreamLease> leases = null;
|
||||
Map<String, List<MultiStreamLease>> streamIdToShardsMap = null;
|
||||
for(StreamIdentifier streamIdentifier : streamIdentifiers) {
|
||||
if (leases == null) {
|
||||
// Lazy Load once and use many times for this iteration.
|
||||
leases = fetchMultiStreamLeases();
|
||||
}
|
||||
if (streamIdToShardsMap == null) {
|
||||
// Lazy load once and use many times for this iteration.
|
||||
streamIdToShardsMap = leases.stream().collect(Collectors
|
||||
.groupingBy(MultiStreamLease::streamIdentifier,
|
||||
Collectors.toCollection(ArrayList::new)));
|
||||
}
|
||||
log.warn("Found old/deleted stream: " + streamIdentifier + ". Deleting leases of this stream.");
|
||||
// Deleting leases will cause the workers to shutdown the record processors for these shards.
|
||||
if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) {
|
||||
currentStreamConfigMap.remove(streamIdentifier);
|
||||
staleStreamDeletionMap.remove(streamIdentifier);
|
||||
streamsSynced.add(streamIdentifier);
|
||||
}
|
||||
}
|
||||
return streamsSynced;
|
||||
}
|
||||
|
||||
private boolean deleteMultiStreamLeases(List<MultiStreamLease> leases) {
|
||||
if (leases != null) {
|
||||
for (MultiStreamLease lease : leases) {
|
||||
try {
|
||||
leaseRefresher.deleteLease(lease);
|
||||
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||
log.error(
|
||||
"Unable to delete stale stream lease {}. Skipping further deletions for this stream. Will retry later.",
|
||||
lease.leaseKey(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
leasesSyncedOnAppInit = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end.
|
||||
|
|
@ -549,6 +635,7 @@ public class Scheduler implements Runnable {
|
|||
* Requests a graceful shutdown of the worker, notifying record processors, that implement
|
||||
* {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
|
||||
* checkpoint.
|
||||
*
|
||||
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
|
||||
* previous future.
|
||||
*
|
||||
|
|
@ -575,8 +662,8 @@ public class Scheduler implements Runnable {
|
|||
* </ol>
|
||||
*
|
||||
* @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
|
||||
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
|
||||
* terminate early.
|
||||
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
|
||||
* terminate early.
|
||||
*/
|
||||
public Future<Boolean> startGracefulShutdown() {
|
||||
synchronized (this) {
|
||||
|
|
@ -593,8 +680,9 @@ public class Scheduler implements Runnable {
|
|||
* shutdowns in your own executor, or execute the shutdown synchronously.
|
||||
*
|
||||
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the
|
||||
* graceful shutdown has already been completed.
|
||||
* @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process.
|
||||
* graceful shutdown has already been completed.
|
||||
* @throws IllegalStateException
|
||||
* thrown by the callable if another callable has already started the shutdown process.
|
||||
*/
|
||||
public Callable<Boolean> createGracefulShutdownCallable() {
|
||||
if (shutdownComplete()) {
|
||||
|
|
@ -736,7 +824,8 @@ public class Scheduler implements Runnable {
|
|||
/**
|
||||
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
|
||||
*
|
||||
* @param shardInfo Kinesis shard info
|
||||
* @param shardInfo
|
||||
* Kinesis shard info
|
||||
* @return ShardConsumer for the shard
|
||||
*/
|
||||
ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo,
|
||||
|
|
@ -764,7 +853,7 @@ public class Scheduler implements Runnable {
|
|||
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
|
||||
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory);
|
||||
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
|
||||
checkpoint);
|
||||
checkpoint);
|
||||
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
|
||||
// get the default stream name for the single stream application.
|
||||
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
|
||||
|
|
@ -801,6 +890,7 @@ public class Scheduler implements Runnable {
|
|||
|
||||
/**
|
||||
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
|
||||
*
|
||||
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
|
||||
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
|
||||
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
|
||||
|
|
|
|||
|
|
@ -36,9 +36,11 @@ import lombok.RequiredArgsConstructor;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.Lease;
|
||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||
import software.amazon.kinesis.leases.LeaseRenewer;
|
||||
import software.amazon.kinesis.leases.MultiStreamLease;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
|
|
@ -297,6 +299,10 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
|
|||
|
||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
|
||||
if (StringUtils.isNotEmpty(shardId)) {
|
||||
if(lease instanceof MultiStreamLease) {
|
||||
MetricsUtil.addStreamId(scope,
|
||||
StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()));
|
||||
}
|
||||
MetricsUtil.addShardId(scope, shardId);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
|||
import software.amazon.awssdk.services.kinesis.model.Shard;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.ShardDetector;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||
|
|
@ -109,6 +110,8 @@ public class ProcessTask implements ConsumerTask {
|
|||
@Override
|
||||
public TaskResult call() {
|
||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
|
||||
shardInfo.streamIdentifierSerOpt()
|
||||
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
|
||||
MetricsUtil.addShardId(scope, shardInfo.shardId());
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
boolean success = false;
|
||||
|
|
@ -197,6 +200,8 @@ public class ProcessTask implements ConsumerTask {
|
|||
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
|
||||
|
||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
|
||||
shardInfo.streamIdentifierSerOpt()
|
||||
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
|
||||
MetricsUtil.addShardId(scope, shardInfo.shardId());
|
||||
final long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
|
||||
import lombok.NonNull;
|
||||
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -26,6 +27,7 @@ import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
|||
public class MetricsUtil {
|
||||
public static final String OPERATION_DIMENSION_NAME = "Operation";
|
||||
public static final String SHARD_ID_DIMENSION_NAME = "ShardId";
|
||||
public static final String STREAM_IDENTIFIER = "StreamId";
|
||||
private static final String WORKER_IDENTIFIER_DIMENSION = "WorkerIdentifier";
|
||||
private static final String TIME_METRIC = "Time";
|
||||
private static final String SUCCESS_METRIC = "Success";
|
||||
|
|
@ -51,6 +53,11 @@ public class MetricsUtil {
|
|||
addOperation(metricsScope, SHARD_ID_DIMENSION_NAME, shardId);
|
||||
}
|
||||
|
||||
public static void addStreamId(@NonNull final MetricsScope metricsScope, @NonNull final StreamIdentifier streamId) {
|
||||
streamId.accountIdOptional()
|
||||
.ifPresent(acc -> addOperation(metricsScope, STREAM_IDENTIFIER, streamId.serialize()));
|
||||
}
|
||||
|
||||
public static void addWorkerIdentifier(@NonNull final MetricsScope metricsScope,
|
||||
@NonNull final String workerIdentifier) {
|
||||
addOperation(metricsScope, WORKER_IDENTIFIER_DIMENSION, workerIdentifier);
|
||||
|
|
|
|||
|
|
@ -232,6 +232,7 @@ public class KinesisDataFetcher implements DataFetcher {
|
|||
|
||||
// TODO: Check if this metric is fine to be added
|
||||
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
|
||||
MetricsUtil.addStreamId(metricsScope, streamIdentifier);
|
||||
MetricsUtil.addShardId(metricsScope, shardId);
|
||||
boolean success = false;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
|
@ -315,6 +316,7 @@ public class KinesisDataFetcher implements DataFetcher {
|
|||
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
|
||||
|
||||
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
|
||||
MetricsUtil.addStreamId(metricsScope, streamIdentifier);
|
||||
MetricsUtil.addShardId(metricsScope, shardId);
|
||||
boolean success = false ;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
|
@ -325,7 +327,7 @@ public class KinesisDataFetcher implements DataFetcher {
|
|||
} catch (ExecutionException e) {
|
||||
throw exceptionManager.apply(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
// TODO: Check behavior
|
||||
// TODO: Check behaviorF
|
||||
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
|
||||
throw new RuntimeException(e);
|
||||
} catch (TimeoutException e) {
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
|||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.RequestDetails;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||
|
|
@ -88,6 +89,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
|
||||
private boolean started = false;
|
||||
private final String operation;
|
||||
private final StreamIdentifier streamId;
|
||||
private final String streamAndShardId;
|
||||
private Subscriber<? super RecordsRetrieved> subscriber;
|
||||
@VisibleForTesting @Getter
|
||||
|
|
@ -219,8 +221,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
|
||||
Validate.notEmpty(operation, "Operation cannot be empty");
|
||||
this.operation = operation;
|
||||
this.streamAndShardId =
|
||||
this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
|
||||
this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
|
||||
this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -453,6 +455,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
log.info("{} : records threw ExpiredIteratorException - restarting"
|
||||
+ " after greatest seqNum passed to customer", streamAndShardId, e);
|
||||
|
||||
MetricsUtil.addStreamId(scope, streamId);
|
||||
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
|
||||
publisherSession.dataFetcher().restartIterator();
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.internal.verification.VerificationModeFactory.atMost;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -443,7 +444,7 @@ public class SchedulerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public final void testMultiStreamOnlyStaleStreamsAreSynced()
|
||||
public final void testMultiStreamStaleStreamsAreNotDeletedImmediately()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
|
|
@ -462,16 +463,49 @@ public class SchedulerTest {
|
|||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||
Collectors.toCollection(HashSet::new));
|
||||
Assert.assertEquals(Sets.newHashSet(), syncedStreams);
|
||||
Assert.assertEquals(Sets.newHashSet(streamConfigList1),
|
||||
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||
Assert.assertEquals(expectedPendingStreams,
|
||||
scheduler.staleStreamDeletionMap().keySet());
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
List<StreamConfig> streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||
.retrievalFactory(retrievalFactory);
|
||||
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||
Collectors.toCollection(HashSet::new));
|
||||
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
|
||||
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||
Assert.assertEquals(Sets.newHashSet(),
|
||||
scheduler.staleStreamDeletionMap().keySet());
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced()
|
||||
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
|
|
@ -490,6 +524,47 @@ public class SchedulerTest {
|
|||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(5, 7)
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3)
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||
List<StreamConfig> expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs),
|
||||
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||
Assert.assertEquals(expectedPendingStreams,
|
||||
scheduler.staleStreamDeletionMap().keySet());
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod()
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
List<StreamConfig> streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||
.retrievalFactory(retrievalFactory);
|
||||
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
|
||||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
|
||||
|
|
@ -497,6 +572,8 @@ public class SchedulerTest {
|
|||
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
|
||||
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
|
||||
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
|
||||
Assert.assertEquals(Sets.newHashSet(),
|
||||
scheduler.staleStreamDeletionMap().keySet());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue