Merge branch 'master' into requested-shutdown-fix

This commit is contained in:
Pfifer, Justin 2017-07-21 08:19:40 -07:00
commit 89ebbd6d3b
4 changed files with 100 additions and 38 deletions

View file

@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Date; import java.util.Date;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.Validate;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.RegionUtils;
@ -25,6 +27,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import lombok.Getter;
/** /**
* Configuration for the Amazon Kinesis Client Library. * Configuration for the Amazon Kinesis Client Library.
*/ */
@ -155,10 +159,10 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
/* /**
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table. * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
* This assumes that the shards and leases are in-sync. * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application). * during incremental deployments of an application).
*/ */
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
@ -167,6 +171,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization();
/**
* The size of the thread pool to create for the lease renewer to use.
*/
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -204,6 +213,9 @@ public class KinesisClientLibConfiguration {
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
@Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
/** /**
* Constructor. * Constructor.
* *
@ -1075,4 +1087,23 @@ public class KinesisClientLibConfiguration {
this.shardPrioritization = shardPrioritization; this.shardPrioritization = shardPrioritization;
return this; return this;
} }
/**
* Sets the size of the thread pool that will be used to renew leases.
*
* Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
*
* @param maxLeaseRenewalThreads
* the maximum size of the lease renewal thread pool
* @throws IllegalArgumentException
* if maxLeaseRenewalThreads is <= 0
* @return this configuration object
*/
public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) {
Validate.isTrue(maxLeaseRenewalThreads > 2,
"The maximum number of lease renewal threads must be greater than or equal to 2.");
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
return this;
}
} }

View file

@ -99,9 +99,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
long epsilonMillis, long epsilonMillis,
int maxLeasesForWorker, int maxLeasesForWorker,
int maxLeasesToStealAtOneTime, int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, metricsFactory); maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
} }

View file

@ -252,12 +252,21 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
new KinesisClientLibLeaseCoordinator( new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), config.getWorkerIdentifier(),
config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory) config.getFailoverTimeMillis(),
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) config.getEpsilonMillis(),
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), config.getMaxLeasesForWorker(),
execService, metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), config.getMaxLeasesToStealAtOneTime(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); config.getMaxLeaseRenewalThreads(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
metricsFactory,
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy());
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
@ -1173,23 +1182,38 @@ public class Worker implements Runnable {
shardPrioritization = new ParentsFirstShardPrioritization(1); shardPrioritization = new ParentsFirstShardPrioritization(1);
} }
return new Worker(config.getApplicationName(), recordProcessorFactory, new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) return new Worker(config.getApplicationName(),
.getProxy(config.getStreamName()), recordProcessorFactory,
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(), kinesisClient).getProxy(config.getStreamName()),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getMaxRecords(),
config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended(), config.getIdleTimeBetweenReadsInMillis(),
config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldCleanupLeasesUponShardCompletion(), null, config.shouldValidateSequenceNumberBeforeCheckpointing(),
new KinesisClientLibLeaseCoordinator( config.getInitialPositionInStreamExtended()),
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getInitialPositionInStreamExtended(),
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), config.getParentShardPollIntervalMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory) config.getShardSyncIntervalMillis(),
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) config.shouldCleanupLeasesUponShardCompletion(),
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), null,
execService, metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); dynamoDBClient),
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
metricsFactory,
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization);
} }

View file

@ -19,15 +19,17 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.utils.NamedThreadFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
@ -40,6 +42,7 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
@ -63,12 +66,10 @@ public class LeaseCoordinator<T extends Lease> {
private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
// Package level access for testing. .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build();
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
private final ILeaseRenewer<T> leaseRenewer; private final ILeaseRenewer<T> leaseRenewer;
private final ILeaseTaker<T> leaseTaker; private final ILeaseTaker<T> leaseTaker;
@ -114,7 +115,8 @@ public class LeaseCoordinator<T extends Lease> {
long epsilonMillis, long epsilonMillis,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis,
DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
} }
/** /**
@ -134,8 +136,9 @@ public class LeaseCoordinator<T extends Lease> {
long epsilonMillis, long epsilonMillis,
int maxLeasesForWorker, int maxLeasesForWorker,
int maxLeasesToStealAtOneTime, int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis) this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
@ -366,6 +369,9 @@ public class LeaseCoordinator<T extends Lease> {
* @return Executor service that should be used for lease renewal. * @return Executor service that should be used for lease renewal.
*/ */
private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); int coreLeaseCount = Math.max(maximumPoolSize / 4, 2);
return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS,
new LinkedTransferQueue<Runnable>(), LEASE_RENEWAL_THREAD_FACTORY);
} }
} }