diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java index ca14155e..9f511123 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.common; import lombok.experimental.Accessors; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 999182b6..8856a4a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.common; import lombok.Value; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index f4cbac29..7a416c7a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.common; import com.google.common.base.Joiner; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 5a486a0d..22be7fc7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -106,7 +106,6 @@ public class Scheduler implements Runnable { private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; - private static final long OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS = 1 * 60 * 60 * 1000L; private SchedulerLog slog = new SchedulerLog(); @@ -273,7 +272,7 @@ public class Scheduler implements Runnable { this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); - // TODO : Halo : Check if this needs to be per stream. + // TODO : LTR : Check if this needs to be per stream. this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( @@ -459,6 +458,8 @@ public class Scheduler implements Runnable { if (shouldSyncStreamsNow()) { final Map newStreamConfigMap = new HashMap<>(); + final long waitPeriodToDeleteOldStreamsMillis = multiStreamTracker.waitPeriodToDeleteOldStreams() + .toMillis(); // Making an immutable copy newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); @@ -524,7 +525,7 @@ public class Scheduler implements Runnable { final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream() .filter(streamIdentifier -> Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() - >= getOldStreamDeferredDeletionPeriodMillis()).collect(Collectors.toSet()); + >= waitPeriodToDeleteOldStreamsMillis).collect(Collectors.toSet()); streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted)); // Purge the active streams from stale streams list. @@ -536,11 +537,6 @@ public class Scheduler implements Runnable { return streamsSynced; } - @VisibleForTesting - long getOldStreamDeferredDeletionPeriodMillis() { - return OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS; - } - @VisibleForTesting boolean shouldSyncStreamsNow() { return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index 171687bc..785778db 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -1,20 +1,43 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.processor; import software.amazon.kinesis.common.StreamConfig; +import java.time.Duration; import java.util.List; -import java.util.Map; /** * Interface for stream trackers. This is useful for KCL Workers that need * to consume data from multiple streams. + * KCL will periodically probe this interface to learn about the new and old streams. */ public interface MultiStreamTracker { /** * Returns the list of stream config, to be processed by the current application. + * Note that this method will be called periodically called by the KCL to learn about the new and old streams. * * @return List of StreamConfig */ List streamConfigList(); + + /** + * Duration to wait before deleting the old streams in the lease table. + * @return Wait time before deleting old streams + */ + Duration waitPeriodToDeleteOldStreams(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java index cbbcb483..fafe7e18 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java @@ -85,7 +85,7 @@ public class FanOutConfig implements RetrievalSpecificConfig { return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn); } - // TODO : Halo. Need Stream Specific ConsumerArn to be passed from Customer + // TODO : LTR. Need Stream Specific ConsumerArn to be passed from Customer private String getOrCreateConsumerArn(String streamName) { if (consumerArn != null) { return consumerArn; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 0b7df88d..3af3dcf5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -327,7 +327,7 @@ public class KinesisDataFetcher implements DataFetcher { } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { - // TODO: Check behaviorF + // TODO: Check behavior log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId); throw new RuntimeException(e); } catch (TimeoutException e) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 96cec37a..740cbbef 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.atMost; -import java.time.Instant; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -48,7 +48,6 @@ import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import com.google.common.base.Joiner; import com.google.common.collect.Sets; @@ -64,8 +63,6 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.utils.Either; -import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -184,6 +181,7 @@ public class SchedulerTest { }}; when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ofHours(1L)); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); @@ -492,7 +490,7 @@ public class SchedulerTest { scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L); + when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( @@ -563,7 +561,7 @@ public class SchedulerTest { scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L); + when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance(