Refactoring repo (#358)

* Introducing internal annotation to provided implementations
* Deleting unused Task classes
This commit is contained in:
Sahil Palvia 2018-08-08 11:04:29 -07:00 committed by Justin Pfifer
parent b52cee9c43
commit 0b267037ea
35 changed files with 60 additions and 91 deletions

View file

@ -14,6 +14,7 @@
*/ */
package software.amazon.kinesis.checkpoint; package software.amazon.kinesis.checkpoint;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.exceptions.ShutdownException;
@ -32,6 +33,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* initialized, processes 0 records, then calls prepareCheckpoint(). The value in the table is the same, so there's * initialized, processes 0 records, then calls prepareCheckpoint(). The value in the table is the same, so there's
* no reason to overwrite it with another copy of itself. * no reason to overwrite it with another copy of itself.
*/ */
@KinesisClientInternalApi
public class DoesNothingPreparedCheckpointer implements PreparedCheckpointer { public class DoesNothingPreparedCheckpointer implements PreparedCheckpointer {
private final ExtendedSequenceNumber sequenceNumber; private final ExtendedSequenceNumber sequenceNumber;

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.checkpoint.dynamodb; package software.amazon.kinesis.checkpoint.dynamodb;
import lombok.Data; import lombok.Data;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
@ -25,6 +26,7 @@ import software.amazon.kinesis.processor.Checkpointer;
* *
*/ */
@Data @Data
@KinesisClientInternalApi
public class DynamoDBCheckpointFactory implements CheckpointFactory { public class DynamoDBCheckpointFactory implements CheckpointFactory {
@Override @Override
public Checkpointer createCheckpointer(final LeaseCoordinator leaseLeaseCoordinator, public Checkpointer createCheckpointer(final LeaseCoordinator leaseLeaseCoordinator,

View file

@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibException;
@ -43,6 +44,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@KinesisClientInternalApi
public class DynamoDBCheckpointer implements Checkpointer { public class DynamoDBCheckpointer implements Checkpointer {
@NonNull @NonNull
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;

View file

@ -25,11 +25,13 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
/** /**
* *
*/ */
@KinesisClientInternalApi
public class KinesisRequestsBuilder { public class KinesisRequestsBuilder {
public static ListShardsRequest.Builder listShardsRequestBuilder() { public static ListShardsRequest.Builder listShardsRequestBuilder() {
return appendUserAgent(ListShardsRequest.builder()); return appendUserAgent(ListShardsRequest.builder());

View file

@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
@ -33,6 +34,7 @@ import software.amazon.kinesis.processor.Checkpointer;
* *
*/ */
@Data @Data
@KinesisClientInternalApi
public class SchedulerCoordinatorFactory implements CoordinatorFactory { public class SchedulerCoordinatorFactory implements CoordinatorFactory {
@Override @Override
public ExecutorService createExecutorService() { public ExecutorService createExecutorService() {

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -24,6 +25,7 @@ import java.util.Map;
/** /**
* Static utility functions used by our LeaseSerializers. * Static utility functions used by our LeaseSerializers.
*/ */
@KinesisClientInternalApi
public class DynamoUtils { public class DynamoUtils {
public static AttributeValue createAttributeValue(Collection<String> collectionValue) { public static AttributeValue createAttributeValue(Collection<String> collectionValue) {

View file

@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.AWSExceptionManager;
@ -52,6 +53,7 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@Accessors(fluent = true) @Accessors(fluent = true)
@KinesisClientInternalApi
public class KinesisShardDetector implements ShardDetector { public class KinesisShardDetector implements ShardDetector {
@NonNull @NonNull
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.leases;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
@ -33,6 +34,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@KinesisClientInternalApi
public class ShardSyncTask implements ConsumerTask { public class ShardSyncTask implements ConsumerTask {
private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask"; private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";

View file

@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
@ -54,6 +55,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
* *
*/ */
@Slf4j @Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseCoordinator implements LeaseCoordinator { public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
// Time to wait for in-flight Runnables to finish when calling .stop(); // Time to wait for in-flight Runnables to finish when calling .stop();
private static final long STOP_WAIT_TIME_MILLIS = 2000L; private static final long STOP_WAIT_TIME_MILLIS = 2000L;

View file

@ -21,6 +21,7 @@ import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
@ -33,6 +34,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
* *
*/ */
@Data @Data
@KinesisClientInternalApi
public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull @NonNull
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;

View file

@ -45,6 +45,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
@ -59,6 +60,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
*/ */
@AllArgsConstructor @AllArgsConstructor
@Slf4j @Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseRefresher implements LeaseRefresher { public class DynamoDBLeaseRefresher implements LeaseRefresher {
protected final String table; protected final String table;
protected final DynamoDbAsyncClient dynamoDBClient; protected final DynamoDbAsyncClient dynamoDBClient;

View file

@ -35,6 +35,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer; import software.amazon.kinesis.leases.LeaseRenewer;
@ -50,6 +51,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
* An implementation of {@link LeaseRenewer} that uses DynamoDB via {@link LeaseRefresher}. * An implementation of {@link LeaseRenewer} that uses DynamoDB via {@link LeaseRefresher}.
*/ */
@Slf4j @Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseRenewer implements LeaseRenewer { public class DynamoDBLeaseRenewer implements LeaseRenewer {
private static final int RENEWAL_RETRIES = 2; private static final int RENEWAL_RETRIES = 2;
private static final String RENEW_ALL_LEASES_DIMENSION = "RenewAllLeases"; private static final String RENEW_ALL_LEASES_DIMENSION = "RenewAllLeases";

View file

@ -30,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
@ -39,6 +40,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that * An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that
* LeaseSerializer can be decorated by other classes if you need to add fields to leases. * LeaseSerializer can be decorated by other classes if you need to add fields to leases.
*/ */
@KinesisClientInternalApi
public class DynamoDBLeaseSerializer implements LeaseSerializer { public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String LEASE_KEY_KEY = "leaseKey"; private static final String LEASE_KEY_KEY = "leaseKey";
private static final String LEASE_OWNER_KEY = "leaseOwner"; private static final String LEASE_OWNER_KEY = "leaseOwner";

View file

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseTaker; import software.amazon.kinesis.leases.LeaseTaker;
@ -43,6 +44,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
*/ */
@Slf4j @Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseTaker implements LeaseTaker { public class DynamoDBLeaseTaker implements LeaseTaker {
private static final int TAKE_RETRIES = 3; private static final int TAKE_RETRIES = 3;
private static final int SCAN_RETRIES = 1; private static final int SCAN_RETRIES = 1;

View file

@ -18,6 +18,7 @@ import lombok.AccessLevel;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
@ -34,6 +35,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
*/ */
@RequiredArgsConstructor(access = AccessLevel.PACKAGE) @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
@Slf4j @Slf4j
@KinesisClientInternalApi
// TODO: Check for non null values // TODO: Check for non null values
public class BlockOnParentShardTask implements ConsumerTask { public class BlockOnParentShardTask implements ConsumerTask {
@NonNull @NonNull

View file

@ -61,8 +61,6 @@ public class ProcessTask implements ConsumerTask {
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil; private final AggregatorUtil aggregatorUtil;
private TaskCompletedListener listener;
public ProcessTask(@NonNull ShardInfo shardInfo, public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessor shardRecordProcessor,
@NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer, @NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer,
@ -157,9 +155,6 @@ public class ProcessTask implements ConsumerTask {
} finally { } finally {
MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope); MetricsUtil.endScope(scope);
if (listener != null) {
listener.taskCompleted(this);
}
} }
} }

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.lifecycle;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService;
@Data @Data
@Accessors(fluent = true) @Accessors(fluent = true)
@KinesisClientInternalApi
public class ShardConsumerArgument { public class ShardConsumerArgument {
@NonNull @NonNull
private final ShardInfo shardInfo; private final ShardInfo shardInfo;

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.processor.ShutdownNotificationAware; import software.amazon.kinesis.processor.ShutdownNotificationAware;
@ -24,6 +25,7 @@ import software.amazon.kinesis.processor.ShutdownNotificationAware;
* Contains callbacks for completion of stages in a requested record processor shutdown. * Contains callbacks for completion of stages in a requested record processor shutdown.
* *
*/ */
@KinesisClientInternalApi
public class ShardConsumerShutdownNotification implements ShutdownNotification { public class ShardConsumerShutdownNotification implements ShutdownNotification {
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;

View file

@ -1,25 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
public interface TaskCompletedListener {
/**
* Called once a task has completed
*
* @param task
* the task that completed
*/
void taskCompleted(ConsumerTask task);
}

View file

@ -1,22 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import lombok.Data;
@Data
public class TaskFailed {
private final Throwable throwable;
}

View file

@ -1,20 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
@FunctionalInterface
public interface TaskFailedListener {
TaskFailureHandling taskFailed(TaskFailed result);
}

View file

@ -1,19 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
public enum TaskFailureHandling {
STOP, CONTINUE
}

View file

@ -14,6 +14,7 @@
*/ */
package software.amazon.kinesis.metrics; package software.amazon.kinesis.metrics;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.lifecycle.TaskType;
@ -21,6 +22,7 @@ import software.amazon.kinesis.lifecycle.TaskType;
/** /**
* Decorates an ConsumerTask and reports metrics about its timing and success/failure. * Decorates an ConsumerTask and reports metrics about its timing and success/failure.
*/ */
@KinesisClientInternalApi
public class MetricsCollectingTaskDecorator implements ConsumerTask { public class MetricsCollectingTaskDecorator implements ConsumerTask {
private final ConsumerTask other; private final ConsumerTask other;

View file

@ -23,10 +23,12 @@ import java.util.function.Function;
import lombok.NonNull; import lombok.NonNull;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
/** /**
* *
*/ */
@KinesisClientInternalApi
public class AWSExceptionManager { public class AWSExceptionManager {
private final Map<Class<? extends Throwable>, Function<? extends Throwable, RuntimeException>> map = new HashMap<>(); private final Map<Class<? extends Throwable>, Function<? extends Throwable, RuntimeException>> map = new HashMap<>();

View file

@ -9,9 +9,11 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint; import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@KinesisClientInternalApi
public class IteratorBuilder { public class IteratorBuilder {
public static SubscribeToShardRequest.Builder request(SubscribeToShardRequest.Builder builder, public static SubscribeToShardRequest.Builder request(SubscribeToShardRequest.Builder builder,

View file

@ -29,6 +29,7 @@ import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerReque
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.AWSExceptionManager;
@ -40,6 +41,7 @@ import software.amazon.kinesis.retrieval.ConsumerRegistration;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@Accessors(fluent = true) @Accessors(fluent = true)
@KinesisClientInternalApi
public class FanOutConsumerRegistration implements ConsumerRegistration { public class FanOutConsumerRegistration implements ConsumerRegistration {
@NonNull @NonNull
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;

View file

@ -32,6 +32,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@ -42,6 +43,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@KinesisClientInternalApi
public class FanOutRecordsPublisher implements RecordsPublisher { public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis; private final KinesisAsyncClient kinesis;

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval.fanout;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -25,6 +26,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalFactory;
@RequiredArgsConstructor @RequiredArgsConstructor
@KinesisClientInternalApi
public class FanOutRetrievalFactory implements RetrievalFactory { public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;

View file

@ -34,6 +34,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -41,6 +42,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
* *
*/ */
@Slf4j @Slf4j
@KinesisClientInternalApi
public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
private static final int TIME_TO_KEEP_ALIVE = 5; private static final int TIME_TO_KEEP_ALIVE = 5;
private static final int CORE_THREAD_POOL_COUNT = 1; private static final int CORE_THREAD_POOL_COUNT = 1;

View file

@ -19,6 +19,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@ -32,6 +33,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* This is the BlockingRecordsPublisher class. This class blocks any calls to the records on the * This is the BlockingRecordsPublisher class. This class blocks any calls to the records on the
* GetRecordsRetrievalStrategy class. * GetRecordsRetrievalStrategy class.
*/ */
@KinesisClientInternalApi
public class BlockingRecordsPublisher implements RecordsPublisher { public class BlockingRecordsPublisher implements RecordsPublisher {
private final int maxRecordsPerCall; private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;

View file

@ -33,6 +33,7 @@ import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
@ -54,6 +55,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* the record processor is blocked till records are retrieved from Kinesis. * the record processor is blocked till records are retrieved from Kinesis.
*/ */
@Slf4j @Slf4j
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher { public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue; LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;

View file

@ -19,6 +19,7 @@ import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.DataFetchingStrategy;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -26,6 +27,7 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
@Slf4j @Slf4j
@KinesisClientInternalApi
public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxPendingProcessRecordsInput = 3; private int maxPendingProcessRecordsInput = 3;
private int maxByteSize = 8 * 1024 * 1024; private int maxByteSize = 8 * 1024 * 1024;

View file

@ -11,6 +11,7 @@ package software.amazon.kinesis.retrieval.polling;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -22,6 +23,7 @@ import software.amazon.kinesis.retrieval.RetrievalFactory;
* *
*/ */
@Data @Data
@KinesisClientInternalApi
public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
@NonNull @NonNull
private final String streamName; private final String streamName;

View file

@ -17,12 +17,14 @@ package software.amazon.kinesis.retrieval.polling;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
/** /**
* *
*/ */
@Data @Data
@KinesisClientInternalApi
public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
@NonNull @NonNull
private final KinesisDataFetcher dataFetcher; private final KinesisDataFetcher dataFetcher;

View file

@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -31,6 +32,7 @@ import software.amazon.kinesis.retrieval.RetrievalFactory;
* *
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
@KinesisClientInternalApi
public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory { public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory {
@NonNull @NonNull
private final String streamName; private final String streamName;