diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index b8218968..b250d943 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Date; import java.util.Set; +import org.apache.commons.lang.Validate; + import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; @@ -25,6 +27,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; +import lombok.Getter; + /** * Configuration for the Amazon Kinesis Client Library. */ @@ -154,10 +158,10 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; - /* - * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. - * This assumes that the shards and leases are in-sync. - * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + /** + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + * during incremental deployments of an application). */ public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; @@ -166,6 +170,11 @@ public class KinesisClientLibConfiguration { */ public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); + /** + * The size of the thread pool to create for the lease renewer to use. + */ + public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + private String applicationName; private String tableName; private String streamName; @@ -203,6 +212,9 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + @Getter + private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + /** * Constructor. * @@ -1058,4 +1070,23 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = shardPrioritization; return this; } + + /** + * Sets the size of the thread pool that will be used to renew leases. + * + * Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate. + * + * @param maxLeaseRenewalThreads + * the maximum size of the lease renewal thread pool + * @throws IllegalArgumentException + * if maxLeaseRenewalThreads is <= 0 + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) { + Validate.isTrue(maxLeaseRenewalThreads > 2, + "The maximum number of lease renewal threads must be greater than or equal to 2."); + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 59de31be..42fa7d0c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -99,9 +99,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator { private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; - private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); - private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); - - // Package level access for testing. - static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; - + private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); + private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build(); private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; @@ -114,7 +115,8 @@ public class LeaseCoordinator { long epsilonMillis, IMetricsFactory metricsFactory) { this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, - DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); + DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, + KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory); } /** @@ -134,8 +136,9 @@ public class LeaseCoordinator { long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, + int maxLeaseRenewerThreadCount, IMetricsFactory metricsFactory) { - this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); + this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); @@ -366,6 +369,9 @@ public class LeaseCoordinator { * @return Executor service that should be used for lease renewal. */ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { - return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); + int coreLeaseCount = Math.max(maximumPoolSize / 4, 2); + + return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, + new LinkedTransferQueue(), LEASE_RENEWAL_THREAD_FACTORY); } }