From 0b267037ea54965890bf5ce1d611760a083d61e8 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 8 Aug 2018 11:04:29 -0700 Subject: [PATCH] Refactoring repo (#358) * Introducing internal annotation to provided implementations * Deleting unused Task classes --- .../DoesNothingPreparedCheckpointer.java | 2 ++ .../dynamodb/DynamoDBCheckpointFactory.java | 2 ++ .../dynamodb/DynamoDBCheckpointer.java | 2 ++ .../common/KinesisRequestsBuilder.java | 2 ++ .../SchedulerCoordinatorFactory.java | 2 ++ .../amazon/kinesis/leases/DynamoUtils.java | 2 ++ .../kinesis/leases/KinesisShardDetector.java | 2 ++ .../amazon/kinesis/leases/ShardSyncTask.java | 2 ++ .../dynamodb/DynamoDBLeaseCoordinator.java | 2 ++ .../DynamoDBLeaseManagementFactory.java | 2 ++ .../dynamodb/DynamoDBLeaseRefresher.java | 2 ++ .../leases/dynamodb/DynamoDBLeaseRenewer.java | 2 ++ .../dynamodb/DynamoDBLeaseSerializer.java | 2 ++ .../leases/dynamodb/DynamoDBLeaseTaker.java | 2 ++ .../lifecycle/BlockOnParentShardTask.java | 2 ++ .../amazon/kinesis/lifecycle/ProcessTask.java | 5 ---- .../lifecycle/ShardConsumerArgument.java | 2 ++ .../ShardConsumerShutdownNotification.java | 2 ++ .../lifecycle/TaskCompletedListener.java | 25 ------------------- .../amazon/kinesis/lifecycle/TaskFailed.java | 22 ---------------- .../kinesis/lifecycle/TaskFailedListener.java | 20 --------------- .../lifecycle/TaskFailureHandling.java | 19 -------------- .../MetricsCollectingTaskDecorator.java | 2 ++ .../retrieval/AWSExceptionManager.java | 2 ++ .../kinesis/retrieval/IteratorBuilder.java | 2 ++ .../fanout/FanOutConsumerRegistration.java | 2 ++ .../fanout/FanOutRecordsPublisher.java | 2 ++ .../fanout/FanOutRetrievalFactory.java | 2 ++ ...ynchronousGetRecordsRetrievalStrategy.java | 2 ++ .../polling/BlockingRecordsPublisher.java | 2 ++ .../polling/PrefetchRecordsPublisher.java | 2 ++ .../polling/SimpleRecordsFetcherFactory.java | 2 ++ .../SynchronousBlockingRetrievalFactory.java | 2 ++ ...ynchronousGetRecordsRetrievalStrategy.java | 2 ++ ...ynchronousPrefetchingRetrievalFactory.java | 2 ++ 35 files changed, 60 insertions(+), 91 deletions(-) delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java index c27dee7a..be211204 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.checkpoint; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.ShutdownException; @@ -32,6 +33,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; * initialized, processes 0 records, then calls prepareCheckpoint(). The value in the table is the same, so there's * no reason to overwrite it with another copy of itself. */ +@KinesisClientInternalApi public class DoesNothingPreparedCheckpointer implements PreparedCheckpointer { private final ExtendedSequenceNumber sequenceNumber; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointFactory.java index 1f7d4531..f0ba08e8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointFactory.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.checkpoint.dynamodb; import lombok.Data; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; @@ -25,6 +26,7 @@ import software.amazon.kinesis.processor.Checkpointer; * */ @Data +@KinesisClientInternalApi public class DynamoDBCheckpointFactory implements CheckpointFactory { @Override public Checkpointer createCheckpointer(final LeaseCoordinator leaseLeaseCoordinator, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index 15d9dd8f..171002e4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.KinesisClientLibException; @@ -43,6 +44,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; */ @RequiredArgsConstructor @Slf4j +@KinesisClientInternalApi public class DynamoDBCheckpointer implements Checkpointer { @NonNull private final LeaseCoordinator leaseCoordinator; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisRequestsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisRequestsBuilder.java index 13a593c7..4a384c89 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisRequestsBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisRequestsBuilder.java @@ -25,11 +25,13 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.retrieval.RetrievalConfig; /** * */ +@KinesisClientInternalApi public class KinesisRequestsBuilder { public static ListShardsRequest.Builder listShardsRequestBuilder() { return appendUserAgent(ListShardsRequest.builder()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java index 2e90e558..f3a2b14b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.NonNull; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.Checkpointer; @@ -33,6 +34,7 @@ import software.amazon.kinesis.processor.Checkpointer; * */ @Data +@KinesisClientInternalApi public class SchedulerCoordinatorFactory implements CoordinatorFactory { @Override public ExecutorService createExecutorService() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java index c878abce..31bbadbe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/DynamoUtils.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import java.util.ArrayList; import java.util.Collection; @@ -24,6 +25,7 @@ import java.util.Map; /** * Static utility functions used by our LeaseSerializers. */ +@KinesisClientInternalApi public class DynamoUtils { public static AttributeValue createAttributeValue(Collection collectionValue) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 20572648..b4938cc1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.retrieval.AWSExceptionManager; @@ -52,6 +53,7 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager; @RequiredArgsConstructor @Slf4j @Accessors(fluent = true) +@KinesisClientInternalApi public class KinesisShardDetector implements ShardDetector { @NonNull private final KinesisAsyncClient kinesisClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index c8347e44..dd98206a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.leases; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; @@ -33,6 +34,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; */ @RequiredArgsConstructor @Slf4j +@KinesisClientInternalApi public class ShardSyncTask implements ConsumerTask { private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask"; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index ace235e1..2ae70669 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; @@ -54,6 +55,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; * */ @Slf4j +@KinesisClientInternalApi public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index b4a7e27a..cce48b07 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -21,6 +21,7 @@ import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCoordinator; @@ -33,6 +34,7 @@ import software.amazon.kinesis.metrics.MetricsFactory; * */ @Data +@KinesisClientInternalApi public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private final KinesisAsyncClient kinesisClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 87f497c9..ae687d77 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseSerializer; @@ -59,6 +60,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; */ @AllArgsConstructor @Slf4j +@KinesisClientInternalApi public class DynamoDBLeaseRefresher implements LeaseRefresher { protected final String table; protected final DynamoDbAsyncClient dynamoDBClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index 36de99cd..6c0a8ae5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -35,6 +35,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRenewer; @@ -50,6 +51,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; * An implementation of {@link LeaseRenewer} that uses DynamoDB via {@link LeaseRefresher}. */ @Slf4j +@KinesisClientInternalApi public class DynamoDBLeaseRenewer implements LeaseRenewer { private static final int RENEWAL_RETRIES = 2; private static final String RENEW_ALL_LEASES_DIMENSION = "RenewAllLeases"; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 1626bfeb..4b58a429 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseSerializer; @@ -39,6 +40,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; * An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that * LeaseSerializer can be decorated by other classes if you need to add fields to leases. */ +@KinesisClientInternalApi public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String LEASE_KEY_KEY = "leaseKey"; private static final String LEASE_OWNER_KEY = "leaseOwner"; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 8cd36ce0..2d52fe2a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseTaker; @@ -43,6 +44,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. */ @Slf4j +@KinesisClientInternalApi public class DynamoDBLeaseTaker implements LeaseTaker { private static final int TAKE_RETRIES = 3; private static final int SCAN_RETRIES = 1; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index e7ac032b..e14b111a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -18,6 +18,7 @@ import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; @@ -34,6 +35,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; */ @RequiredArgsConstructor(access = AccessLevel.PACKAGE) @Slf4j +@KinesisClientInternalApi // TODO: Check for non null values public class BlockOnParentShardTask implements ConsumerTask { @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index f7597512..f9cdd2ac 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -61,8 +61,6 @@ public class ProcessTask implements ConsumerTask { private final MetricsFactory metricsFactory; private final AggregatorUtil aggregatorUtil; - private TaskCompletedListener listener; - public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer, @@ -157,9 +155,6 @@ public class ProcessTask implements ConsumerTask { } finally { MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); MetricsUtil.endScope(scope); - if (listener != null) { - listener.taskCompleted(this); - } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index e296b893..4fcda076 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -18,6 +18,7 @@ package software.amazon.kinesis.lifecycle; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.LeaseRefresher; @@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService; @Data @Accessors(fluent = true) +@KinesisClientInternalApi public class ShardConsumerArgument { @NonNull private final ShardInfo shardInfo; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java index 7fe94141..99505f17 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerShutdownNotification.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle; import java.util.concurrent.CountDownLatch; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.processor.ShutdownNotificationAware; @@ -24,6 +25,7 @@ import software.amazon.kinesis.processor.ShutdownNotificationAware; * Contains callbacks for completion of stages in a requested record processor shutdown. * */ +@KinesisClientInternalApi public class ShardConsumerShutdownNotification implements ShutdownNotification { private final LeaseCoordinator leaseCoordinator; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java deleted file mode 100644 index 26ca3b0b..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskCompletedListener.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.lifecycle; - -public interface TaskCompletedListener { - /** - * Called once a task has completed - * - * @param task - * the task that completed - */ - void taskCompleted(ConsumerTask task); -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java deleted file mode 100644 index c35128ff..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailed.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.lifecycle; - -import lombok.Data; - -@Data -public class TaskFailed { - private final Throwable throwable; -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java deleted file mode 100644 index 47851fcb..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailedListener.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.lifecycle; - -@FunctionalInterface -public interface TaskFailedListener { - TaskFailureHandling taskFailed(TaskFailed result); -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java deleted file mode 100644 index b5dacac1..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskFailureHandling.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.kinesis.lifecycle; - -public enum TaskFailureHandling { - STOP, CONTINUE -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java index 4d7f51f9..24c3a3c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.metrics; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; @@ -21,6 +22,7 @@ import software.amazon.kinesis.lifecycle.TaskType; /** * Decorates an ConsumerTask and reports metrics about its timing and success/failure. */ +@KinesisClientInternalApi public class MetricsCollectingTaskDecorator implements ConsumerTask { private final ConsumerTask other; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java index f77a35c6..2bc18032 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java @@ -23,10 +23,12 @@ import java.util.function.Function; import lombok.NonNull; import lombok.Setter; import lombok.experimental.Accessors; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; /** * */ +@KinesisClientInternalApi public class AWSExceptionManager { private final Map, Function> map = new HashMap<>(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java index 5e37464f..e6ed3f27 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java @@ -9,9 +9,11 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +@KinesisClientInternalApi public class IteratorBuilder { public static SubscribeToShardRequest.Builder request(SubscribeToShardRequest.Builder builder, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java index d971b0f3..7ae8fdeb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java @@ -29,6 +29,7 @@ import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerReque import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.retrieval.AWSExceptionManager; @@ -40,6 +41,7 @@ import software.amazon.kinesis.retrieval.ConsumerRegistration; @RequiredArgsConstructor @Slf4j @Accessors(fluent = true) +@KinesisClientInternalApi public class FanOutConsumerRegistration implements ConsumerRegistration { @NonNull private final KinesisAsyncClient kinesisClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 3524e0e4..774ce917 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; @@ -42,6 +43,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @RequiredArgsConstructor @Slf4j +@KinesisClientInternalApi public class FanOutRecordsPublisher implements RecordsPublisher { private final KinesisAsyncClient kinesis; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index b6ae18ce..eea61250 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval.fanout; import lombok.NonNull; import lombok.RequiredArgsConstructor; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -25,6 +26,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; @RequiredArgsConstructor +@KinesisClientInternalApi public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java index 52052c18..bb975652 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java @@ -34,6 +34,7 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -41,6 +42,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; * */ @Slf4j +@KinesisClientInternalApi public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { private static final int TIME_TO_KEEP_ALIVE = 5; private static final int CORE_THREAD_POOL_COUNT = 1; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index 090e40bd..8fd68b80 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.stream.Collectors; import org.reactivestreams.Subscriber; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -32,6 +33,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; * This is the BlockingRecordsPublisher class. This class blocks any calls to the records on the * GetRecordsRetrievalStrategy class. */ +@KinesisClientInternalApi public class BlockingRecordsPublisher implements RecordsPublisher { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 7ecd55ca..76e32a65 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -33,6 +33,7 @@ import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsFactory; @@ -54,6 +55,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; * the record processor is blocked till records are retrieved from Kinesis. */ @Slf4j +@KinesisClientInternalApi public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; LinkedBlockingQueue getRecordsResultQueue; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java index 7254e39a..dcf1367c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -26,6 +27,7 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; @Slf4j +@KinesisClientInternalApi public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private int maxPendingProcessRecordsInput = 3; private int maxByteSize = 8 * 1024 * 1024; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 6a01be99..1e827fc9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -11,6 +11,7 @@ package software.amazon.kinesis.retrieval.polling; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -22,6 +23,7 @@ import software.amazon.kinesis.retrieval.RetrievalFactory; * */ @Data +@KinesisClientInternalApi public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { @NonNull private final String streamName; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java index 0bc8c9f4..933e9318 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java @@ -17,12 +17,14 @@ package software.amazon.kinesis.retrieval.polling; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; /** * */ @Data +@KinesisClientInternalApi public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { @NonNull private final KinesisDataFetcher dataFetcher; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 0e897944..be69618d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import lombok.NonNull; import lombok.RequiredArgsConstructor; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -31,6 +32,7 @@ import software.amazon.kinesis.retrieval.RetrievalFactory; * */ @RequiredArgsConstructor +@KinesisClientInternalApi public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory { @NonNull private final String streamName;