Exposing interface method to set the wait time for old stream deletion. Adding license details to new classes
This commit is contained in:
parent
45387bfd74
commit
596e3ee797
8 changed files with 79 additions and 17 deletions
|
|
@ -1,3 +1,18 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.common;
|
||||
|
||||
import lombok.experimental.Accessors;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,18 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.common;
|
||||
|
||||
import lombok.Value;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,18 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.common;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
|
|
|||
|
|
@ -106,7 +106,6 @@ public class Scheduler implements Runnable {
|
|||
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
|
||||
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
|
||||
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
|
||||
private static final long OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS = 1 * 60 * 60 * 1000L;
|
||||
|
||||
private SchedulerLog slog = new SchedulerLog();
|
||||
|
||||
|
|
@ -273,7 +272,7 @@ public class Scheduler implements Runnable {
|
|||
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
|
||||
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
|
||||
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
|
||||
// TODO : Halo : Check if this needs to be per stream.
|
||||
// TODO : LTR : Check if this needs to be per stream.
|
||||
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
|
||||
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
|
||||
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
|
||||
|
|
@ -459,6 +458,8 @@ public class Scheduler implements Runnable {
|
|||
|
||||
if (shouldSyncStreamsNow()) {
|
||||
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
|
||||
final long waitPeriodToDeleteOldStreamsMillis = multiStreamTracker.waitPeriodToDeleteOldStreams()
|
||||
.toMillis();
|
||||
// Making an immutable copy
|
||||
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
|
||||
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
|
||||
|
|
@ -524,7 +525,7 @@ public class Scheduler implements Runnable {
|
|||
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream()
|
||||
.filter(streamIdentifier ->
|
||||
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
|
||||
>= getOldStreamDeferredDeletionPeriodMillis()).collect(Collectors.toSet());
|
||||
>= waitPeriodToDeleteOldStreamsMillis).collect(Collectors.toSet());
|
||||
streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted));
|
||||
|
||||
// Purge the active streams from stale streams list.
|
||||
|
|
@ -536,11 +537,6 @@ public class Scheduler implements Runnable {
|
|||
return streamsSynced;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getOldStreamDeferredDeletionPeriodMillis() {
|
||||
return OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean shouldSyncStreamsNow() {
|
||||
return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
|
||||
|
|
|
|||
|
|
@ -1,20 +1,43 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.processor;
|
||||
|
||||
import software.amazon.kinesis.common.StreamConfig;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface for stream trackers. This is useful for KCL Workers that need
|
||||
* to consume data from multiple streams.
|
||||
* KCL will periodically probe this interface to learn about the new and old streams.
|
||||
*/
|
||||
public interface MultiStreamTracker {
|
||||
|
||||
/**
|
||||
* Returns the list of stream config, to be processed by the current application.
|
||||
* Note that this method will be called periodically called by the KCL to learn about the new and old streams.
|
||||
*
|
||||
* @return List of StreamConfig
|
||||
*/
|
||||
List<StreamConfig> streamConfigList();
|
||||
|
||||
/**
|
||||
* Duration to wait before deleting the old streams in the lease table.
|
||||
* @return Wait time before deleting old streams
|
||||
*/
|
||||
Duration waitPeriodToDeleteOldStreams();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class FanOutConfig implements RetrievalSpecificConfig {
|
|||
return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn);
|
||||
}
|
||||
|
||||
// TODO : Halo. Need Stream Specific ConsumerArn to be passed from Customer
|
||||
// TODO : LTR. Need Stream Specific ConsumerArn to be passed from Customer
|
||||
private String getOrCreateConsumerArn(String streamName) {
|
||||
if (consumerArn != null) {
|
||||
return consumerArn;
|
||||
|
|
|
|||
|
|
@ -327,7 +327,7 @@ public class KinesisDataFetcher implements DataFetcher {
|
|||
} catch (ExecutionException e) {
|
||||
throw exceptionManager.apply(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
// TODO: Check behaviorF
|
||||
// TODO: Check behavior
|
||||
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
|
||||
throw new RuntimeException(e);
|
||||
} catch (TimeoutException e) {
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.internal.verification.VerificationModeFactory.atMost;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -48,7 +48,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Sets;
|
||||
|
|
@ -64,8 +63,6 @@ import org.mockito.runners.MockitoJUnitRunner;
|
|||
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.utils.Either;
|
||||
import software.amazon.awssdk.utils.Validate;
|
||||
import software.amazon.kinesis.checkpoint.Checkpoint;
|
||||
import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
||||
import software.amazon.kinesis.checkpoint.CheckpointFactory;
|
||||
|
|
@ -184,6 +181,7 @@ public class SchedulerTest {
|
|||
}};
|
||||
|
||||
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
|
||||
when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ofHours(1L));
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
|
||||
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
|
||||
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
|
||||
|
|
@ -492,7 +490,7 @@ public class SchedulerTest {
|
|||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L);
|
||||
when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
|
||||
|
|
@ -563,7 +561,7 @@ public class SchedulerTest {
|
|||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||
metricsConfig, processorConfig, retrievalConfig));
|
||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||
when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L);
|
||||
when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO);
|
||||
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
|
||||
Set<StreamIdentifier> expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
|
||||
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
|
||||
|
|
|
|||
Loading…
Reference in a new issue