Make the lease renewer thread pool size configurable

Allow configuration of the lease renewer thread pool size.  Users can
now control the size of the pool.  Additionally core threads are now
a quarter of the thread pool size, and aren't allowed to timeout.
This commit is contained in:
Pfifer, Justin 2017-06-07 08:10:57 -07:00
parent 8d339bdb88
commit b1b02abe25
4 changed files with 55 additions and 15 deletions

View file

@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Date; import java.util.Date;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.Validate;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.RegionUtils;
@ -25,6 +27,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import lombok.Getter;
/** /**
* Configuration for the Amazon Kinesis Client Library. * Configuration for the Amazon Kinesis Client Library.
*/ */
@ -154,10 +158,10 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
/* /**
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table. * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
* This assumes that the shards and leases are in-sync. * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application). * during incremental deployments of an application).
*/ */
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
@ -166,6 +170,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization();
/**
* The size of the thread pool to create for the lease renewer to use.
*/
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -203,6 +212,9 @@ public class KinesisClientLibConfiguration {
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
@Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
/** /**
* Constructor. * Constructor.
* *
@ -1058,4 +1070,23 @@ public class KinesisClientLibConfiguration {
this.shardPrioritization = shardPrioritization; this.shardPrioritization = shardPrioritization;
return this; return this;
} }
/**
* Sets the size of the thread pool that will be used to renew leases.
*
* Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
*
* @param maxLeaseRenewalThreads
* the maximum size of the lease renewal thread pool
* @throws IllegalArgumentException
* if maxLeaseRenewalThreads is <= 0
* @return this configuration object
*/
public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) {
Validate.isTrue(maxLeaseRenewalThreads > 2,
"The maximum number of lease renewal threads must be greater than or equal to 2.");
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
return this;
}
} }

View file

@ -99,9 +99,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
long epsilonMillis, long epsilonMillis,
int maxLeasesForWorker, int maxLeasesForWorker,
int maxLeasesToStealAtOneTime, int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, metricsFactory); maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
} }

View file

@ -238,6 +238,7 @@ public class Worker implements Runnable {
config.getEpsilonMillis(), config.getEpsilonMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory) metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
@ -1110,6 +1111,7 @@ public class Worker implements Runnable {
config.getEpsilonMillis(), config.getEpsilonMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory) metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),

View file

@ -19,15 +19,17 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.utils.NamedThreadFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
@ -40,6 +42,7 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
@ -63,12 +66,10 @@ public class LeaseCoordinator<T extends Lease> {
private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
// Package level access for testing. .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build();
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
private final ILeaseRenewer<T> leaseRenewer; private final ILeaseRenewer<T> leaseRenewer;
private final ILeaseTaker<T> leaseTaker; private final ILeaseTaker<T> leaseTaker;
@ -114,7 +115,8 @@ public class LeaseCoordinator<T extends Lease> {
long epsilonMillis, long epsilonMillis,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis,
DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
} }
/** /**
@ -134,8 +136,9 @@ public class LeaseCoordinator<T extends Lease> {
long epsilonMillis, long epsilonMillis,
int maxLeasesForWorker, int maxLeasesForWorker,
int maxLeasesToStealAtOneTime, int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis) this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
@ -366,6 +369,9 @@ public class LeaseCoordinator<T extends Lease> {
* @return Executor service that should be used for lease renewal. * @return Executor service that should be used for lease renewal.
*/ */
private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); int coreLeaseCount = Math.max(maximumPoolSize / 4, 2);
return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS,
new LinkedTransferQueue<Runnable>(), LEASE_RENEWAL_THREAD_FACTORY);
} }
} }