From 74c259ca1182dffa05fec6edf307f53429d39f62 Mon Sep 17 00:00:00 2001 From: "Gosalia, Manan" Date: Wed, 23 Mar 2016 12:22:24 -0700 Subject: [PATCH] Version 1.6.2 of the Amazon Kinesis Client Library --- META-INF/MANIFEST.MF | 4 +- README.md | 8 + pom.xml | 4 +- .../worker/KinesisClientLibConfiguration.java | 120 ++++++++- .../KinesisClientLibLeaseCoordinator.java | 62 ++++- .../lib/worker/ShardConsumer.java | 25 +- .../clientlibrary/lib/worker/Worker.java | 234 +++++++++++++----- .../kinesis/leases/impl/LeaseCoordinator.java | 109 ++++++-- .../kinesis/leases/impl/LeaseRenewer.java | 93 ++++++- .../kinesis/leases/impl/LeaseTaker.java | 121 +++++++-- .../impl/DefaultCWMetricsPublisher.java | 2 +- .../kinesis/metrics/impl/MetricsHelper.java | 25 +- .../ThreadSafeMetricsDelegatingFactory.java | 44 ++++ .../ThreadSafeMetricsDelegatingScope.java | 70 ++++++ .../multilang/MultiLangRecordProcessor.java | 20 +- 15 files changed, 809 insertions(+), 132 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/metrics/impl/ThreadSafeMetricsDelegatingFactory.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/metrics/impl/ThreadSafeMetricsDelegatingScope.java diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 008db351..304a299a 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.6.1 +Bundle-Version: 1.6.2 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", @@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6", com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.5.0", org.apache.httpcomponents.httpcore;bundle-version="4.3.3", org.apache.httpcomponents.httpclient;bundle-version="4.3.6" - com.amazonaws.sdk;bundle-version="1.10.20", + com.amazonaws.sdk;bundle-version="1.10.61", Export-Package: com.amazonaws.services.kinesis, com.amazonaws.services.kinesis.clientlibrary, com.amazonaws.services.kinesis.clientlibrary.config, diff --git a/README.md b/README.md index 0d2617fa..b44bb167 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,14 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.6.2 (March 23, 2016) +* Support for specifying max leases per worker and max leases to steal at a time. 
+* Support for specifying initial DynamoDB table read and write capacity. +* Support for parallel lease renewal. +* Support for graceful worker shutdown. +* Change DefaultCWMetricsPublisher log level to debug. [PR # 49](https://github.com/awslabs/amazon-kinesis-client/pull/49) +* Avoid NPE in MLD record processor shutdown if record processor was not initialized. [Issue # 29](https://github.com/awslabs/amazon-kinesis-client/issues/29) + ### Release 1.6.1 (September 23, 2015) * Expose [approximateArrivalTimestamp](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) for Records in processRecords API call. diff --git a/pom.xml b/pom.xml index 18f0b2c6..c5c17f85 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.6.1 + 1.6.2 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. https://aws.amazon.com/kinesis @@ -23,7 +23,7 @@ - 1.10.20 + 1.10.61 diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9829c13e..510565f5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -119,7 +119,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.1"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.2"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls @@ -127,6 +127,33 @@ public class KinesisClientLibConfiguration { */ public static final boolean DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true; + /** + * The max number of leases (shards) this worker should process. + * This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints + * or during deployment. + * NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the + * stream due to the max limit. + */ + public static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; + + /** + * Max leases to steal from another worker at one time (for load balancing). + * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), + * but can cause higher churn in the system. + */ + public static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; + + /** + * The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity. + */ + public static final int DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10; + + /** + * The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity. 
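
The four new tuning knobs introduced here (max leases per worker, max leases to steal at a time, and the initial lease-table read/write capacity) are paired with fluent setters on KinesisClientLibConfiguration later in this patch. A rough usage sketch follows; the four-argument constructor and DefaultAWSCredentialsProviderChain are pre-existing KCL/SDK APIs, and the concrete values are illustrative only:

    AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain();
    String workerId = java.util.UUID.randomUUID().toString();

    KinesisClientLibConfiguration config =
            new KinesisClientLibConfiguration("myApp", "myStream", credentials, workerId)
                    // Cap the number of shards this worker will own at once.
                    .withMaxLeasesForWorker(10)
                    // Steal up to two leases per taker pass for faster rebalancing.
                    .withMaxLeasesToStealAtOneTime(2)
                    // Provision the lease table above the 10/10 defaults.
                    .withInitialLeaseTableReadCapacity(20)
                    .withInitialLeaseTableWriteCapacity(20);
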
+ */ + public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + + private String applicationName; private String streamName; private String kinesisEndpoint; @@ -153,6 +180,10 @@ public class KinesisClientLibConfiguration { private Set metricsEnabledDimensions; private boolean validateSequenceNumberBeforeCheckpointing; private String regionName; + private int maxLeasesForWorker; + private int maxLeasesToStealAtOneTime; + private int initialLeaseTableReadCapacity; + private int initialLeaseTableWriteCapacity; /** * Constructor. @@ -293,6 +324,10 @@ public class KinesisClientLibConfiguration { this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; } // Check if value is positive, otherwise throw an exception @@ -508,6 +543,34 @@ public class KinesisClientLibConfiguration { return regionName; } + /** + * @return Max leases this Worker can handle at a time + */ + public int getMaxLeasesForWorker() { + return maxLeasesForWorker; + } + + /** + * @return Max leases to steal at one time (for load balancing) + */ + public int getMaxLeasesToStealAtOneTime() { + return maxLeasesToStealAtOneTime; + } + + /** + * @return Read capacity to provision when creating the lease table. + */ + public int getInitialLeaseTableReadCapacity() { + return initialLeaseTableReadCapacity; + } + + /** + * @return Write capacity to provision when creating the lease table. + */ + public int getInitialLeaseTableWriteCapacity() { + return initialLeaseTableWriteCapacity; + } + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** * @param kinesisEndpoint Kinesis endpoint @@ -748,4 +811,57 @@ public class KinesisClientLibConfiguration { this.regionName = regionName; return this; } + + /** + * Worker will not acquire more than the specified max number of leases even if there are more + * shards that need to be processed. This can be used in scenarios where a worker is resource constrained or + * to prevent lease thrashing when small number of workers pick up all leases for small amount of time during + * deployment. + * Note that setting a low value may cause data loss (e.g. if there aren't enough Workers to make progress on all + * shards). When setting the value for this property, one must ensure enough workers are present to process + * shards and should consider future resharding, child shards that may be blocked on parent shards, some workers + * becoming unhealthy, etc. + * + * @param maxLeasesForWorker Max leases this Worker can handle at a time + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withMaxLeasesForWorker(int maxLeasesForWorker) { + checkIsValuePositive("maxLeasesForWorker", maxLeasesForWorker); + this.maxLeasesForWorker = maxLeasesForWorker; + return this; + } + + /** + * Max leases to steal from a more loaded Worker at one time (for load balancing). + * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), + * but can cause higher churn in the system. 
+ * + * @param maxLeasesToStealAtOneTime Steal up to this many leases at one time (for load balancing) + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withMaxLeasesToStealAtOneTime(int maxLeasesToStealAtOneTime) { + checkIsValuePositive("maxLeasesToStealAtOneTime", maxLeasesToStealAtOneTime); + this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; + return this; + } + + /** + * @param initialLeaseTableReadCapacity Read capacity to provision when creating the lease table. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withInitialLeaseTableReadCapacity(int initialLeaseTableReadCapacity) { + checkIsValuePositive("initialLeaseTableReadCapacity", initialLeaseTableReadCapacity); + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + return this; + } + + /** + * @param initialLeaseTableWriteCapacity Write capacity to provision when creating the lease table. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withInitialLeaseTableWriteCapacity(int initialLeaseTableWriteCapacity) { + checkIsValuePositive("initialLeaseTableWriteCapacity", initialLeaseTableWriteCapacity); + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 36948a59..20ca4d90 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -44,9 +44,14 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class); + + private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; + private final ILeaseManager leaseManager; - private final long initialLeaseTableReadCapacity = 10L; - private final long initialLeaseTableWriteCapacity = 10L; + + private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; /** * @param leaseManager Lease manager which provides CRUD lease operations. 
@@ -78,6 +83,53 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager, + String workerIdentifier, + long leaseDurationMillis, + long epsilonMillis, + int maxLeasesForWorker, + int maxLeasesToStealAtOneTime, + IMetricsFactory metricsFactory) { + super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, metricsFactory); + this.leaseManager = leaseManager; + } + + /** + * @param readCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial + * read capacity + * @return KinesisClientLibLeaseCoordinator + */ + public KinesisClientLibLeaseCoordinator withInitialLeaseTableReadCapacity(long readCapacity) { + if (readCapacity <= 0) { + throw new IllegalArgumentException("readCapacity should be >= 1"); + } + this.initialLeaseTableReadCapacity = readCapacity; + return this; + } + + /** + * @param writeCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial + * write capacity + * @return KinesisClientLibLeaseCoordinator + */ + public KinesisClientLibLeaseCoordinator withInitialLeaseTableWriteCapacity(long writeCapacity) { + if (writeCapacity <= 0) { + throw new IllegalArgumentException("writeCapacity should be >= 1"); + } + this.initialLeaseTableWriteCapacity = writeCapacity; + return this; + } + /** * Sets the checkpoint for a shard and updates ownerSwitchesSinceCheckpoint. * @@ -173,7 +225,9 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator assignedShards = new HashSet(); @@ -359,8 +371,8 @@ public class Worker implements Runnable { wlog.resetInfoLogging(); } - LOG.info("Stopping LeaseCoordinator."); - leaseCoordinator.stop(); + finalShutdown(); + LOG.info("Worker loop is complete. Exiting from worker."); } private void initialize() { @@ -425,7 +437,7 @@ public class Worker implements Runnable { void cleanupShardConsumers(Set assignedShards) { for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { if (!assignedShards.contains(shard)) { - // Shutdown the consumer since we are not longer responsible for + // Shutdown the consumer since we are no longer responsible for // the shard. boolean isShutdown = shardInfoShardConsumerMap.get(shard).beginShutdown(); if (isShutdown) { @@ -459,11 +471,60 @@ public class Worker implements Runnable { } /** - * Sets the killed flag so this worker will stop on the next iteration of - * its loop. + * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that + * if executor services were passed to the worker by the user, worker will not attempt to shutdown + * those resources. */ public void shutdown() { - this.shutdown = true; + LOG.info("Worker shutdown requested."); + + // Set shutdown flag, so Worker.run can start shutdown process. + shutdown = true; + shutdownStartTimeMillis = System.currentTimeMillis(); + + // Stop lease coordinator, so leases are not renewed or stolen from other workers. + // Lost leases will force Worker to begin shutdown process for all shard consumers in + // Worker.run(). + leaseCoordinator.stop(); + } + + /** + * Perform final shutdown related tasks for the worker including shutting down worker owned + * executor services, threads, etc. + */ + private void finalShutdown() { + LOG.info("Starting worker's final shutdown."); + + if (executorService instanceof WorkerThreadPoolExecutor) { + // This should interrupt all active record processor tasks. 
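
At the call-site level, the shutdown changes above turn worker shutdown into a handshake: shutdown() only sets the flag and stops lease renewal, and the run() loop then drains shard consumers until shouldShutdown() is satisfied or the lease failover time elapses. A minimal sketch of driving this from a JVM shutdown hook, assuming the Worker.Builder setters recordProcessorFactory(...) and config(...) available in this version (the factory and config variables are placeholders):

    final Worker worker = new Worker.Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .config(config)
            .build();

    // Request graceful shutdown when the JVM exits; run() below returns once all
    // record processors have shut down or the failover time is exceeded.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            worker.shutdown();
        }
    }));

    worker.run();
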
+ executorService.shutdownNow(); + } + if (metricsFactory instanceof WorkerCWMetricsFactory) { + ((CWMetricsFactory) metricsFactory).shutdown(); + } + } + + /** + * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} + * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. + * @return Whether worker should shutdown immediately. + */ + private boolean shouldShutdown() { + if (executorService.isShutdown()) { + LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown."); + return true; + } + if (shutdown) { + if (shardInfoShardConsumerMap.isEmpty()) { + LOG.info("All record processors have been shutdown successfully."); + return true; + } + if ((System.currentTimeMillis() - shutdownStartTimeMillis) >= failoverTimeMillis) { + LOG.info("Lease failover time is reached, so forcing shutdown."); + return true; + } + } + return false; } /** @@ -475,33 +536,31 @@ public class Worker implements Runnable { * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - synchronized (shardInfoShardConsumerMap) { - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - // Instantiate a new consumer if we don't have one, or the one we - // had was from an earlier - // lease instance (and was shutdown). Don't need to create another - // one if the shard has been - // completely processed (shutdown reason terminate). - if ((consumer == null) - || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { - IRecordProcessor recordProcessor = factory.createProcessor(); + ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + // Instantiate a new consumer if we don't have one, or the one we + // had was from an earlier + // lease instance (and was shutdown). Don't need to create another + // one if the shard has been + // completely processed (shutdown reason terminate). + if ((consumer == null) + || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { + IRecordProcessor recordProcessor = factory.createProcessor(); - consumer = - new ShardConsumer(shardInfo, - streamConfig, - checkpointTracker, - recordProcessor, - leaseCoordinator.getLeaseManager(), - parentShardPollIntervalMillis, - cleanupLeasesUponShardCompletion, - executorService, - metricsFactory, - taskBackoffTimeMillis); - shardInfoShardConsumerMap.put(shardInfo, consumer); - wlog.infoForce("Created new shardConsumer for : " + shardInfo); - } - return consumer; + consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpointTracker, + recordProcessor, + leaseCoordinator.getLeaseManager(), + parentShardPollIntervalMillis, + cleanupLeasesUponShardCompletion, + executorService, + metricsFactory, + taskBackoffTimeMillis); + shardInfoShardConsumerMap.put(shardInfo, consumer); + wlog.infoForce("Created new shardConsumer for : " + shardInfo); } + return consumer; } /** @@ -648,14 +707,65 @@ public class Worker implements Runnable { */ private static IMetricsFactory getMetricsFactory( AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) { - return config.getMetricsLevel() == MetricsLevel.NONE - ? 
new NullMetricsFactory() : new CWMetricsFactory( - cloudWatchClient, - config.getApplicationName(), - config.getMetricsBufferTimeMillis(), - config.getMetricsMaxQueueSize(), - config.getMetricsLevel(), - config.getMetricsEnabledDimensions()); + IMetricsFactory metricsFactory; + if (config.getMetricsLevel() == MetricsLevel.NONE) { + metricsFactory = new NullMetricsFactory(); + } else { + if (config.getRegionName() != null) { + Region region = RegionUtils.getRegion(config.getRegionName()); + cloudWatchClient.setRegion(region); + LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); + } + metricsFactory = new WorkerCWMetricsFactory( + cloudWatchClient, + config.getApplicationName(), + config.getMetricsBufferTimeMillis(), + config.getMetricsMaxQueueSize(), + config.getMetricsLevel(), + config.getMetricsEnabledDimensions()); + } + return metricsFactory; + } + + /** + * Returns default executor service that should be used by the worker. + * @return Default executor service that should be used by the worker. + */ + private static ExecutorService getExecutorService() { + return new WorkerThreadPoolExecutor(); + } + + /** + * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance + * or not. + * Visible and non-final only for testing. + */ + static class WorkerCWMetricsFactory extends CWMetricsFactory { + + WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, + String namespace, + long bufferTimeMillis, + int maxQueueSize, + MetricsLevel metricsLevel, + Set metricsEnabledDimensions) { + super(cloudWatchClient, namespace, bufferTimeMillis, + maxQueueSize, metricsLevel, metricsEnabledDimensions); + } + } + + /** + * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance + * or not. + * Visible and non-final only for testing. 
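
These marker subclasses let finalShutdown() tell worker-created resources apart from caller-supplied ones: only a WorkerThreadPoolExecutor or WorkerCWMetricsFactory instance is shut down by the worker itself. A sketch of the caller-owned case, assuming the Builder exposes an execService(...) setter matching the execService field used in build() below (that setter is not shown in this excerpt):

    // Created and owned by the application; finalShutdown() leaves it running because it is
    // not a WorkerThreadPoolExecutor, so the caller must stop it after worker.shutdown().
    ExecutorService appOwnedPool = Executors.newFixedThreadPool(8);

    Worker worker = new Worker.Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .config(config)
            .execService(appOwnedPool)
            .build();
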
+ */ + static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { + private static final long DEFAULT_KEEP_ALIVE_TIME = 60L; + + WorkerThreadPoolExecutor() { + // Defaults are based on Executors.newCachedThreadPool() + super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new SynchronousQueue()); + } } /** @@ -785,8 +895,9 @@ public class Worker implements Runnable { throw new IllegalArgumentException( "A Record Processor Factory needs to be provided to build Worker"); } + if (execService == null) { - execService = Executors.newCachedThreadPool(); + execService = getExecutorService(); } if (kinesisClient == null) { kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(), @@ -846,10 +957,15 @@ public class Worker implements Runnable { config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), - metricsFactory), + config.getMaxLeasesForWorker(), + config.getMaxLeasesToStealAtOneTime(), + metricsFactory) + .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) + .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), execService, metricsFactory, - config.getTaskBackoffTimeMillis()); + config.getTaskBackoffTimeMillis(), + config.getFailoverTimeMillis()); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index fb5670c1..68ea9f1a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -17,9 +17,12 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.Collection; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -58,21 +61,31 @@ public class LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; - private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); + private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; + private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; + + private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); + private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); + + // Package level access for testing. 
+ static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; private final long renewerIntervalMillis; private final long takerIntervalMillis; + private final Object shutdownLock = new Object(); + protected final IMetricsFactory metricsFactory; - private ScheduledExecutorService threadpool; + private ScheduledExecutorService leaseCoordinatorThreadPool; + private ExecutorService leaseRenewalThreadpool; private volatile boolean running = false; /** * Constructor. - * + * * @param leaseManager LeaseManager instance to use * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership) * @param leaseDurationMillis Duration of a lease @@ -87,7 +100,7 @@ public class LeaseCoordinator { /** * Constructor. - * + * * @param leaseManager LeaseManager instance to use * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership) * @param leaseDurationMillis Duration of a lease @@ -99,17 +112,47 @@ public class LeaseCoordinator { long leaseDurationMillis, long epsilonMillis, IMetricsFactory metricsFactory) { - this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis); - this.leaseRenewer = new LeaseRenewer(leaseManager, workerIdentifier, leaseDurationMillis); + this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, + DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); + } + + /** + * Constructor. + * + * @param leaseManager LeaseManager instance to use + * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership) + * @param leaseDurationMillis Duration of a lease + * @param epsilonMillis Allow for some variance when calculating lease expirations + * @param maxLeasesForWorker Max leases this Worker can handle at a time + * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) + * @param metricsFactory Used to publish metrics about lease operations + */ + public LeaseCoordinator(ILeaseManager leaseManager, + String workerIdentifier, + long leaseDurationMillis, + long epsilonMillis, + int maxLeasesForWorker, + int maxLeasesToStealAtOneTime, + IMetricsFactory metricsFactory) { + this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); + this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis) + .withMaxLeasesForWorker(maxLeasesForWorker) + .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); + this.leaseRenewer = new LeaseRenewer( + leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool); this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; this.metricsFactory = metricsFactory; - LOG.info(String.format("With failover time %dms and epsilon %dms, LeaseCoordinator will renew leases every %dms and take leases every %dms", + LOG.info(String.format( + "With failover time %d ms and epsilon %d ms, LeaseCoordinator will renew leases every %d ms, take" + + "leases every %d ms, process maximum of %d leases and steal %d lease(s) at a time.", leaseDurationMillis, epsilonMillis, renewerIntervalMillis, - takerIntervalMillis)); + takerIntervalMillis, + maxLeasesForWorker, + maxLeasesToStealAtOneTime)); } private class TakerRunnable implements Runnable { @@ -150,14 +193,14 @@ public class LeaseCoordinator { */ public void start() throws DependencyException, 
InvalidStateException, ProvisionedThroughputException { leaseRenewer.initialize(); - + // 2 because we know we'll have at most 2 concurrent tasks at a time. - threadpool = Executors.newScheduledThreadPool(2, THREAD_FACTORY); + leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY); // Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation. - threadpool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS); + leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS); // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation. - threadpool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS); + leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS); running = true; } @@ -175,7 +218,13 @@ public class LeaseCoordinator { try { Map takenLeases = leaseTaker.takeLeases(); - leaseRenewer.addLeasesToRenew(takenLeases.values()); + // Only add taken leases to renewer if coordinator is still running. + synchronized (shutdownLock) { + if (running) { + leaseRenewer.addLeasesToRenew(takenLeases.values()); + } + } + success = true; } finally { scope.addDimension(WORKER_IDENTIFIER_METRIC, getWorkerIdentifier()); @@ -229,16 +278,19 @@ public class LeaseCoordinator { } /** - * Stops background threads. + * Stops background threads and waits for {@link #STOP_WAIT_TIME_MILLIS} for all background tasks to complete. + * If tasks are not completed after this time, method will shutdown thread pool forcefully and return. */ public void stop() { - if (threadpool != null) { - threadpool.shutdown(); + if (leaseCoordinatorThreadPool != null) { + leaseCoordinatorThreadPool.shutdown(); try { - if (threadpool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) { - LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier())); + if (leaseCoordinatorThreadPool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) { + LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", + leaseTaker.getWorkerIdentifier())); } else { - threadpool.shutdownNow(); + leaseCoordinatorThreadPool.shutdownNow(); + leaseRenewalThreadpool.shutdownNow(); LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop", leaseTaker.getWorkerIdentifier(), STOP_WAIT_TIME_MILLIS)); @@ -250,8 +302,10 @@ public class LeaseCoordinator { LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool."); } - leaseRenewer.clearCurrentlyHeldLeases(); - running = false; + synchronized (shutdownLock) { + leaseRenewer.clearCurrentlyHeldLeases(); + running = false; + } } /** @@ -278,4 +332,15 @@ public class LeaseCoordinator { return leaseRenewer.updateLease(lease, concurrencyToken); } + /** + * Returns executor service that should be used for lease renewal. + * @param maximumPoolSize Maximum allowed thread pool size + * @return Executor service that should be used for lease renewal. 
+ */ + private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { + ThreadPoolExecutor exec = new ThreadPoolExecutor(maximumPoolSize, maximumPoolSize, + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), LEASE_RENEWAL_THREAD_FACTORY); + exec.allowCoreThreadTimeOut(true); + return exec; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index 009467c5..ae8040a5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -14,14 +14,19 @@ */ package com.amazonaws.services.kinesis.leases.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -34,6 +39,8 @@ import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputExc import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; /** @@ -48,6 +55,7 @@ public class LeaseRenewer implements ILeaseRenewer { private final ConcurrentNavigableMap ownedLeases = new ConcurrentSkipListMap(); private final String workerIdentifier; private final long leaseDurationNanos; + private final ExecutorService executorService; /** * Constructor. @@ -55,11 +63,14 @@ public class LeaseRenewer implements ILeaseRenewer { * @param leaseManager LeaseManager to use * @param workerIdentifier identifier of this worker * @param leaseDurationMillis duration of a lease in milliseconds + * @param executorService ExecutorService to use for renewing leases in parallel */ - public LeaseRenewer(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { + public LeaseRenewer(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis, + ExecutorService executorService) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); + this.executorService = executorService; } /** @@ -76,24 +87,79 @@ public class LeaseRenewer implements ILeaseRenewer { ownedLeases)); } + /* + * Lease renewals are done in parallel so many leases can be renewed for short lease fail over time + * configuration. In this case, metrics scope is also shared across different threads, so scope must be thread + * safe. 
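
ThreadSafeMetricsDelegatingScope, used just below, is one of the two new files this patch creates; its body is not part of this hunk. Conceptually it is a synchronized wrapper around the scope owned by the renewer thread. A rough sketch of the idea (not the actual class; only the two IMetricsScope methods exercised in this excerpt are shown):

    /** Illustrative only: serializes access to a metrics scope shared by renewal threads. */
    class SynchronizedScopeSketch {
        private final IMetricsScope delegate;

        SynchronizedScopeSketch(IMetricsScope delegate) {
            this.delegate = delegate;
        }

        synchronized void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
            delegate.addData(name, value, unit, level);
        }

        synchronized void addDimension(String name, String value) {
            delegate.addDimension(name, value);
        }
    }
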
+ */ + IMetricsScope renewLeaseTaskMetricsScope = new ThreadSafeMetricsDelegatingScope( + MetricsHelper.getMetricsScope()); + /* * We iterate in descending order here so that the synchronized(lease) inside renewLease doesn't "lead" calls * to getCurrentlyHeldLeases. They'll still cross paths, but they won't interleave their executions. */ int lostLeases = 0; + List> renewLeaseTasks = new ArrayList>(); for (T lease : ownedLeases.descendingMap().values()) { - if (!renewLease(lease)) { - lostLeases++; + renewLeaseTasks.add(executorService.submit(new RenewLeaseTask(lease, renewLeaseTaskMetricsScope))); + } + int leasesInUnknownState = 0; + Exception lastException = null; + for (Future renewLeaseTask : renewLeaseTasks) { + try { + if (!renewLeaseTask.get()) { + lostLeases++; + } + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for a lease to renew."); + leasesInUnknownState += 1; + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + LOG.error("Encountered an exception while renewing a lease.", e.getCause()); + leasesInUnknownState += 1; + lastException = e; } } - MetricsHelper.getMetricsScope().addData( + renewLeaseTaskMetricsScope.addData( "LostLeases", lostLeases, StandardUnit.Count, MetricsLevel.SUMMARY); - MetricsHelper.getMetricsScope().addData( + renewLeaseTaskMetricsScope.addData( "CurrentLeases", ownedLeases.size(), StandardUnit.Count, MetricsLevel.SUMMARY); + if (leasesInUnknownState > 0) { + throw new DependencyException(String.format("Encountered an exception while renewing leases. " + + "The number of leases which might not have been renewed is %d", + leasesInUnknownState), + lastException); + } + } + + private class RenewLeaseTask implements Callable { + + private final T lease; + private final IMetricsScope metricsScope; + + public RenewLeaseTask(T lease, IMetricsScope metricsScope) { + this.lease = lease; + this.metricsScope = metricsScope; + } + + @Override + public Boolean call() throws Exception { + MetricsHelper.setMetricsScope(metricsScope); + try { + return renewLease(lease); + } finally { + MetricsHelper.unsetMetricsScope(); + } + } } private boolean renewLease(T lease) throws DependencyException, InvalidStateException { + return renewLease(lease, false); + } + + private boolean renewLease(T lease, boolean renewEvenIfExpired) throws DependencyException, InvalidStateException { String leaseKey = lease.getLeaseKey(); boolean success = false; @@ -103,7 +169,12 @@ public class LeaseRenewer implements ILeaseRenewer { for (int i = 1; i <= RENEWAL_RETRIES; i++) { try { synchronized (lease) { - renewedLease = leaseManager.renewLease(lease); + // Don't renew expired lease during regular renewals. getCopyOfHeldLease may have returned null + // triggering the application processing to treat this as a lost lease (fail checkpoint with + // ShutdownException). 
+ if (renewEvenIfExpired || (!lease.isExpired(leaseDurationNanos, System.nanoTime()))) { + renewedLease = leaseManager.renewLease(lease); + } if (renewedLease) { lease.setLastCounterIncrementNanos(System.nanoTime()); } @@ -305,11 +376,15 @@ public class LeaseRenewer implements ILeaseRenewer { public void initialize() throws DependencyException, InvalidStateException, ProvisionedThroughputException { Collection leases = leaseManager.listLeases(); List myLeases = new LinkedList(); + boolean renewEvenIfExpired = true; for (T lease : leases) { if (workerIdentifier.equals(lease.getLeaseOwner())) { LOG.info(String.format(" Worker %s found lease %s", workerIdentifier, lease)); - if (renewLease(lease)) { + // Okay to renew even if lease is expired, because we start with an empty list and we add the lease to + // our list only after a successful renew. So we don't need to worry about the edge case where we could + // continue renewing a lease after signaling a lease loss to the application. + if (renewLease(lease, renewEvenIfExpired)) { myLeases.add(lease); } } else { @@ -319,7 +394,7 @@ public class LeaseRenewer implements ILeaseRenewer { addLeasesToRenew(myLeases); } - + private void verifyNotNull(Object object, String message) { if (object == null) { throw new IllegalArgumentException(message); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java index c0323e96..42a3995a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -63,8 +62,9 @@ public class LeaseTaker implements ILeaseTaker { private final String workerIdentifier; private final Map allLeases = new HashMap(); private final long leaseDurationNanos; + private int maxLeasesForWorker = Integer.MAX_VALUE; + private int maxLeasesToStealAtOneTime = 1; - private Random random = new Random(); private long lastScanTimeNanos = 0L; public LeaseTaker(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { @@ -73,6 +73,43 @@ public class LeaseTaker implements ILeaseTaker { this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); } + /** + * Worker will not acquire more than the specified max number of leases even if there are more + * shards that need to be processed. This can be used in scenarios where a worker is resource constrained or + * to prevent lease thrashing when small number of workers pick up all leases for small amount of time during + * deployment. + * Note that setting a low value may cause data loss (e.g. if there aren't enough Workers to make progress on all + * shards). When setting the value for this property, one must ensure enough workers are present to process + * shards and should consider future resharding, child shards that may be blocked on parent shards, some workers + * becoming unhealthy, etc. 
+ * + * @param maxLeasesForWorker Max leases this Worker can handle at a time + * @return LeaseTaker + */ + public LeaseTaker withMaxLeasesForWorker(int maxLeasesForWorker) { + if (maxLeasesForWorker <= 0) { + throw new IllegalArgumentException("maxLeasesForWorker should be >= 1"); + } + this.maxLeasesForWorker = maxLeasesForWorker; + return this; + } + + /** + * Max leases to steal from a more loaded Worker at one time (for load balancing). + * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), + * but can cause higher churn in the system. + * + * @param maxLeasesToStealAtOneTime Steal up to this many leases at one time (for load balancing) + * @return LeaseTaker + */ + public LeaseTaker withMaxLeasesToStealAtOneTime(int maxLeasesToStealAtOneTime) { + if (maxLeasesToStealAtOneTime <= 0) { + throw new IllegalArgumentException("maxLeasesToStealAtOneTime should be >= 1"); + } + this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; + return this; + } + /** * {@inheritDoc} */ @@ -293,6 +330,7 @@ public class LeaseTaker implements ILeaseTaker { private Set computeLeasesToTake(List expiredLeases) { Map leaseCounts = computeLeaseCounts(expiredLeases); Set leasesToTake = new HashSet(); + IMetricsScope metrics = MetricsHelper.getMetricsScope(); int numLeases = allLeases.size(); int numWorkers = leaseCounts.size(); @@ -313,6 +351,22 @@ public class LeaseTaker implements ILeaseTaker { * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases) */ target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1); + + // Spill over is the number of leases this worker should have claimed, but did not because it would + // exceed the max allowed for this worker. + int leaseSpillover = Math.max(0, target - maxLeasesForWorker); + if (target > maxLeasesForWorker) { + LOG.warn(String.format("Worker %s target is %d leases and maxLeasesForWorker is %d." + + " Resetting target to %d, lease spillover is %d. " + + " Note that some shards may not be processed if no other workers are able to pick them up.", + workerIdentifier, + target, + maxLeasesForWorker, + maxLeasesForWorker, + leaseSpillover)); + target = maxLeasesForWorker; + } + metrics.addData("LeaseSpillover", leaseSpillover, StandardUnit.Count, MetricsLevel.SUMMARY); } int myCount = leaseCounts.get(workerIdentifier); @@ -333,9 +387,9 @@ public class LeaseTaker implements ILeaseTaker { leasesToTake.add(expiredLeases.remove(0)); } } else { - // If there are no expired leases and we need a lease, consider stealing one - T leaseToSteal = chooseLeaseToSteal(leaseCounts, numLeasesToReachTarget, target); - if (leaseToSteal != null) { + // If there are no expired leases and we need a lease, consider stealing. 
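
To make the arithmetic above concrete: with 12 leases and 2 live workers, each worker's target is 12 / 2 = 6; if maxLeasesForWorker is 4, the target is clamped to 4 and a LeaseSpillover of 2 is published, warning that those shards may go unprocessed unless another worker can absorb them. The stealing branch below then claims at most maxLeasesToStealAtOneTime leases per pass from the single most loaded worker, or exactly one lease when that worker sits exactly at target but this worker still needs more than one.
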
+ List leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target); + for (T leaseToSteal : leasesToSteal) { LOG.info(String.format("Worker %s needed %d leases but none were expired, so it will steal lease %s from %s", workerIdentifier, numLeasesToReachTarget, @@ -356,8 +410,7 @@ public class LeaseTaker implements ILeaseTaker { myCount, leasesToTake.size())); } - - IMetricsScope metrics = MetricsHelper.getMetricsScope(); + metrics.addData("TotalLeases", numLeases, StandardUnit.Count, MetricsLevel.DETAILED); metrics.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.Count, MetricsLevel.SUMMARY); metrics.addData("NumWorkers", numWorkers, StandardUnit.Count, MetricsLevel.SUMMARY); @@ -368,17 +421,21 @@ public class LeaseTaker implements ILeaseTaker { } /** - * Choose a lease to steal by randomly selecting one from the most loaded worker. Stealing rules: + * Choose leases to steal by randomly selecting one or more (up to max) from the most loaded worker. + * Stealing rules: * - * Steal one lease from the most loaded worker if - * a) he has > target leases and I need >= 1 leases - * b) he has == target leases and I need > 1 leases + * Steal up to maxLeasesToStealAtOneTime leases from the most loaded worker if + * a) he has > target leases and I need >= 1 leases : steal min(leases needed, maxLeasesToStealAtOneTime) + * b) he has == target leases and I need > 1 leases : steal 1 * * @param leaseCounts map of workerIdentifier to lease count + * @param needed # of leases needed to reach the target leases for the worker * @param target target # of leases per worker - * @return Lease to steal, or null if we should not steal + * @return Leases to steal, or empty list if we should not steal */ - private T chooseLeaseToSteal(Map leaseCounts, int needed, int target) { + private List chooseLeasesToSteal(Map leaseCounts, int needed, int target) { + List leasesToSteal = new ArrayList<>(); + Entry mostLoadedWorker = null; // Find the most loaded worker for (Entry worker : leaseCounts.entrySet()) { @@ -387,7 +444,18 @@ public class LeaseTaker implements ILeaseTaker { } } - if (mostLoadedWorker.getValue() < target + (needed > 1 ? 0 : 1)) { + int numLeasesToSteal = 0; + if ((mostLoadedWorker.getValue() >= target) && (needed > 0)) { + int leasesOverTarget = mostLoadedWorker.getValue() - target; + numLeasesToSteal = Math.min(needed, leasesOverTarget); + // steal 1 if we need > 1 and max loaded worker has target leases. + if ((needed > 1) && (numLeasesToSteal == 0)) { + numLeasesToSteal = 1; + } + numLeasesToSteal = Math.min(numLeasesToSteal, maxLeasesToStealAtOneTime); + } + + if (numLeasesToSteal <= 0) { if (LOG.isDebugEnabled()) { LOG.debug(String.format("Worker %s not stealing from most loaded worker %s. He has %d," + " target is %d, and I need %d", @@ -397,8 +465,18 @@ public class LeaseTaker implements ILeaseTaker { target, needed)); } - - return null; + return leasesToSteal; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Worker %s will attempt to steal %d leases from most loaded worker %s. 
" + + " He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.", + workerIdentifier, + mostLoadedWorker.getKey(), + mostLoadedWorker.getValue(), + target, + needed, + maxLeasesToStealAtOneTime)); + } } String mostLoadedWorkerIdentifier = mostLoadedWorker.getKey(); @@ -410,9 +488,12 @@ public class LeaseTaker implements ILeaseTaker { } } - // Return a random one - int randomIndex = random.nextInt(candidates.size()); - return candidates.get(randomIndex); + // Return random ones + Collections.shuffle(candidates); + int toIndex = Math.min(candidates.size(), numLeasesToSteal); + leasesToSteal.addAll(candidates.subList(0, toIndex)); + + return leasesToSteal; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/DefaultCWMetricsPublisher.java b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/DefaultCWMetricsPublisher.java index 6cab88c4..76ae7a05 100644 --- a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/DefaultCWMetricsPublisher.java +++ b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/DefaultCWMetricsPublisher.java @@ -62,7 +62,7 @@ public class DefaultCWMetricsPublisher implements ICWMetricsPublisher stderrReadTask; @@ -115,8 +118,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor { * The process builder has thrown an exception while starting the child process so we would like to shut * down */ - stopProcessing("Failed to start client executable", e); - return; + throw new IOException("Failed to start client executable", e); } // Initialize all of our utility objects that will handle interacting with the process over // STDIN/STDOUT/STDERR @@ -131,7 +133,10 @@ public class MultiLangRecordProcessor implements IRecordProcessor { if (!protocol.initialize()) { throw new RuntimeException("Failed to initialize child process"); } + + initialized = true; } catch (Throwable t) { + // Any exception in initialize results in MultiLangDaemon shutdown. stopProcessing("Encountered an error while trying to initialize record processor", t); } } @@ -149,6 +154,15 @@ public class MultiLangRecordProcessor implements IRecordProcessor { @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { + // In cases where KCL loses lease for the shard after creating record processor instance but before + // record processor initialize() is called, then shutdown() may be called directly before initialize(). + if (!initialized) { + LOG.info("Record processor was not initialized and will not have a child process, " + + "so not invoking child process shutdown."); + this.state = ProcessState.SHUTDOWN; + return; + } + try { if (ProcessState.ACTIVE.equals(this.state)) { if (!protocol.shutdown(checkpointer, reason)) {