From 14e2413c13293693112949294a55ab5d9f272cb9 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Thu, 20 Jul 2017 12:03:34 -0700 Subject: [PATCH 1/2] Advance version, and drop Java 7 support (#176) * Advance version, and drop Java 7 support Advanced the version to 1.8.0 as Java 7 support is being removed. * Remove build settings for Java 7 Removed the configuration for Java 7, and switched to trusty for access to openjdk8. --- .travis.yml | 6 +++--- pom.xml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index ebb7a2ac..320f811c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: java jdk: - - openjdk7 - - oraclejdk7 + - openjdk8 - oraclejdk8 -sudo: false \ No newline at end of file +sudo: false +dist: trusty \ No newline at end of file diff --git a/pom.xml b/pom.xml index b4a1d10e..47069ad7 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.7.6 + 1.8.0-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -131,8 +131,8 @@ maven-compiler-plugin 3.2 - 1.7 - 1.7 + 1.8 + 1.8 UTF-8 From f697a094d9024f06081b60d75d2cc92b29796318 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Thu, 20 Jul 2017 12:07:21 -0700 Subject: [PATCH 2/2] Make the lease renewer thread pool size configurable (#177) Allow configuration of the lease renewer thread pool size. Users can now control the size of the pool. Additionally core threads are now a quarter of the thread pool size, and aren't allowed to timeout. --- .../worker/KinesisClientLibConfiguration.java | 39 +++++++++++++++++-- .../KinesisClientLibLeaseCoordinator.java | 3 +- .../clientlibrary/lib/worker/Worker.java | 2 + .../kinesis/leases/impl/LeaseCoordinator.java | 26 ++++++++----- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index b708a30a..476a9e03 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Date; import java.util.Set; +import org.apache.commons.lang.Validate; + import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; @@ -25,6 +27,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; +import lombok.Getter; + /** * Configuration for the Amazon Kinesis Client Library. */ @@ -155,10 +159,10 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; - /* - * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. - * This assumes that the shards and leases are in-sync. - * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + /** + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + * during incremental deployments of an application). */ public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; @@ -167,6 +171,11 @@ public class KinesisClientLibConfiguration { */ public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); + /** + * The size of the thread pool to create for the lease renewer to use. + */ + public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + private String applicationName; private String tableName; private String streamName; @@ -204,6 +213,9 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + @Getter + private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + /** * Constructor. * @@ -1075,4 +1087,23 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = shardPrioritization; return this; } + + /** + * Sets the size of the thread pool that will be used to renew leases. + * + * Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate. + * + * @param maxLeaseRenewalThreads + * the maximum size of the lease renewal thread pool + * @throws IllegalArgumentException + * if maxLeaseRenewalThreads is <= 0 + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) { + Validate.isTrue(maxLeaseRenewalThreads > 2, + "The maximum number of lease renewal threads must be greater than or equal to 2."); + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 59de31be..42fa7d0c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -99,9 +99,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator { private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; - private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); - private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); - - // Package level access for testing. - static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; - + private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); + private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build(); private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; @@ -114,7 +115,8 @@ public class LeaseCoordinator { long epsilonMillis, IMetricsFactory metricsFactory) { this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, - DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); + DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, + KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory); } /** @@ -134,8 +136,9 @@ public class LeaseCoordinator { long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, + int maxLeaseRenewerThreadCount, IMetricsFactory metricsFactory) { - this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); + this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); @@ -366,6 +369,9 @@ public class LeaseCoordinator { * @return Executor service that should be used for lease renewal. */ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { - return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); + int coreLeaseCount = Math.max(maximumPoolSize / 4, 2); + + return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, + new LinkedTransferQueue(), LEASE_RENEWAL_THREAD_FACTORY); } }