Migrating configuration from KinesisClientLibConfiguration to top level package configurations.

This commit is contained in:
Sahil Palvia 2018-03-16 11:43:26 -07:00
parent ae49097c49
commit 6483e291f8
7 changed files with 312 additions and 10 deletions

View file

@ -17,11 +17,19 @@ package software.amazon.kinesis.checkpoint;
import lombok.Data;
import lombok.experimental.Accessors;
import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer;
/**
*
* Used by the KCL to manage checkpointing.
*/
@Data
@Accessors(fluent = true)
public class CheckpointConfig {
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
* to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
*
* <p>Default value: true</p>
*/
private boolean validateSequenceNumberBeforeCheckpointing = true;
}

View file

@ -16,12 +16,47 @@
package software.amazon.kinesis.coordinator;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.kinesis.leases.NoOpShardPrioritization;
import software.amazon.kinesis.leases.ShardPrioritization;
/**
*
* Used by the KCL to configure the coordinator.
*/
@Data
@Accessors(fluent = true)
public class CoordinatorConfig {
/**
* Application name used by checkpointer to checkpoint.
*
* @return String
*/
@NonNull
private final String applicationName;
/**
* Interval in milliseconds between polling to check for parent shard completion.
* Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
* completion of parent shards).
*
* <p>Default value: 10000L</p>
*/
private long parentShardPollIntervalMillis = 10000L;
/**
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
* assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
* during incremental deployments of an application).
*
* <p>Default value: false</p>
*/
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = false;
/**
* Shard prioritization strategy.
*
* <p>Default value: {@link NoOpShardPrioritization}</p>
*/
private ShardPrioritization shardPrioritization = new NoOpShardPrioritization();
}

View file

@ -15,13 +15,107 @@
package software.amazon.kinesis.leases;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
/**
*
* Used by the KCL to configure lease management.
*/
@Data
@Accessors(fluent = true)
public class LeaseManagementConfig {
private static final long EPSILON_MS = 25L;
/**
* Name of the table to use in DynamoDB
*
* @return String
*/
@NonNull
private final String tableName;
/**
* Client to be used to access DynamoDB service.
*
* @return AmazonDynamoDB
*/
@NonNull
private final AmazonDynamoDB amazonDynamoDB;
/**
* Used to distinguish different workers/processes of a KCL application.
*
* @return String
*/
@NonNull
private final String workerIdentifier;
/**
* Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
* will be regarded as having problems and it's shards will be assigned to other workers.
* For applications that have a large number of shards, this may be set to a higher number to reduce
* the number of DynamoDB IOPS required for tracking leases.
*
* <p>Default value: 10000L</p>
*/
private long failoverTimeMillis = 10000L;
/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
*
* <p>Default value: 60000L</p>
*/
private long shardSyncIntervalMillis = 60000L;
/**
* Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
* Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
* to delete the ones we don't need any longer.
*
* <p>Default value: true</p>
*/
private boolean cleanupLeasesUponShardCompletion = true;
/**
* The max number of leases (shards) this worker should process.
* This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
* or during deployment.
*
* <p>NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
* stream due to the max limit.</p>
*
* <p>Default value: {@link Integer#MAX_VALUE}</p>
*/
private int maxLeasesForWorker = Integer.MAX_VALUE;;
/**
* Max leases to steal from another worker at one time (for load balancing).
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
* but can cause higher churn in the system.
*
* <p>Default value: 1</p>
*/
private int maxLeasesToStealAtOneTime = 1;
/**
* The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
*
* <p>Default value: 10</p>
*/
private int initialLeaseTableReadCapacity = 10;
/**
* The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
*
* <p>Default value: 10</p>
*/
private int initialLeaseTableWriteCapacity = 10;
/**
* The size of the thread pool to create for the lease renewer to use.
*
* <p>Default value: 20</p>
*/
private int maxLeaseRenewalThreads = 20;
}

View file

@ -15,13 +15,35 @@
package software.amazon.kinesis.lifecycle;
import java.util.Optional;
import lombok.Data;
import lombok.experimental.Accessors;
/**
*
* Used by the KCL to configure the lifecycle.
*/
@Data
@Accessors(fluent = true)
public class LifecycleConfig {
/**
* The amount of milliseconds to wait before graceful shutdown forcefully terminates.
*
* <p>Default value: 5000L</p>
*/
private long shutdownGraceMillis = 5000L;
/**
* Logs warn message if as task is held in a task for more than the set time.
*
* <p>Default value: {@link Optional#empty()}</p>
*/
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
/**
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
*
* <p>Default value: 500L</p>
*/
private long taskBackoffTimeMillis = 500L;
}

View file

@ -15,13 +15,70 @@
package software.amazon.kinesis.metrics;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.google.common.collect.ImmutableSet;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import java.util.Set;
/**
*
* Used by KCL to configure the metrics reported by the application.
*/
@Data
@Accessors(fluent = true)
public class MetricsConfig {
/**
* Metrics dimensions that always will be enabled regardless of the config provided by user.
*/
public static final Set<String> METRICS_ALWAYS_ENABLED_DIMENSIONS = ImmutableSet.of(
MetricsHelper.OPERATION_DIMENSION_NAME);
/**
* Allowed dimensions for CloudWatch metrics. By default, worker ID dimension will be disabled.
*/
public static final Set<String> DEFAULT_METRICS_ENABLED_DIMENSIONS = ImmutableSet.<String>builder().addAll(
METRICS_ALWAYS_ENABLED_DIMENSIONS).add(MetricsHelper.SHARD_ID_DIMENSION_NAME).build();
/**
* Metrics dimensions that signify all possible dimensions.
*/
public static final Set<String> METRICS_DIMENSIONS_ALL = ImmutableSet.of(IMetricsScope.METRICS_DIMENSIONS_ALL);
/**
* Client used by the KCL to access the CloudWatch service for reporting metrics.
*
* @return {@link AmazonCloudWatch}
*/
@NonNull
private final AmazonCloudWatch amazonCloudWatch;
/**
* Buffer metrics for at most this long before publishing to CloudWatch.
*
* <p>Default value: 10000L</p>
*/
private long metricsBufferTimeMillis = 10000L;
/**
* Buffer at most this many metrics before publishing to CloudWatch.
*
* <p>Default value: 10000</p>
*/
private int metricsMaxQueueSize = 10000;
/**
* Metrics level for which to enable CloudWatch metrics.
*
* <p>Default value: {@link MetricsLevel#DETAILED}</p>
*/
private MetricsLevel metricsLevel = MetricsLevel.DETAILED;
/**
* Allowed dimensions for CloudWatchMetrics.
*
* <p>Default value: {@link MetricsConfig#DEFAULT_METRICS_ENABLED_DIMENSIONS}</p>
*/
private Set<String> metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
}

View file

@ -10,18 +10,24 @@
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.Data;
import lombok.experimental.Accessors;
/**
*
* Used by the KCL to configure the processor for processing the records.
*/
@Data
@Accessors(fluent = true)
public class ProcessorConfig {
/**
* Don't call processRecords() on the record processor for empty record lists.
*
* <p>Default value: false</p>
*/
private boolean callProcessRecordsEvenForEmptyRecordList = false;
}

View file

@ -15,13 +15,93 @@
package software.amazon.kinesis.retrieval;
import java.util.Optional;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.kinesis.lifecycle.ShardConsumer;
/**
*
* Used by the KCL to configure the retrieval of records from Kinesis.
*/
@Data
@Accessors(fluent = true)
public class RetrievalConfig {
/**
* Name of the Kinesis stream.
*
* @return String
*/
@NonNull
private final String streamName;
/**
* Client used to access to Kinesis service.
*
* @return {@link AmazonKinesis}
*/
@NonNull
private final AmazonKinesis amazonKinesis;
/**
* Max records to fetch from Kinesis in a single GetRecords call.
*
* <p>Default value: 10000</p>
*/
private int maxRecords = 10000;
/**
* The value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to
* {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}.
*
* <p>Default value: 1000L</p>
*/
private long idleTimeBetweenReadsInMillis = 1000L;
/**
* Time to wait in seconds before the worker retries to get a record.
*
* <p>Default value: {@link Optional#empty()}</p>
*/
private Optional<Integer> retryGetRecordsInSeconds = Optional.empty();
/**
* The max number of threads in the getRecords thread pool.
*
* <p>Default value: {@link Optional#empty()}</p>
*/
private Optional<Integer> maxGetRecordsThreadPool = Optional.empty();
/**
* The factory that creates the {@link GetRecordsCache} used to getRecords from Kinesis.
*
* <p>Default value: {@link SimpleRecordsFetcherFactory}</p>
*/
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
/**
* Backoff time between consecutive ListShards calls.
*
* <p>Default value: 1500L</p>
*/
private long listShardsBackoffTimeInMillis = 1500L;
/**
* Max number of retries for ListShards when throttled/exception is thrown.
*
* <p>Default value: 50</p>
*/
private int maxListShardsRetryAttempts = 50;
/**
* The location in the shard from which the KinesisClientLibrary will start fetching records from
* when the application starts for the first time and there is no checkpoint for the shard.
*
* <p>Default value: {@link InitialPositionInStream#LATEST}</p>
*/
private InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST;
}