From b1b02abe25e188e4d98e57ee587ab6b33bb93bda Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 7 Jun 2017 08:10:57 -0700 Subject: [PATCH] Make the lease renewer thread pool size configurable Allow configuration of the lease renewer thread pool size. Users can now control the size of the pool. Additionally core threads are now a quarter of the thread pool size, and aren't allowed to timeout. --- .../worker/KinesisClientLibConfiguration.java | 39 +++++++++++++++++-- .../KinesisClientLibLeaseCoordinator.java | 3 +- .../clientlibrary/lib/worker/Worker.java | 2 + .../kinesis/leases/impl/LeaseCoordinator.java | 26 ++++++++----- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index b8218968..b250d943 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Date; import java.util.Set; +import org.apache.commons.lang.Validate; + import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; @@ -25,6 +27,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; +import lombok.Getter; + /** * Configuration for the Amazon Kinesis Client Library. */ @@ -154,10 +158,10 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; - /* - * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. - * This assumes that the shards and leases are in-sync. - * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + /** + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + * during incremental deployments of an application). */ public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; @@ -166,6 +170,11 @@ public class KinesisClientLibConfiguration { */ public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); + /** + * The size of the thread pool to create for the lease renewer to use. + */ + public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + private String applicationName; private String tableName; private String streamName; @@ -203,6 +212,9 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + @Getter + private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + /** * Constructor. * @@ -1058,4 +1070,23 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = shardPrioritization; return this; } + + /** + * Sets the size of the thread pool that will be used to renew leases. + * + * Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate. + * + * @param maxLeaseRenewalThreads + * the maximum size of the lease renewal thread pool + * @throws IllegalArgumentException + * if maxLeaseRenewalThreads is <= 0 + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) { + Validate.isTrue(maxLeaseRenewalThreads > 2, + "The maximum number of lease renewal threads must be greater than or equal to 2."); + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 59de31be..42fa7d0c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -99,9 +99,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator { private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; - private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); - private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); - - // Package level access for testing. - static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; - + private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); + private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build(); private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; @@ -114,7 +115,8 @@ public class LeaseCoordinator { long epsilonMillis, IMetricsFactory metricsFactory) { this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, - DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); + DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, + KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory); } /** @@ -134,8 +136,9 @@ public class LeaseCoordinator { long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, + int maxLeaseRenewerThreadCount, IMetricsFactory metricsFactory) { - this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); + this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); @@ -366,6 +369,9 @@ public class LeaseCoordinator { * @return Executor service that should be used for lease renewal. */ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { - return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); + int coreLeaseCount = Math.max(maximumPoolSize / 4, 2); + + return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, + new LinkedTransferQueue(), LEASE_RENEWAL_THREAD_FACTORY); } }