From 6483e291f8521290292edce6a58a06efa408f9be Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 16 Mar 2018 11:43:26 -0700 Subject: [PATCH] Migrating configuration from KinesisClientLibConfiguration to top level package configurations. --- .../kinesis/checkpoint/CheckpointConfig.java | 10 +- .../coordinator/CoordinatorConfig.java | 37 ++++++- .../kinesis/leases/LeaseManagementConfig.java | 96 ++++++++++++++++++- .../kinesis/lifecycle/LifecycleConfig.java | 24 ++++- .../amazon/kinesis/metrics/MetricsConfig.java | 59 +++++++++++- .../kinesis/processor/ProcessorConfig.java | 14 ++- .../kinesis/retrieval/RetrievalConfig.java | 82 +++++++++++++++- 7 files changed, 312 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java index b78e0852..98037255 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java @@ -17,11 +17,19 @@ package software.amazon.kinesis.checkpoint; import lombok.Data; import lombok.experimental.Accessors; +import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; /** - * + * Used by the KCL to manage checkpointing. */ @Data @Accessors(fluent = true) public class CheckpointConfig { + /** + * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls + * to {@link RecordProcessorCheckpointer#checkpoint(String)} by default. + * + *

Default value: true

+ */ + private boolean validateSequenceNumberBeforeCheckpointing = true; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index a344ac7b..c581d253 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -16,12 +16,47 @@ package software.amazon.kinesis.coordinator; import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.kinesis.leases.NoOpShardPrioritization; +import software.amazon.kinesis.leases.ShardPrioritization; /** - * + * Used by the KCL to configure the coordinator. */ @Data @Accessors(fluent = true) public class CoordinatorConfig { + /** + * Application name used by checkpointer to checkpoint. + * + * @return String + */ + @NonNull + private final String applicationName; + + /** + * Interval in milliseconds between polling to check for parent shard completion. + * Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on + * completion of parent shards). + * + *

Default value: 10000L

+ */ + private long parentShardPollIntervalMillis = 10000L; + + /** + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + * during incremental deployments of an application). + * + *

Default value: false

+ */ + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = false; + + /** + * Shard prioritization strategy. + * + *

Default value: {@link NoOpShardPrioritization}

+ */ + private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 30c4012f..d3737039 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -15,13 +15,107 @@ package software.amazon.kinesis.leases; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; + import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; /** - * + * Used by the KCL to configure lease management. */ @Data @Accessors(fluent = true) public class LeaseManagementConfig { + private static final long EPSILON_MS = 25L; + + /** + * Name of the table to use in DynamoDB + * + * @return String + */ + @NonNull + private final String tableName; + /** + * Client to be used to access DynamoDB service. + * + * @return AmazonDynamoDB + */ + @NonNull + private final AmazonDynamoDB amazonDynamoDB; + /** + * Used to distinguish different workers/processes of a KCL application. + * + * @return String + */ + @NonNull + private final String workerIdentifier; + + /** + * Fail over time in milliseconds. A worker which does not renew it's lease within this time interval + * will be regarded as having problems and it's shards will be assigned to other workers. + * For applications that have a large number of shards, this may be set to a higher number to reduce + * the number of DynamoDB IOPS required for tracking leases. + * + *

Default value: 10000L

+ */ + private long failoverTimeMillis = 10000L; + + /** + * Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. + * + *

Default value: 60000L

+ */ + private long shardSyncIntervalMillis = 60000L; + + /** + * Cleanup leases upon shards completion (don't wait until they expire in Kinesis). + * Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try + * to delete the ones we don't need any longer. + * + *

Default value: true

+ */ + private boolean cleanupLeasesUponShardCompletion = true; + + /** + * The max number of leases (shards) this worker should process. + * This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints + * or during deployment. + * + *

NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the + * stream due to the max limit.

+ * + *

Default value: {@link Integer#MAX_VALUE}

+ */ + private int maxLeasesForWorker = Integer.MAX_VALUE;; + + /** + * Max leases to steal from another worker at one time (for load balancing). + * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), + * but can cause higher churn in the system. + * + *

Default value: 1

+ */ + private int maxLeasesToStealAtOneTime = 1; + + /** + * The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity. + * + *

Default value: 10

+ */ + private int initialLeaseTableReadCapacity = 10; + + /** + * The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity. + * + *

Default value: 10

+ */ + private int initialLeaseTableWriteCapacity = 10; + + /** + * The size of the thread pool to create for the lease renewer to use. + * + *

Default value: 20

+ */ + private int maxLeaseRenewalThreads = 20; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java index cdaf5391..a7b7dfe9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java @@ -15,13 +15,35 @@ package software.amazon.kinesis.lifecycle; +import java.util.Optional; + import lombok.Data; import lombok.experimental.Accessors; /** - * + * Used by the KCL to configure the lifecycle. */ @Data @Accessors(fluent = true) public class LifecycleConfig { + /** + * The amount of milliseconds to wait before graceful shutdown forcefully terminates. + * + *

Default value: 5000L

+ */ + private long shutdownGraceMillis = 5000L; + + /** + * Logs warn message if as task is held in a task for more than the set time. + * + *

Default value: {@link Optional#empty()}

+ */ + private Optional logWarningForTaskAfterMillis = Optional.empty(); + + /** + * Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). + * + *

Default value: 500L

+ */ + private long taskBackoffTimeMillis = 500L; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java index f824f842..80191520 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java @@ -15,13 +15,70 @@ package software.amazon.kinesis.metrics; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.google.common.collect.ImmutableSet; import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; +import java.util.Set; + /** - * + * Used by KCL to configure the metrics reported by the application. */ @Data @Accessors(fluent = true) public class MetricsConfig { + /** + * Metrics dimensions that always will be enabled regardless of the config provided by user. + */ + public static final Set METRICS_ALWAYS_ENABLED_DIMENSIONS = ImmutableSet.of( + MetricsHelper.OPERATION_DIMENSION_NAME); + + /** + * Allowed dimensions for CloudWatch metrics. By default, worker ID dimension will be disabled. + */ + public static final Set DEFAULT_METRICS_ENABLED_DIMENSIONS = ImmutableSet.builder().addAll( + METRICS_ALWAYS_ENABLED_DIMENSIONS).add(MetricsHelper.SHARD_ID_DIMENSION_NAME).build(); + + /** + * Metrics dimensions that signify all possible dimensions. + */ + public static final Set METRICS_DIMENSIONS_ALL = ImmutableSet.of(IMetricsScope.METRICS_DIMENSIONS_ALL); + + /** + * Client used by the KCL to access the CloudWatch service for reporting metrics. + * + * @return {@link AmazonCloudWatch} + */ + @NonNull + private final AmazonCloudWatch amazonCloudWatch; + + /** + * Buffer metrics for at most this long before publishing to CloudWatch. + * + *

Default value: 10000L

+ */ + private long metricsBufferTimeMillis = 10000L; + + /** + * Buffer at most this many metrics before publishing to CloudWatch. + * + *

Default value: 10000

+ */ + private int metricsMaxQueueSize = 10000; + + /** + * Metrics level for which to enable CloudWatch metrics. + * + *

Default value: {@link MetricsLevel#DETAILED}

+ */ + private MetricsLevel metricsLevel = MetricsLevel.DETAILED; + + /** + * Allowed dimensions for CloudWatchMetrics. + * + *

Default value: {@link MetricsConfig#DEFAULT_METRICS_ENABLED_DIMENSIONS}

+ */ + private Set metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java index 33c27faa..aeceac98 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java @@ -10,18 +10,24 @@ * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * permissions and limitations under the License. */ package software.amazon.kinesis.processor; -import lombok.Data; -import lombok.experimental.Accessors; + import lombok.Data; + import lombok.experimental.Accessors; /** - * + * Used by the KCL to configure the processor for processing the records. */ @Data @Accessors(fluent = true) public class ProcessorConfig { + /** + * Don't call processRecords() on the record processor for empty record lists. + * + *

Default value: false

+ */ + private boolean callProcessRecordsEvenForEmptyRecordList = false; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index acff73fb..3ab7cb18 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -15,13 +15,93 @@ package software.amazon.kinesis.retrieval; +import java.util.Optional; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + import lombok.Data; +import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.kinesis.lifecycle.ShardConsumer; /** - * + * Used by the KCL to configure the retrieval of records from Kinesis. */ @Data @Accessors(fluent = true) public class RetrievalConfig { + /** + * Name of the Kinesis stream. + * + * @return String + */ + @NonNull + private final String streamName; + + /** + * Client used to access to Kinesis service. + * + * @return {@link AmazonKinesis} + */ + @NonNull + private final AmazonKinesis amazonKinesis; + + /** + * Max records to fetch from Kinesis in a single GetRecords call. + * + *

Default value: 10000

+ */ + private int maxRecords = 10000; + + /** + * The value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to + * {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}. + * + *

Default value: 1000L

+ */ + private long idleTimeBetweenReadsInMillis = 1000L; + + /** + * Time to wait in seconds before the worker retries to get a record. + * + *

Default value: {@link Optional#empty()}

+ */ + private Optional retryGetRecordsInSeconds = Optional.empty(); + + /** + * The max number of threads in the getRecords thread pool. + * + *

Default value: {@link Optional#empty()}

+ */ + private Optional maxGetRecordsThreadPool = Optional.empty(); + + /** + * The factory that creates the {@link GetRecordsCache} used to getRecords from Kinesis. + * + *

Default value: {@link SimpleRecordsFetcherFactory}

+ */ + private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + + /** + * Backoff time between consecutive ListShards calls. + * + *

Default value: 1500L

+ */ + private long listShardsBackoffTimeInMillis = 1500L; + + /** + * Max number of retries for ListShards when throttled/exception is thrown. + * + *

Default value: 50

+ */ + private int maxListShardsRetryAttempts = 50; + + /** + * The location in the shard from which the KinesisClientLibrary will start fetching records from + * when the application starts for the first time and there is no checkpoint for the shard. + * + *

Default value: {@link InitialPositionInStream#LATEST}

+ */ + private InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST; }