diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java index b78e0852..98037255 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java @@ -17,11 +17,19 @@ package software.amazon.kinesis.checkpoint; import lombok.Data; import lombok.experimental.Accessors; +import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; /** - * + * Used by the KCL to manage checkpointing. */ @Data @Accessors(fluent = true) public class CheckpointConfig { + /** + * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls + * to {@link RecordProcessorCheckpointer#checkpoint(String)} by default. + * + *
Default value: true
+ */ + private boolean validateSequenceNumberBeforeCheckpointing = true; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index a344ac7b..c581d253 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -16,12 +16,47 @@ package software.amazon.kinesis.coordinator; import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.kinesis.leases.NoOpShardPrioritization; +import software.amazon.kinesis.leases.ShardPrioritization; /** - * + * Used by the KCL to configure the coordinator. */ @Data @Accessors(fluent = true) public class CoordinatorConfig { + /** + * Application name used by checkpointer to checkpoint. + * + * @return String + */ + @NonNull + private final String applicationName; + + /** + * Interval in milliseconds between polling to check for parent shard completion. + * Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on + * completion of parent shards). + * + *Default value: 10000L
+ */ + private long parentShardPollIntervalMillis = 10000L; + + /** + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + * during incremental deployments of an application). + * + *Default value: false
+ */ + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = false; + + /** + * Shard prioritization strategy. + * + *Default value: {@link NoOpShardPrioritization}
+ */ + private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 30c4012f..d3737039 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -15,13 +15,107 @@ package software.amazon.kinesis.leases; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; + import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; /** - * + * Used by the KCL to configure lease management. */ @Data @Accessors(fluent = true) public class LeaseManagementConfig { + private static final long EPSILON_MS = 25L; + + /** + * Name of the table to use in DynamoDB + * + * @return String + */ + @NonNull + private final String tableName; + /** + * Client to be used to access DynamoDB service. + * + * @return AmazonDynamoDB + */ + @NonNull + private final AmazonDynamoDB amazonDynamoDB; + /** + * Used to distinguish different workers/processes of a KCL application. + * + * @return String + */ + @NonNull + private final String workerIdentifier; + + /** + * Fail over time in milliseconds. A worker which does not renew it's lease within this time interval + * will be regarded as having problems and it's shards will be assigned to other workers. + * For applications that have a large number of shards, this may be set to a higher number to reduce + * the number of DynamoDB IOPS required for tracking leases. + * + *Default value: 10000L
+ */ + private long failoverTimeMillis = 10000L; + + /** + * Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. + * + *Default value: 60000L
+ */ + private long shardSyncIntervalMillis = 60000L; + + /** + * Cleanup leases upon shards completion (don't wait until they expire in Kinesis). + * Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try + * to delete the ones we don't need any longer. + * + *Default value: true
+ */ + private boolean cleanupLeasesUponShardCompletion = true; + + /** + * The max number of leases (shards) this worker should process. + * This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints + * or during deployment. + * + *NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the + * stream due to the max limit.
+ * + *Default value: {@link Integer#MAX_VALUE}
+ */ + private int maxLeasesForWorker = Integer.MAX_VALUE;; + + /** + * Max leases to steal from another worker at one time (for load balancing). + * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), + * but can cause higher churn in the system. + * + *Default value: 1
+ */ + private int maxLeasesToStealAtOneTime = 1; + + /** + * The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity. + * + *Default value: 10
+ */ + private int initialLeaseTableReadCapacity = 10; + + /** + * The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity. + * + *Default value: 10
+ */ + private int initialLeaseTableWriteCapacity = 10; + + /** + * The size of the thread pool to create for the lease renewer to use. + * + *Default value: 20
+ */ + private int maxLeaseRenewalThreads = 20; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java index cdaf5391..a7b7dfe9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java @@ -15,13 +15,35 @@ package software.amazon.kinesis.lifecycle; +import java.util.Optional; + import lombok.Data; import lombok.experimental.Accessors; /** - * + * Used by the KCL to configure the lifecycle. */ @Data @Accessors(fluent = true) public class LifecycleConfig { + /** + * The amount of milliseconds to wait before graceful shutdown forcefully terminates. + * + *Default value: 5000L
+ */ + private long shutdownGraceMillis = 5000L; + + /** + * Logs warn message if as task is held in a task for more than the set time. + * + *Default value: {@link Optional#empty()}
+ */ + private OptionalDefault value: 500L
+ */ + private long taskBackoffTimeMillis = 500L; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java index f824f842..80191520 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java @@ -15,13 +15,70 @@ package software.amazon.kinesis.metrics; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.google.common.collect.ImmutableSet; import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; +import java.util.Set; + /** - * + * Used by KCL to configure the metrics reported by the application. */ @Data @Accessors(fluent = true) public class MetricsConfig { + /** + * Metrics dimensions that always will be enabled regardless of the config provided by user. + */ + public static final SetDefault value: 10000L
+ */ + private long metricsBufferTimeMillis = 10000L; + + /** + * Buffer at most this many metrics before publishing to CloudWatch. + * + *Default value: 10000
+ */ + private int metricsMaxQueueSize = 10000; + + /** + * Metrics level for which to enable CloudWatch metrics. + * + *Default value: {@link MetricsLevel#DETAILED}
+ */ + private MetricsLevel metricsLevel = MetricsLevel.DETAILED; + + /** + * Allowed dimensions for CloudWatchMetrics. + * + *Default value: {@link MetricsConfig#DEFAULT_METRICS_ENABLED_DIMENSIONS}
+ */ + private SetDefault value: false
+ */ + private boolean callProcessRecordsEvenForEmptyRecordList = false; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index acff73fb..3ab7cb18 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -15,13 +15,93 @@ package software.amazon.kinesis.retrieval; +import java.util.Optional; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.kinesis.lifecycle.ShardConsumer; /** - * + * Used by the KCL to configure the retrieval of records from Kinesis. */ @Data @Accessors(fluent = true) public class RetrievalConfig { + /** + * Name of the Kinesis stream. + * + * @return String + */ + @NonNull + private final String streamName; + + /** + * Client used to access to Kinesis service. + * + * @return {@link AmazonKinesis} + */ + @NonNull + private final AmazonKinesis amazonKinesis; + + /** + * Max records to fetch from Kinesis in a single GetRecords call. + * + *Default value: 10000
+ */ + private int maxRecords = 10000; + + /** + * The value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to + * {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}. + * + *Default value: 1000L
+ */ + private long idleTimeBetweenReadsInMillis = 1000L; + + /** + * Time to wait in seconds before the worker retries to get a record. + * + *Default value: {@link Optional#empty()}
+ */ + private OptionalDefault value: {@link Optional#empty()}
+ */ + private OptionalDefault value: {@link SimpleRecordsFetcherFactory}
+ */ + private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + + /** + * Backoff time between consecutive ListShards calls. + * + *Default value: 1500L
+ */ + private long listShardsBackoffTimeInMillis = 1500L; + + /** + * Max number of retries for ListShards when throttled/exception is thrown. + * + *Default value: 50
+ */ + private int maxListShardsRetryAttempts = 50; + + /** + * The location in the shard from which the KinesisClientLibrary will start fetching records from + * when the application starts for the first time and there is no checkpoint for the shard. + * + *Default value: {@link InitialPositionInStream#LATEST}
+ */ + private InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST; }