Version 1.3.0 of the Amazon Kinesis Client Library
A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
This commit is contained in:
parent
0fc90ff787
commit
1861f12db7
13 changed files with 44 additions and 40 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.2.1
|
||||
Bundle-Version: 1.3.0
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
|||
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
|
||||
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
|
||||
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
|
||||
com.amazonaws.sdk;bundle-version="1.9.16",
|
||||
com.amazonaws.sdk;bundle-version="1.9.37",
|
||||
Export-Package: com.amazonaws.services.kinesis,
|
||||
com.amazonaws.services.kinesis.clientlibrary,
|
||||
com.amazonaws.services.kinesis.clientlibrary.config,
|
||||
|
|
|
|||
|
|
@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
|
|||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.3.0 (May 22, 2015)
|
||||
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
|
||||
|
||||
### Release 1.2.1 (January 26, 2015)
|
||||
* **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker.
|
||||
|
||||
|
|
|
|||
4
pom.xml
4
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.2.1</version>
|
||||
<version>1.3.0</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
@ -23,7 +23,7 @@
|
|||
</licenses>
|
||||
|
||||
<properties>
|
||||
<aws-java-sdk.version>1.9.16</aws-java-sdk.version>
|
||||
<aws-java-sdk.version>1.9.37</aws-java-sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.3.0";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
|
|||
|
|
@ -14,13 +14,10 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
|
|
@ -57,17 +54,15 @@ class KinesisDataFetcher {
|
|||
* @param maxRecords Max records to fetch
|
||||
* @return list of records of up to maxRecords size
|
||||
*/
|
||||
public List<Record> getRecords(int maxRecords) {
|
||||
public GetRecordsResult getRecords(int maxRecords) {
|
||||
if (!isInitialized) {
|
||||
throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization.");
|
||||
}
|
||||
|
||||
List<Record> records = null;
|
||||
GetRecordsResult response = null;
|
||||
if (nextIterator != null) {
|
||||
try {
|
||||
response = kinesisProxy.get(nextIterator, maxRecords);
|
||||
records = response.getRecords();
|
||||
nextIterator = response.getNextShardIterator();
|
||||
} catch (ResourceNotFoundException e) {
|
||||
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
|
||||
|
|
@ -80,7 +75,7 @@ class KinesisDataFetcher {
|
|||
isShardEndReached = true;
|
||||
}
|
||||
|
||||
return records;
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
|
||||
|
|
@ -37,6 +38,8 @@ class ProcessTask implements ITask {
|
|||
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
|
||||
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
|
||||
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
|
||||
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ProcessTask.class);
|
||||
|
||||
private final ShardInfo shardInfo;
|
||||
|
|
@ -93,7 +96,14 @@ class ProcessTask implements ITask {
|
|||
boolean shardEndReached = true;
|
||||
return new TaskResult(null, shardEndReached);
|
||||
}
|
||||
List<Record> records = getRecords();
|
||||
final GetRecordsResult getRecordsResult = getRecords();
|
||||
|
||||
if (getRecordsResult.getMillisBehindLatest() != null) {
|
||||
scope.addData(MILLIS_BEHIND_LATEST_METRIC, getRecordsResult.getMillisBehindLatest(),
|
||||
StandardUnit.Milliseconds);
|
||||
}
|
||||
|
||||
final List<Record> records = getRecordsResult.getRecords();
|
||||
|
||||
if (records.isEmpty()) {
|
||||
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
|
||||
|
|
@ -180,7 +190,7 @@ class ProcessTask implements ITask {
|
|||
* @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any
|
||||
* records to the client code yet
|
||||
*/
|
||||
private List<Record> getRecords() throws KinesisClientLibException {
|
||||
private GetRecordsResult getRecords() throws KinesisClientLibException {
|
||||
int maxRecords = streamConfig.getMaxRecords();
|
||||
try {
|
||||
return dataFetcher.getRecords(maxRecords);
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ public class KinesisClientLease extends Lease {
|
|||
}
|
||||
KinesisClientLease casted = (KinesisClientLease) other;
|
||||
|
||||
// Do not update ownerSwitchesSinceCheckpoint here - that field is maintained by the leasing library.
|
||||
setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint);
|
||||
setCheckpoint(casted.checkpoint);
|
||||
setParentShardIds(casted.parentShardIds);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,16 +95,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(KinesisClientLease lease, String newOwner) {
|
||||
Map<String, AttributeValueUpdate> result = baseSerializer.getDynamoTakeLeaseUpdate(lease, newOwner);
|
||||
|
||||
Long ownerSwitchesSinceCheckpoint = lease.getOwnerSwitchesSinceCheckpoint();
|
||||
String oldOwner = lease.getLeaseOwner();
|
||||
if (oldOwner != null && !oldOwner.equals(newOwner)) {
|
||||
ownerSwitchesSinceCheckpoint++;
|
||||
result.put(OWNER_SWITCHES_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(1L),
|
||||
AttributeAction.ADD));
|
||||
}
|
||||
|
||||
result.put(OWNER_SWITCHES_KEY,
|
||||
new AttributeValueUpdate(DynamoUtils.createAttributeValue(ownerSwitchesSinceCheckpoint),
|
||||
AttributeAction.PUT));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.amazonaws.util.json.JSONObject;
|
||||
|
||||
|
|
@ -31,11 +32,8 @@ public class Lease {
|
|||
*
|
||||
* Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two
|
||||
* values will be very large. We will consider leases to be expired if they are more than a year old.
|
||||
*
|
||||
* 365 days per year * 24 hours per day * 60 minutes per hour * 60 seconds per minute * 1000000000
|
||||
* nanoseconds/second
|
||||
*/
|
||||
private static final long MAX_ABS_AGE_NANOS = 365 * 24 * 60 * 60 * 1000000000L;
|
||||
private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365);
|
||||
|
||||
private String leaseKey;
|
||||
private String leaseOwner;
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.leases.impl;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -160,14 +161,14 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
|
||||
@Override
|
||||
public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException {
|
||||
long sleepTimeRemaining = timeoutSeconds * 1000;
|
||||
long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds);
|
||||
|
||||
while (!leaseTableExists()) {
|
||||
if (sleepTimeRemaining <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
long timeToSleepMillis = Math.min(1000 * secondsBetweenPolls, sleepTimeRemaining);
|
||||
long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining);
|
||||
|
||||
sleepTimeRemaining -= sleep(timeToSleepMillis);
|
||||
}
|
||||
|
|
@ -385,7 +386,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
verifyNotNull(owner, "owner cannot be null");
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Taking lease with shardId %s from %s to %s",
|
||||
LOG.debug(String.format("Taking lease with leaseKey %s from %s to %s",
|
||||
lease.getLeaseKey(),
|
||||
lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(),
|
||||
owner));
|
||||
|
|
@ -428,7 +429,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
verifyNotNull(lease, "lease cannot be null");
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Voiding lease with shardId %s owned by %s",
|
||||
LOG.debug(String.format("Evicting lease with leaseKey %s owned by %s",
|
||||
lease.getLeaseKey(),
|
||||
lease.getLeaseOwner()));
|
||||
}
|
||||
|
|
@ -485,7 +486,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
verifyNotNull(lease, "lease cannot be null");
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Deleting lease with shardId %s", lease.getLeaseKey()));
|
||||
LOG.debug(String.format("Deleting lease with leaseKey %s", lease.getLeaseKey()));
|
||||
}
|
||||
|
||||
DeleteItemRequest deleteRequest = new DeleteItemRequest();
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -57,7 +58,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
|
||||
this.leaseManager = leaseManager;
|
||||
this.workerIdentifier = workerIdentifier;
|
||||
this.leaseDurationNanos = leaseDurationMillis * 1000000L;
|
||||
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import java.util.Map.Entry;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -68,7 +69,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
public LeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
|
||||
this.leaseManager = leaseManager;
|
||||
this.workerIdentifier = workerIdentifier;
|
||||
this.leaseDurationNanos = leaseDurationMillis * 1000000;
|
||||
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -80,16 +80,16 @@ public class MetricsHelper {
|
|||
public static void addSuccessAndLatency(String prefix, long startTimeMillis, boolean success) {
|
||||
addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success);
|
||||
}
|
||||
|
||||
|
||||
public static void addSuccessAndLatencyPerShard (
|
||||
String shardId,
|
||||
String prefix,
|
||||
long startTimeMillis,
|
||||
String shardId,
|
||||
String prefix,
|
||||
long startTimeMillis,
|
||||
boolean success) {
|
||||
IMetricsScope scope = getMetricsScope();
|
||||
|
||||
String realPrefix = prefix == null ? "" : prefix + SEP;
|
||||
|
||||
|
||||
if (shardId != null) {
|
||||
scope.addDimension("ShardId", shardId);
|
||||
}
|
||||
|
|
@ -103,10 +103,9 @@ public class MetricsHelper {
|
|||
public static void endScope() {
|
||||
IMetricsScope scope = getMetricsScope();
|
||||
if (scope != null) {
|
||||
Integer refCount = referenceCount.get();
|
||||
refCount--;
|
||||
referenceCount.set(referenceCount.get() - 1);
|
||||
|
||||
if (refCount == 0) {
|
||||
if (referenceCount.get() == 0) {
|
||||
scope.end();
|
||||
currentScope.remove();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue