Version 1.6.2 of the Amazon Kinesis Client Library
This commit is contained in:
parent
c6e393c13e
commit
74c259ca11
15 changed files with 809 additions and 132 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.6.1
|
||||
Bundle-Version: 1.6.2
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
|||
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.5.0",
|
||||
org.apache.httpcomponents.httpcore;bundle-version="4.3.3",
|
||||
org.apache.httpcomponents.httpclient;bundle-version="4.3.6"
|
||||
com.amazonaws.sdk;bundle-version="1.10.20",
|
||||
com.amazonaws.sdk;bundle-version="1.10.61",
|
||||
Export-Package: com.amazonaws.services.kinesis,
|
||||
com.amazonaws.services.kinesis.clientlibrary,
|
||||
com.amazonaws.services.kinesis.clientlibrary.config,
|
||||
|
|
|
|||
|
|
@ -29,6 +29,14 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
|
|||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.6.2 (March 23, 2016)
|
||||
* Support for specifying max leases per worker and max leases to steal at a time.
|
||||
* Support for specifying initial DynamoDB table read and write capacity.
|
||||
* Support for parallel lease renewal.
|
||||
* Support for graceful worker shutdown.
|
||||
* Change DefaultCWMetricsPublisher log level to debug. [PR # 49](https://github.com/awslabs/amazon-kinesis-client/pull/49)
|
||||
* Avoid NPE in MLD record processor shutdown if record processor was not initialized. [Issue # 29](https://github.com/awslabs/amazon-kinesis-client/issues/29)
|
||||
|
||||
### Release 1.6.1 (September 23, 2015)
|
||||
* Expose [approximateArrivalTimestamp](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) for Records in processRecords API call.
|
||||
|
||||
|
|
|
|||
4
pom.xml
4
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.6.1</version>
|
||||
<version>1.6.2</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
@ -23,7 +23,7 @@
|
|||
</licenses>
|
||||
|
||||
<properties>
|
||||
<aws-java-sdk.version>1.10.20</aws-java-sdk.version>
|
||||
<aws-java-sdk.version>1.10.61</aws-java-sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -119,7 +119,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.1";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.2";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
@ -127,6 +127,33 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
public static final boolean DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true;
|
||||
|
||||
/**
|
||||
* The max number of leases (shards) this worker should process.
|
||||
* This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
|
||||
* or during deployment.
|
||||
* NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
|
||||
* stream due to the max limit.
|
||||
*/
|
||||
public static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Max leases to steal from another worker at one time (for load balancing).
|
||||
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
||||
* but can cause higher churn in the system.
|
||||
*/
|
||||
public static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
|
||||
|
||||
/**
|
||||
* The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
|
||||
*/
|
||||
public static final int DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10;
|
||||
|
||||
/**
|
||||
* The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
|
||||
*/
|
||||
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
|
||||
|
||||
|
||||
private String applicationName;
|
||||
private String streamName;
|
||||
private String kinesisEndpoint;
|
||||
|
|
@ -153,6 +180,10 @@ public class KinesisClientLibConfiguration {
|
|||
private Set<String> metricsEnabledDimensions;
|
||||
private boolean validateSequenceNumberBeforeCheckpointing;
|
||||
private String regionName;
|
||||
private int maxLeasesForWorker;
|
||||
private int maxLeasesToStealAtOneTime;
|
||||
private int initialLeaseTableReadCapacity;
|
||||
private int initialLeaseTableWriteCapacity;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -293,6 +324,10 @@ public class KinesisClientLibConfiguration {
|
|||
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
|
||||
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
|
||||
this.regionName = regionName;
|
||||
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
|
||||
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
|
||||
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
|
||||
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
|
||||
}
|
||||
|
||||
// Check if value is positive, otherwise throw an exception
|
||||
|
|
@ -508,6 +543,34 @@ public class KinesisClientLibConfiguration {
|
|||
return regionName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Max leases this Worker can handle at a time
|
||||
*/
|
||||
public int getMaxLeasesForWorker() {
|
||||
return maxLeasesForWorker;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Max leases to steal at one time (for load balancing)
|
||||
*/
|
||||
public int getMaxLeasesToStealAtOneTime() {
|
||||
return maxLeasesToStealAtOneTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Read capacity to provision when creating the lease table.
|
||||
*/
|
||||
public int getInitialLeaseTableReadCapacity() {
|
||||
return initialLeaseTableReadCapacity;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Write capacity to provision when creating the lease table.
|
||||
*/
|
||||
public int getInitialLeaseTableWriteCapacity() {
|
||||
return initialLeaseTableWriteCapacity;
|
||||
}
|
||||
|
||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
|
||||
/**
|
||||
* @param kinesisEndpoint Kinesis endpoint
|
||||
|
|
@ -748,4 +811,57 @@ public class KinesisClientLibConfiguration {
|
|||
this.regionName = regionName;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker will not acquire more than the specified max number of leases even if there are more
|
||||
* shards that need to be processed. This can be used in scenarios where a worker is resource constrained or
|
||||
* to prevent lease thrashing when small number of workers pick up all leases for small amount of time during
|
||||
* deployment.
|
||||
* Note that setting a low value may cause data loss (e.g. if there aren't enough Workers to make progress on all
|
||||
* shards). When setting the value for this property, one must ensure enough workers are present to process
|
||||
* shards and should consider future resharding, child shards that may be blocked on parent shards, some workers
|
||||
* becoming unhealthy, etc.
|
||||
*
|
||||
* @param maxLeasesForWorker Max leases this Worker can handle at a time
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withMaxLeasesForWorker(int maxLeasesForWorker) {
|
||||
checkIsValuePositive("maxLeasesForWorker", maxLeasesForWorker);
|
||||
this.maxLeasesForWorker = maxLeasesForWorker;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Max leases to steal from a more loaded Worker at one time (for load balancing).
|
||||
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
||||
* but can cause higher churn in the system.
|
||||
*
|
||||
* @param maxLeasesToStealAtOneTime Steal up to this many leases at one time (for load balancing)
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withMaxLeasesToStealAtOneTime(int maxLeasesToStealAtOneTime) {
|
||||
checkIsValuePositive("maxLeasesToStealAtOneTime", maxLeasesToStealAtOneTime);
|
||||
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param initialLeaseTableReadCapacity Read capacity to provision when creating the lease table.
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withInitialLeaseTableReadCapacity(int initialLeaseTableReadCapacity) {
|
||||
checkIsValuePositive("initialLeaseTableReadCapacity", initialLeaseTableReadCapacity);
|
||||
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param initialLeaseTableWriteCapacity Write capacity to provision when creating the lease table.
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withInitialLeaseTableWriteCapacity(int initialLeaseTableWriteCapacity) {
|
||||
checkIsValuePositive("initialLeaseTableWriteCapacity", initialLeaseTableWriteCapacity);
|
||||
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -44,9 +44,14 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
|||
class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class);
|
||||
|
||||
private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
|
||||
private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
|
||||
|
||||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||
private final long initialLeaseTableReadCapacity = 10L;
|
||||
private final long initialLeaseTableWriteCapacity = 10L;
|
||||
|
||||
private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
|
||||
private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
|
||||
|
||||
/**
|
||||
* @param leaseManager Lease manager which provides CRUD lease operations.
|
||||
|
|
@ -78,6 +83,53 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
|||
this.leaseManager = leaseManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param leaseManager Lease manager which provides CRUD lease operations.
|
||||
* @param workerIdentifier Used to identify this worker process
|
||||
* @param leaseDurationMillis Duration of a lease in milliseconds
|
||||
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
|
||||
* @param maxLeasesForWorker Max leases this worker can handle at a time
|
||||
* @param maxLeasesToStealAtOneTime Steal up to this many leases at a time (for load balancing)
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
*/
|
||||
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
|
||||
String workerIdentifier,
|
||||
long leaseDurationMillis,
|
||||
long epsilonMillis,
|
||||
int maxLeasesForWorker,
|
||||
int maxLeasesToStealAtOneTime,
|
||||
IMetricsFactory metricsFactory) {
|
||||
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
|
||||
maxLeasesToStealAtOneTime, metricsFactory);
|
||||
this.leaseManager = leaseManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param readCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial
|
||||
* read capacity
|
||||
* @return KinesisClientLibLeaseCoordinator
|
||||
*/
|
||||
public KinesisClientLibLeaseCoordinator withInitialLeaseTableReadCapacity(long readCapacity) {
|
||||
if (readCapacity <= 0) {
|
||||
throw new IllegalArgumentException("readCapacity should be >= 1");
|
||||
}
|
||||
this.initialLeaseTableReadCapacity = readCapacity;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param writeCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial
|
||||
* write capacity
|
||||
* @return KinesisClientLibLeaseCoordinator
|
||||
*/
|
||||
public KinesisClientLibLeaseCoordinator withInitialLeaseTableWriteCapacity(long writeCapacity) {
|
||||
if (writeCapacity <= 0) {
|
||||
throw new IllegalArgumentException("writeCapacity should be >= 1");
|
||||
}
|
||||
this.initialLeaseTableWriteCapacity = writeCapacity;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the checkpoint for a shard and updates ownerSwitchesSinceCheckpoint.
|
||||
*
|
||||
|
|
@ -173,7 +225,9 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
|||
final boolean newTableCreated =
|
||||
leaseManager.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
|
||||
if (newTableCreated) {
|
||||
LOG.info("Created new lease table for coordinator");
|
||||
LOG.info(String.format(
|
||||
"Created new lease table for coordinator with initial read capacity of %d and write capacity of %d.",
|
||||
initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity));
|
||||
}
|
||||
// Need to wait for table in active state.
|
||||
final long secondsBetweenPolls = 10L;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -71,8 +72,8 @@ class ShardConsumer {
|
|||
* Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
|
||||
* If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
|
||||
*/
|
||||
private boolean beginShutdown;
|
||||
private ShutdownReason shutdownReason;
|
||||
private volatile boolean beginShutdown;
|
||||
private volatile ShutdownReason shutdownReason;
|
||||
|
||||
/**
|
||||
* @param shardInfo Shard information
|
||||
|
|
@ -154,16 +155,26 @@ class ShardConsumer {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
// Setting future to null so we don't misinterpret task completion status in case of exceptions
|
||||
future = null;
|
||||
}
|
||||
}
|
||||
updateState(taskCompletedSuccessfully);
|
||||
ITask nextTask = getNextTask();
|
||||
if (nextTask != null) {
|
||||
currentTask = nextTask;
|
||||
future = executorService.submit(currentTask);
|
||||
currentTaskSubmitTime = System.currentTimeMillis();
|
||||
submittedNewTask = true;
|
||||
LOG.debug("Submitted new " + currentTask.getTaskType() + " task for shard " + shardInfo.getShardId());
|
||||
try {
|
||||
future = executorService.submit(currentTask);
|
||||
currentTaskSubmitTime = System.currentTimeMillis();
|
||||
submittedNewTask = true;
|
||||
LOG.debug("Submitted new " + currentTask.getTaskType()
|
||||
+ " task for shard " + shardInfo.getShardId());
|
||||
} catch (RejectedExecutionException e) {
|
||||
LOG.info(currentTask.getTaskType() + " task was not accepted for execution.", e);
|
||||
} catch (RuntimeException e) {
|
||||
LOG.info(currentTask.getTaskType() + " task encountered exception ", e);
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("No new task to submit for shard %s, currentState %s",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -20,7 +20,8 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
@ -54,8 +55,10 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
|||
*/
|
||||
public class Worker implements Runnable {
|
||||
|
||||
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
||||
private static final Log LOG = LogFactory.getLog(Worker.class);
|
||||
|
||||
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
||||
|
||||
private WorkerLog wlog = new WorkerLog();
|
||||
|
||||
private final String applicationName;
|
||||
|
|
@ -71,12 +74,14 @@ public class Worker implements Runnable {
|
|||
private final IMetricsFactory metricsFactory;
|
||||
// Backoff time when running tasks if they encounter exceptions
|
||||
private final long taskBackoffTimeMillis;
|
||||
private final long failoverTimeMillis;
|
||||
|
||||
// private final KinesisClientLeaseManager leaseManager;
|
||||
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
private final ShardSyncTaskManager controlServer;
|
||||
|
||||
private volatile boolean shutdown;
|
||||
private volatile long shutdownStartTimeMillis;
|
||||
|
||||
// Holds consumers for shards the worker is currently tracking. Key is shard
|
||||
// info, value is ShardConsumer.
|
||||
|
|
@ -93,7 +98,7 @@ public class Worker implements Runnable {
|
|||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config) {
|
||||
this(recordProcessorFactory, config, Executors.newCachedThreadPool());
|
||||
this(recordProcessorFactory, config, getExecutorService());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -125,7 +130,7 @@ public class Worker implements Runnable {
|
|||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
IMetricsFactory metricsFactory) {
|
||||
this(recordProcessorFactory, config, metricsFactory, Executors.newCachedThreadPool());
|
||||
this(recordProcessorFactory, config, metricsFactory, getExecutorService());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -159,8 +164,7 @@ public class Worker implements Runnable {
|
|||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient,
|
||||
Executors.newCachedThreadPool());
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -181,11 +185,6 @@ public class Worker implements Runnable {
|
|||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient,
|
||||
getMetricsFactory(cloudWatchClient, config), execService);
|
||||
if (config.getRegionName() != null) {
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
cloudWatchClient.setRegion(region);
|
||||
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -221,11 +220,18 @@ public class Worker implements Runnable {
|
|||
null,
|
||||
new KinesisClientLibLeaseCoordinator(
|
||||
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient),
|
||||
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
|
||||
metricsFactory),
|
||||
config.getWorkerIdentifier(),
|
||||
config.getFailoverTimeMillis(),
|
||||
config.getEpsilonMillis(),
|
||||
config.getMaxLeasesForWorker(),
|
||||
config.getMaxLeasesToStealAtOneTime(),
|
||||
metricsFactory)
|
||||
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
|
||||
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
|
||||
execService,
|
||||
metricsFactory,
|
||||
config.getTaskBackoffTimeMillis());
|
||||
config.getTaskBackoffTimeMillis(),
|
||||
config.getFailoverTimeMillis());
|
||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||
if (config.getRegionName() != null) {
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
|
|
@ -279,7 +285,8 @@ public class Worker implements Runnable {
|
|||
KinesisClientLibLeaseCoordinator leaseCoordinator,
|
||||
ExecutorService execService,
|
||||
IMetricsFactory metricsFactory,
|
||||
long taskBackoffTimeMillis) {
|
||||
long taskBackoffTimeMillis,
|
||||
long failoverTimeMillis) {
|
||||
this.applicationName = applicationName;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.streamConfig = streamConfig;
|
||||
|
|
@ -300,6 +307,7 @@ public class Worker implements Runnable {
|
|||
metricsFactory,
|
||||
executorService);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -314,6 +322,10 @@ public class Worker implements Runnable {
|
|||
* record processors.
|
||||
*/
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
initialize();
|
||||
LOG.info("Initialization complete. Starting worker loop.");
|
||||
|
|
@ -322,7 +334,7 @@ public class Worker implements Runnable {
|
|||
shutdown();
|
||||
}
|
||||
|
||||
while (!shutdown) {
|
||||
while (!shouldShutdown()) {
|
||||
try {
|
||||
boolean foundCompletedShard = false;
|
||||
Set<ShardInfo> assignedShards = new HashSet<ShardInfo>();
|
||||
|
|
@ -359,8 +371,8 @@ public class Worker implements Runnable {
|
|||
wlog.resetInfoLogging();
|
||||
}
|
||||
|
||||
LOG.info("Stopping LeaseCoordinator.");
|
||||
leaseCoordinator.stop();
|
||||
finalShutdown();
|
||||
LOG.info("Worker loop is complete. Exiting from worker.");
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
|
|
@ -425,7 +437,7 @@ public class Worker implements Runnable {
|
|||
void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
|
||||
for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
|
||||
if (!assignedShards.contains(shard)) {
|
||||
// Shutdown the consumer since we are not longer responsible for
|
||||
// Shutdown the consumer since we are no longer responsible for
|
||||
// the shard.
|
||||
boolean isShutdown = shardInfoShardConsumerMap.get(shard).beginShutdown();
|
||||
if (isShutdown) {
|
||||
|
|
@ -459,11 +471,60 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets the killed flag so this worker will stop on the next iteration of
|
||||
* its loop.
|
||||
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that
|
||||
* if executor services were passed to the worker by the user, worker will not attempt to shutdown
|
||||
* those resources.
|
||||
*/
|
||||
public void shutdown() {
|
||||
this.shutdown = true;
|
||||
LOG.info("Worker shutdown requested.");
|
||||
|
||||
// Set shutdown flag, so Worker.run can start shutdown process.
|
||||
shutdown = true;
|
||||
shutdownStartTimeMillis = System.currentTimeMillis();
|
||||
|
||||
// Stop lease coordinator, so leases are not renewed or stolen from other workers.
|
||||
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
||||
// Worker.run().
|
||||
leaseCoordinator.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform final shutdown related tasks for the worker including shutting down worker owned
|
||||
* executor services, threads, etc.
|
||||
*/
|
||||
private void finalShutdown() {
|
||||
LOG.info("Starting worker's final shutdown.");
|
||||
|
||||
if (executorService instanceof WorkerThreadPoolExecutor) {
|
||||
// This should interrupt all active record processor tasks.
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
if (metricsFactory instanceof WorkerCWMetricsFactory) {
|
||||
((CWMetricsFactory) metricsFactory).shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
|
||||
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
||||
* @return Whether worker should shutdown immediately.
|
||||
*/
|
||||
private boolean shouldShutdown() {
|
||||
if (executorService.isShutdown()) {
|
||||
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
|
||||
return true;
|
||||
}
|
||||
if (shutdown) {
|
||||
if (shardInfoShardConsumerMap.isEmpty()) {
|
||||
LOG.info("All record processors have been shutdown successfully.");
|
||||
return true;
|
||||
}
|
||||
if ((System.currentTimeMillis() - shutdownStartTimeMillis) >= failoverTimeMillis) {
|
||||
LOG.info("Lease failover time is reached, so forcing shutdown.");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -475,33 +536,31 @@ public class Worker implements Runnable {
|
|||
* @return ShardConsumer for the shard
|
||||
*/
|
||||
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
|
||||
synchronized (shardInfoShardConsumerMap) {
|
||||
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
|
||||
// Instantiate a new consumer if we don't have one, or the one we
|
||||
// had was from an earlier
|
||||
// lease instance (and was shutdown). Don't need to create another
|
||||
// one if the shard has been
|
||||
// completely processed (shutdown reason terminate).
|
||||
if ((consumer == null)
|
||||
|| (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
|
||||
IRecordProcessor recordProcessor = factory.createProcessor();
|
||||
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
|
||||
// Instantiate a new consumer if we don't have one, or the one we
|
||||
// had was from an earlier
|
||||
// lease instance (and was shutdown). Don't need to create another
|
||||
// one if the shard has been
|
||||
// completely processed (shutdown reason terminate).
|
||||
if ((consumer == null)
|
||||
|| (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
|
||||
IRecordProcessor recordProcessor = factory.createProcessor();
|
||||
|
||||
consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
streamConfig,
|
||||
checkpointTracker,
|
||||
recordProcessor,
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis);
|
||||
shardInfoShardConsumerMap.put(shardInfo, consumer);
|
||||
wlog.infoForce("Created new shardConsumer for : " + shardInfo);
|
||||
}
|
||||
return consumer;
|
||||
consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
streamConfig,
|
||||
checkpointTracker,
|
||||
recordProcessor,
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis);
|
||||
shardInfoShardConsumerMap.put(shardInfo, consumer);
|
||||
wlog.infoForce("Created new shardConsumer for : " + shardInfo);
|
||||
}
|
||||
return consumer;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -648,14 +707,65 @@ public class Worker implements Runnable {
|
|||
*/
|
||||
private static IMetricsFactory getMetricsFactory(
|
||||
AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) {
|
||||
return config.getMetricsLevel() == MetricsLevel.NONE
|
||||
? new NullMetricsFactory() : new CWMetricsFactory(
|
||||
cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize(),
|
||||
config.getMetricsLevel(),
|
||||
config.getMetricsEnabledDimensions());
|
||||
IMetricsFactory metricsFactory;
|
||||
if (config.getMetricsLevel() == MetricsLevel.NONE) {
|
||||
metricsFactory = new NullMetricsFactory();
|
||||
} else {
|
||||
if (config.getRegionName() != null) {
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
cloudWatchClient.setRegion(region);
|
||||
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
|
||||
}
|
||||
metricsFactory = new WorkerCWMetricsFactory(
|
||||
cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize(),
|
||||
config.getMetricsLevel(),
|
||||
config.getMetricsEnabledDimensions());
|
||||
}
|
||||
return metricsFactory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns default executor service that should be used by the worker.
|
||||
* @return Default executor service that should be used by the worker.
|
||||
*/
|
||||
private static ExecutorService getExecutorService() {
|
||||
return new WorkerThreadPoolExecutor();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance
|
||||
* or not.
|
||||
* Visible and non-final only for testing.
|
||||
*/
|
||||
static class WorkerCWMetricsFactory extends CWMetricsFactory {
|
||||
|
||||
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient,
|
||||
String namespace,
|
||||
long bufferTimeMillis,
|
||||
int maxQueueSize,
|
||||
MetricsLevel metricsLevel,
|
||||
Set<String> metricsEnabledDimensions) {
|
||||
super(cloudWatchClient, namespace, bufferTimeMillis,
|
||||
maxQueueSize, metricsLevel, metricsEnabledDimensions);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance
|
||||
* or not.
|
||||
* Visible and non-final only for testing.
|
||||
*/
|
||||
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;
|
||||
|
||||
WorkerThreadPoolExecutor() {
|
||||
// Defaults are based on Executors.newCachedThreadPool()
|
||||
super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<Runnable>());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -785,8 +895,9 @@ public class Worker implements Runnable {
|
|||
throw new IllegalArgumentException(
|
||||
"A Record Processor Factory needs to be provided to build Worker");
|
||||
}
|
||||
|
||||
if (execService == null) {
|
||||
execService = Executors.newCachedThreadPool();
|
||||
execService = getExecutorService();
|
||||
}
|
||||
if (kinesisClient == null) {
|
||||
kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
|
||||
|
|
@ -846,10 +957,15 @@ public class Worker implements Runnable {
|
|||
config.getWorkerIdentifier(),
|
||||
config.getFailoverTimeMillis(),
|
||||
config.getEpsilonMillis(),
|
||||
metricsFactory),
|
||||
config.getMaxLeasesForWorker(),
|
||||
config.getMaxLeasesToStealAtOneTime(),
|
||||
metricsFactory)
|
||||
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
|
||||
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
|
||||
execService,
|
||||
metricsFactory,
|
||||
config.getTaskBackoffTimeMillis());
|
||||
config.getTaskBackoffTimeMillis(),
|
||||
config.getFailoverTimeMillis());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -17,9 +17,12 @@ package com.amazonaws.services.kinesis.leases.impl;
|
|||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
@ -58,16 +61,26 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
// Time to wait for in-flight Runnables to finish when calling .stop();
|
||||
private static final long STOP_WAIT_TIME_MILLIS = 2000L;
|
||||
|
||||
private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-");
|
||||
private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
|
||||
private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
|
||||
|
||||
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-");
|
||||
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-");
|
||||
|
||||
// Package level access for testing.
|
||||
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
|
||||
|
||||
private final ILeaseRenewer<T> leaseRenewer;
|
||||
private final ILeaseTaker<T> leaseTaker;
|
||||
private final long renewerIntervalMillis;
|
||||
private final long takerIntervalMillis;
|
||||
|
||||
private final Object shutdownLock = new Object();
|
||||
|
||||
protected final IMetricsFactory metricsFactory;
|
||||
|
||||
private ScheduledExecutorService threadpool;
|
||||
private ScheduledExecutorService leaseCoordinatorThreadPool;
|
||||
private ExecutorService leaseRenewalThreadpool;
|
||||
private volatile boolean running = false;
|
||||
|
||||
/**
|
||||
|
|
@ -99,17 +112,47 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
long leaseDurationMillis,
|
||||
long epsilonMillis,
|
||||
IMetricsFactory metricsFactory) {
|
||||
this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis);
|
||||
this.leaseRenewer = new LeaseRenewer<T>(leaseManager, workerIdentifier, leaseDurationMillis);
|
||||
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis,
|
||||
DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param leaseManager LeaseManager instance to use
|
||||
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
|
||||
* @param leaseDurationMillis Duration of a lease
|
||||
* @param epsilonMillis Allow for some variance when calculating lease expirations
|
||||
* @param maxLeasesForWorker Max leases this Worker can handle at a time
|
||||
* @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing)
|
||||
* @param metricsFactory Used to publish metrics about lease operations
|
||||
*/
|
||||
public LeaseCoordinator(ILeaseManager<T> leaseManager,
|
||||
String workerIdentifier,
|
||||
long leaseDurationMillis,
|
||||
long epsilonMillis,
|
||||
int maxLeasesForWorker,
|
||||
int maxLeasesToStealAtOneTime,
|
||||
IMetricsFactory metricsFactory) {
|
||||
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT);
|
||||
this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis)
|
||||
.withMaxLeasesForWorker(maxLeasesForWorker)
|
||||
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
|
||||
this.leaseRenewer = new LeaseRenewer<T>(
|
||||
leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool);
|
||||
this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;
|
||||
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
|
||||
this.metricsFactory = metricsFactory;
|
||||
|
||||
LOG.info(String.format("With failover time %dms and epsilon %dms, LeaseCoordinator will renew leases every %dms and take leases every %dms",
|
||||
LOG.info(String.format(
|
||||
"With failover time %d ms and epsilon %d ms, LeaseCoordinator will renew leases every %d ms, take" +
|
||||
"leases every %d ms, process maximum of %d leases and steal %d lease(s) at a time.",
|
||||
leaseDurationMillis,
|
||||
epsilonMillis,
|
||||
renewerIntervalMillis,
|
||||
takerIntervalMillis));
|
||||
takerIntervalMillis,
|
||||
maxLeasesForWorker,
|
||||
maxLeasesToStealAtOneTime));
|
||||
}
|
||||
|
||||
private class TakerRunnable implements Runnable {
|
||||
|
|
@ -152,12 +195,12 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
leaseRenewer.initialize();
|
||||
|
||||
// 2 because we know we'll have at most 2 concurrent tasks at a time.
|
||||
threadpool = Executors.newScheduledThreadPool(2, THREAD_FACTORY);
|
||||
leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
|
||||
|
||||
// Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
|
||||
threadpool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
// Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation.
|
||||
threadpool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
running = true;
|
||||
}
|
||||
|
||||
|
|
@ -175,7 +218,13 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
try {
|
||||
Map<String, T> takenLeases = leaseTaker.takeLeases();
|
||||
|
||||
leaseRenewer.addLeasesToRenew(takenLeases.values());
|
||||
// Only add taken leases to renewer if coordinator is still running.
|
||||
synchronized (shutdownLock) {
|
||||
if (running) {
|
||||
leaseRenewer.addLeasesToRenew(takenLeases.values());
|
||||
}
|
||||
}
|
||||
|
||||
success = true;
|
||||
} finally {
|
||||
scope.addDimension(WORKER_IDENTIFIER_METRIC, getWorkerIdentifier());
|
||||
|
|
@ -229,16 +278,19 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Stops background threads.
|
||||
* Stops background threads and waits for {@link #STOP_WAIT_TIME_MILLIS} for all background tasks to complete.
|
||||
* If tasks are not completed after this time, method will shutdown thread pool forcefully and return.
|
||||
*/
|
||||
public void stop() {
|
||||
if (threadpool != null) {
|
||||
threadpool.shutdown();
|
||||
if (leaseCoordinatorThreadPool != null) {
|
||||
leaseCoordinatorThreadPool.shutdown();
|
||||
try {
|
||||
if (threadpool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
|
||||
LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier()));
|
||||
if (leaseCoordinatorThreadPool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
|
||||
LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads",
|
||||
leaseTaker.getWorkerIdentifier()));
|
||||
} else {
|
||||
threadpool.shutdownNow();
|
||||
leaseCoordinatorThreadPool.shutdownNow();
|
||||
leaseRenewalThreadpool.shutdownNow();
|
||||
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
|
||||
leaseTaker.getWorkerIdentifier(),
|
||||
STOP_WAIT_TIME_MILLIS));
|
||||
|
|
@ -250,8 +302,10 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
|
||||
}
|
||||
|
||||
leaseRenewer.clearCurrentlyHeldLeases();
|
||||
running = false;
|
||||
synchronized (shutdownLock) {
|
||||
leaseRenewer.clearCurrentlyHeldLeases();
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -278,4 +332,15 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
return leaseRenewer.updateLease(lease, concurrencyToken);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns executor service that should be used for lease renewal.
|
||||
* @param maximumPoolSize Maximum allowed thread pool size
|
||||
* @return Executor service that should be used for lease renewal.
|
||||
*/
|
||||
private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
|
||||
ThreadPoolExecutor exec = new ThreadPoolExecutor(maximumPoolSize, maximumPoolSize,
|
||||
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), LEASE_RENEWAL_THREAD_FACTORY);
|
||||
exec.allowCoreThreadTimeOut(true);
|
||||
return exec;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -14,14 +14,19 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
@ -34,6 +39,8 @@ import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputExc
|
|||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
|
|
@ -48,6 +55,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
private final ConcurrentNavigableMap<String, T> ownedLeases = new ConcurrentSkipListMap<String, T>();
|
||||
private final String workerIdentifier;
|
||||
private final long leaseDurationNanos;
|
||||
private final ExecutorService executorService;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -55,11 +63,14 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
* @param leaseManager LeaseManager to use
|
||||
* @param workerIdentifier identifier of this worker
|
||||
* @param leaseDurationMillis duration of a lease in milliseconds
|
||||
* @param executorService ExecutorService to use for renewing leases in parallel
|
||||
*/
|
||||
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
|
||||
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis,
|
||||
ExecutorService executorService) {
|
||||
this.leaseManager = leaseManager;
|
||||
this.workerIdentifier = workerIdentifier;
|
||||
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
|
||||
this.executorService = executorService;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -76,24 +87,79 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
ownedLeases));
|
||||
}
|
||||
|
||||
/*
|
||||
* Lease renewals are done in parallel so many leases can be renewed for short lease fail over time
|
||||
* configuration. In this case, metrics scope is also shared across different threads, so scope must be thread
|
||||
* safe.
|
||||
*/
|
||||
IMetricsScope renewLeaseTaskMetricsScope = new ThreadSafeMetricsDelegatingScope(
|
||||
MetricsHelper.getMetricsScope());
|
||||
|
||||
/*
|
||||
* We iterate in descending order here so that the synchronized(lease) inside renewLease doesn't "lead" calls
|
||||
* to getCurrentlyHeldLeases. They'll still cross paths, but they won't interleave their executions.
|
||||
*/
|
||||
int lostLeases = 0;
|
||||
List<Future<Boolean>> renewLeaseTasks = new ArrayList<Future<Boolean>>();
|
||||
for (T lease : ownedLeases.descendingMap().values()) {
|
||||
if (!renewLease(lease)) {
|
||||
lostLeases++;
|
||||
renewLeaseTasks.add(executorService.submit(new RenewLeaseTask(lease, renewLeaseTaskMetricsScope)));
|
||||
}
|
||||
int leasesInUnknownState = 0;
|
||||
Exception lastException = null;
|
||||
for (Future<Boolean> renewLeaseTask : renewLeaseTasks) {
|
||||
try {
|
||||
if (!renewLeaseTask.get()) {
|
||||
lostLeases++;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a lease to renew.");
|
||||
leasesInUnknownState += 1;
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error("Encountered an exception while renewing a lease.", e.getCause());
|
||||
leasesInUnknownState += 1;
|
||||
lastException = e;
|
||||
}
|
||||
}
|
||||
|
||||
MetricsHelper.getMetricsScope().addData(
|
||||
renewLeaseTaskMetricsScope.addData(
|
||||
"LostLeases", lostLeases, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
MetricsHelper.getMetricsScope().addData(
|
||||
renewLeaseTaskMetricsScope.addData(
|
||||
"CurrentLeases", ownedLeases.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
if (leasesInUnknownState > 0) {
|
||||
throw new DependencyException(String.format("Encountered an exception while renewing leases. "
|
||||
+ "The number of leases which might not have been renewed is %d",
|
||||
leasesInUnknownState),
|
||||
lastException);
|
||||
}
|
||||
}
|
||||
|
||||
private class RenewLeaseTask implements Callable<Boolean> {
|
||||
|
||||
private final T lease;
|
||||
private final IMetricsScope metricsScope;
|
||||
|
||||
public RenewLeaseTask(T lease, IMetricsScope metricsScope) {
|
||||
this.lease = lease;
|
||||
this.metricsScope = metricsScope;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
MetricsHelper.setMetricsScope(metricsScope);
|
||||
try {
|
||||
return renewLease(lease);
|
||||
} finally {
|
||||
MetricsHelper.unsetMetricsScope();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean renewLease(T lease) throws DependencyException, InvalidStateException {
|
||||
return renewLease(lease, false);
|
||||
}
|
||||
|
||||
private boolean renewLease(T lease, boolean renewEvenIfExpired) throws DependencyException, InvalidStateException {
|
||||
String leaseKey = lease.getLeaseKey();
|
||||
|
||||
boolean success = false;
|
||||
|
|
@ -103,7 +169,12 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
for (int i = 1; i <= RENEWAL_RETRIES; i++) {
|
||||
try {
|
||||
synchronized (lease) {
|
||||
renewedLease = leaseManager.renewLease(lease);
|
||||
// Don't renew expired lease during regular renewals. getCopyOfHeldLease may have returned null
|
||||
// triggering the application processing to treat this as a lost lease (fail checkpoint with
|
||||
// ShutdownException).
|
||||
if (renewEvenIfExpired || (!lease.isExpired(leaseDurationNanos, System.nanoTime()))) {
|
||||
renewedLease = leaseManager.renewLease(lease);
|
||||
}
|
||||
if (renewedLease) {
|
||||
lease.setLastCounterIncrementNanos(System.nanoTime());
|
||||
}
|
||||
|
|
@ -305,11 +376,15 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
public void initialize() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
Collection<T> leases = leaseManager.listLeases();
|
||||
List<T> myLeases = new LinkedList<T>();
|
||||
boolean renewEvenIfExpired = true;
|
||||
|
||||
for (T lease : leases) {
|
||||
if (workerIdentifier.equals(lease.getLeaseOwner())) {
|
||||
LOG.info(String.format(" Worker %s found lease %s", workerIdentifier, lease));
|
||||
if (renewLease(lease)) {
|
||||
// Okay to renew even if lease is expired, because we start with an empty list and we add the lease to
|
||||
// our list only after a successful renew. So we don't need to worry about the edge case where we could
|
||||
// continue renewing a lease after signaling a lease loss to the application.
|
||||
if (renewLease(lease, renewEvenIfExpired)) {
|
||||
myLeases.add(lease);
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -22,7 +22,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -63,8 +62,9 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
private final String workerIdentifier;
|
||||
private final Map<String, T> allLeases = new HashMap<String, T>();
|
||||
private final long leaseDurationNanos;
|
||||
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
||||
private int maxLeasesToStealAtOneTime = 1;
|
||||
|
||||
private Random random = new Random();
|
||||
private long lastScanTimeNanos = 0L;
|
||||
|
||||
public LeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
|
||||
|
|
@ -73,6 +73,43 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker will not acquire more than the specified max number of leases even if there are more
|
||||
* shards that need to be processed. This can be used in scenarios where a worker is resource constrained or
|
||||
* to prevent lease thrashing when small number of workers pick up all leases for small amount of time during
|
||||
* deployment.
|
||||
* Note that setting a low value may cause data loss (e.g. if there aren't enough Workers to make progress on all
|
||||
* shards). When setting the value for this property, one must ensure enough workers are present to process
|
||||
* shards and should consider future resharding, child shards that may be blocked on parent shards, some workers
|
||||
* becoming unhealthy, etc.
|
||||
*
|
||||
* @param maxLeasesForWorker Max leases this Worker can handle at a time
|
||||
* @return LeaseTaker
|
||||
*/
|
||||
public LeaseTaker<T> withMaxLeasesForWorker(int maxLeasesForWorker) {
|
||||
if (maxLeasesForWorker <= 0) {
|
||||
throw new IllegalArgumentException("maxLeasesForWorker should be >= 1");
|
||||
}
|
||||
this.maxLeasesForWorker = maxLeasesForWorker;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Max leases to steal from a more loaded Worker at one time (for load balancing).
|
||||
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
||||
* but can cause higher churn in the system.
|
||||
*
|
||||
* @param maxLeasesToStealAtOneTime Steal up to this many leases at one time (for load balancing)
|
||||
* @return LeaseTaker
|
||||
*/
|
||||
public LeaseTaker<T> withMaxLeasesToStealAtOneTime(int maxLeasesToStealAtOneTime) {
|
||||
if (maxLeasesToStealAtOneTime <= 0) {
|
||||
throw new IllegalArgumentException("maxLeasesToStealAtOneTime should be >= 1");
|
||||
}
|
||||
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
@ -293,6 +330,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
private Set<T> computeLeasesToTake(List<T> expiredLeases) {
|
||||
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
|
||||
Set<T> leasesToTake = new HashSet<T>();
|
||||
IMetricsScope metrics = MetricsHelper.getMetricsScope();
|
||||
|
||||
int numLeases = allLeases.size();
|
||||
int numWorkers = leaseCounts.size();
|
||||
|
|
@ -313,6 +351,22 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
* Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases)
|
||||
*/
|
||||
target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);
|
||||
|
||||
// Spill over is the number of leases this worker should have claimed, but did not because it would
|
||||
// exceed the max allowed for this worker.
|
||||
int leaseSpillover = Math.max(0, target - maxLeasesForWorker);
|
||||
if (target > maxLeasesForWorker) {
|
||||
LOG.warn(String.format("Worker %s target is %d leases and maxLeasesForWorker is %d."
|
||||
+ " Resetting target to %d, lease spillover is %d. "
|
||||
+ " Note that some shards may not be processed if no other workers are able to pick them up.",
|
||||
workerIdentifier,
|
||||
target,
|
||||
maxLeasesForWorker,
|
||||
maxLeasesForWorker,
|
||||
leaseSpillover));
|
||||
target = maxLeasesForWorker;
|
||||
}
|
||||
metrics.addData("LeaseSpillover", leaseSpillover, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
int myCount = leaseCounts.get(workerIdentifier);
|
||||
|
|
@ -333,9 +387,9 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
leasesToTake.add(expiredLeases.remove(0));
|
||||
}
|
||||
} else {
|
||||
// If there are no expired leases and we need a lease, consider stealing one
|
||||
T leaseToSteal = chooseLeaseToSteal(leaseCounts, numLeasesToReachTarget, target);
|
||||
if (leaseToSteal != null) {
|
||||
// If there are no expired leases and we need a lease, consider stealing.
|
||||
List<T> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
|
||||
for (T leaseToSteal : leasesToSteal) {
|
||||
LOG.info(String.format("Worker %s needed %d leases but none were expired, so it will steal lease %s from %s",
|
||||
workerIdentifier,
|
||||
numLeasesToReachTarget,
|
||||
|
|
@ -357,7 +411,6 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
leasesToTake.size()));
|
||||
}
|
||||
|
||||
IMetricsScope metrics = MetricsHelper.getMetricsScope();
|
||||
metrics.addData("TotalLeases", numLeases, StandardUnit.Count, MetricsLevel.DETAILED);
|
||||
metrics.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
metrics.addData("NumWorkers", numWorkers, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
|
|
@ -368,17 +421,21 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Choose a lease to steal by randomly selecting one from the most loaded worker. Stealing rules:
|
||||
* Choose leases to steal by randomly selecting one or more (up to max) from the most loaded worker.
|
||||
* Stealing rules:
|
||||
*
|
||||
* Steal one lease from the most loaded worker if
|
||||
* a) he has > target leases and I need >= 1 leases
|
||||
* b) he has == target leases and I need > 1 leases
|
||||
* Steal up to maxLeasesToStealAtOneTime leases from the most loaded worker if
|
||||
* a) he has > target leases and I need >= 1 leases : steal min(leases needed, maxLeasesToStealAtOneTime)
|
||||
* b) he has == target leases and I need > 1 leases : steal 1
|
||||
*
|
||||
* @param leaseCounts map of workerIdentifier to lease count
|
||||
* @param needed # of leases needed to reach the target leases for the worker
|
||||
* @param target target # of leases per worker
|
||||
* @return Lease to steal, or null if we should not steal
|
||||
* @return Leases to steal, or empty list if we should not steal
|
||||
*/
|
||||
private T chooseLeaseToSteal(Map<String, Integer> leaseCounts, int needed, int target) {
|
||||
private List<T> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int needed, int target) {
|
||||
List<T> leasesToSteal = new ArrayList<>();
|
||||
|
||||
Entry<String, Integer> mostLoadedWorker = null;
|
||||
// Find the most loaded worker
|
||||
for (Entry<String, Integer> worker : leaseCounts.entrySet()) {
|
||||
|
|
@ -387,7 +444,18 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
}
|
||||
}
|
||||
|
||||
if (mostLoadedWorker.getValue() < target + (needed > 1 ? 0 : 1)) {
|
||||
int numLeasesToSteal = 0;
|
||||
if ((mostLoadedWorker.getValue() >= target) && (needed > 0)) {
|
||||
int leasesOverTarget = mostLoadedWorker.getValue() - target;
|
||||
numLeasesToSteal = Math.min(needed, leasesOverTarget);
|
||||
// steal 1 if we need > 1 and max loaded worker has target leases.
|
||||
if ((needed > 1) && (numLeasesToSteal == 0)) {
|
||||
numLeasesToSteal = 1;
|
||||
}
|
||||
numLeasesToSteal = Math.min(numLeasesToSteal, maxLeasesToStealAtOneTime);
|
||||
}
|
||||
|
||||
if (numLeasesToSteal <= 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Worker %s not stealing from most loaded worker %s. He has %d,"
|
||||
+ " target is %d, and I need %d",
|
||||
|
|
@ -397,8 +465,18 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
target,
|
||||
needed));
|
||||
}
|
||||
|
||||
return null;
|
||||
return leasesToSteal;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Worker %s will attempt to steal %d leases from most loaded worker %s. "
|
||||
+ " He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.",
|
||||
workerIdentifier,
|
||||
mostLoadedWorker.getKey(),
|
||||
mostLoadedWorker.getValue(),
|
||||
target,
|
||||
needed,
|
||||
maxLeasesToStealAtOneTime));
|
||||
}
|
||||
}
|
||||
|
||||
String mostLoadedWorkerIdentifier = mostLoadedWorker.getKey();
|
||||
|
|
@ -410,9 +488,12 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
}
|
||||
}
|
||||
|
||||
// Return a random one
|
||||
int randomIndex = random.nextInt(candidates.size());
|
||||
return candidates.get(randomIndex);
|
||||
// Return random ones
|
||||
Collections.shuffle(candidates);
|
||||
int toIndex = Math.min(candidates.size(), numLeasesToSteal);
|
||||
leasesToSteal.addAll(candidates.subList(0, toIndex));
|
||||
|
||||
return leasesToSteal;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ public class DefaultCWMetricsPublisher implements ICWMetricsPublisher<CWMetricKe
|
|||
try {
|
||||
cloudWatchClient.putMetricData(request);
|
||||
|
||||
LOG.info(String.format("Successfully published %d datums.", endIndex - startIndex));
|
||||
LOG.debug(String.format("Successfully published %d datums.", endIndex - startIndex));
|
||||
} catch (AmazonClientException e) {
|
||||
LOG.warn(String.format("Could not publish %d datums to CloudWatch", endIndex - startIndex), e);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -63,6 +63,29 @@ public class MetricsHelper {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets given metrics scope for the current thread.
|
||||
*
|
||||
* Method must be used with care. Metrics helper is designed such that separate metrics scopes are associated
|
||||
* with each thread. However, when sharing metrics scope and setting it explicitly on a thread, thread safety must
|
||||
* also be taken into account.
|
||||
* @param scope
|
||||
*/
|
||||
public static void setMetricsScope(IMetricsScope scope) {
|
||||
if (currentScope.get() != null) {
|
||||
throw new RuntimeException(String.format(
|
||||
"Metrics scope is already set for the current thread %s", Thread.currentThread().getName()));
|
||||
}
|
||||
currentScope.set(scope);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsets the metrics scope for the current thread.
|
||||
*/
|
||||
public static void unsetMetricsScope() {
|
||||
currentScope.remove();
|
||||
}
|
||||
|
||||
public static IMetricsScope getMetricsScope() {
|
||||
IMetricsScope result = currentScope.get();
|
||||
if (result == null) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.metrics.impl;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
|
||||
/**
|
||||
* Metrics scope factory that delegates metrics scope creation to another factory, but
|
||||
* returns metrics scope that is thread safe.
|
||||
*/
|
||||
public class ThreadSafeMetricsDelegatingFactory implements IMetricsFactory {
|
||||
|
||||
/** Metrics factory to delegate to. */
|
||||
private final IMetricsFactory delegate;
|
||||
|
||||
/**
|
||||
* Creates an instance of the metrics factory.
|
||||
* @param delegate metrics factory to delegate to
|
||||
*/
|
||||
public ThreadSafeMetricsDelegatingFactory(IMetricsFactory delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public IMetricsScope createMetrics() {
|
||||
return new ThreadSafeMetricsDelegatingScope(delegate.createMetrics());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.metrics.impl;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* Metrics scope that delegates to another metrics scope and is thread safe to be shared
|
||||
* across different threads.
|
||||
*/
|
||||
public class ThreadSafeMetricsDelegatingScope implements IMetricsScope {
|
||||
|
||||
/** Metrics scope to delegate to. */
|
||||
private final IMetricsScope delegate;
|
||||
|
||||
/**
|
||||
* Creates an instance of the metrics scope.
|
||||
* @param delegate metrics scope to delegate to
|
||||
*/
|
||||
public ThreadSafeMetricsDelegatingScope(IMetricsScope delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void addData(String name, double value, StandardUnit unit) {
|
||||
delegate.addData(name, value, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
|
||||
delegate.addData(name, value, unit, level);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void addDimension(String name, String value) {
|
||||
delegate.addDimension(name, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void end() {
|
||||
delegate.end();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -41,6 +41,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
|
|||
private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class);
|
||||
private static final int EXIT_VALUE = 1;
|
||||
|
||||
/** Whether or not record processor initialization is successful. Defaults to false. */
|
||||
private volatile boolean initialized;
|
||||
|
||||
private String shardId;
|
||||
|
||||
private Future<?> stderrReadTask;
|
||||
|
|
@ -115,8 +118,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
|
|||
* The process builder has thrown an exception while starting the child process so we would like to shut
|
||||
* down
|
||||
*/
|
||||
stopProcessing("Failed to start client executable", e);
|
||||
return;
|
||||
throw new IOException("Failed to start client executable", e);
|
||||
}
|
||||
// Initialize all of our utility objects that will handle interacting with the process over
|
||||
// STDIN/STDOUT/STDERR
|
||||
|
|
@ -131,7 +133,10 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
|
|||
if (!protocol.initialize()) {
|
||||
throw new RuntimeException("Failed to initialize child process");
|
||||
}
|
||||
|
||||
initialized = true;
|
||||
} catch (Throwable t) {
|
||||
// Any exception in initialize results in MultiLangDaemon shutdown.
|
||||
stopProcessing("Encountered an error while trying to initialize record processor", t);
|
||||
}
|
||||
}
|
||||
|
|
@ -149,6 +154,15 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
|
|||
|
||||
@Override
|
||||
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
|
||||
// In cases where KCL loses lease for the shard after creating record processor instance but before
|
||||
// record processor initialize() is called, then shutdown() may be called directly before initialize().
|
||||
if (!initialized) {
|
||||
LOG.info("Record processor was not initialized and will not have a child process, "
|
||||
+ "so not invoking child process shutdown.");
|
||||
this.state = ProcessState.SHUTDOWN;
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (ProcessState.ACTIVE.equals(this.state)) {
|
||||
if (!protocol.shutdown(checkpointer, reason)) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue