Merge remote-tracking branch 'upstream/master'

This commit is contained in:
André Pinto 2015-06-02 14:28:29 -03:00
commit 04c588e9ca
13 changed files with 44 additions and 40 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.2.1
Bundle-Version: 1.3.0
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
com.amazonaws.sdk;bundle-version="1.9.16",
com.amazonaws.sdk;bundle-version="1.9.37",
Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.config,

View file

@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes
### Release 1.3.0 (May 22, 2015)
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
### Release 1.2.1 (January 26, 2015)
* **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker.

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.2.1</version>
<version>1.3.0</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url>
@ -23,7 +23,7 @@
</licenses>
<properties>
<aws-java-sdk.version>1.9.16</aws-java-sdk.version>
<aws-java-sdk.version>1.9.37</aws-java-sdk.version>
</properties>
<dependencies>

View file

@ -88,7 +88,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.3.0";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -14,13 +14,10 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
@ -57,17 +54,15 @@ class KinesisDataFetcher {
* @param maxRecords Max records to fetch
* @return list of records of up to maxRecords size
*/
public List<Record> getRecords(int maxRecords) {
public GetRecordsResult getRecords(int maxRecords) {
if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization.");
}
List<Record> records = null;
GetRecordsResult response = null;
if (nextIterator != null) {
try {
response = kinesisProxy.get(nextIterator, maxRecords);
records = response.getRecords();
nextIterator = response.getNextShardIterator();
} catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
@ -80,7 +75,7 @@ class KinesisDataFetcher {
isShardEndReached = true;
}
return records;
return response;
}
/**

View file

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
@ -37,6 +38,8 @@ class ProcessTask implements ITask {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
private static final Log LOG = LogFactory.getLog(ProcessTask.class);
private final ShardInfo shardInfo;
@ -93,7 +96,14 @@ class ProcessTask implements ITask {
boolean shardEndReached = true;
return new TaskResult(null, shardEndReached);
}
List<Record> records = getRecords();
final GetRecordsResult getRecordsResult = getRecords();
if (getRecordsResult.getMillisBehindLatest() != null) {
scope.addData(MILLIS_BEHIND_LATEST_METRIC, getRecordsResult.getMillisBehindLatest(),
StandardUnit.Milliseconds);
}
final List<Record> records = getRecordsResult.getRecords();
if (records.isEmpty()) {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
@ -180,7 +190,7 @@ class ProcessTask implements ITask {
* @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any
* records to the client code yet
*/
private List<Record> getRecords() throws KinesisClientLibException {
private GetRecordsResult getRecords() throws KinesisClientLibException {
int maxRecords = streamConfig.getMaxRecords();
try {
return dataFetcher.getRecords(maxRecords);

View file

@ -49,7 +49,7 @@ public class KinesisClientLease extends Lease {
}
KinesisClientLease casted = (KinesisClientLease) other;
// Do not update ownerSwitchesSinceCheckpoint here - that field is maintained by the leasing library.
setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint);
setCheckpoint(casted.checkpoint);
setParentShardIds(casted.parentShardIds);
}

View file

@ -95,16 +95,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(KinesisClientLease lease, String newOwner) {
Map<String, AttributeValueUpdate> result = baseSerializer.getDynamoTakeLeaseUpdate(lease, newOwner);
Long ownerSwitchesSinceCheckpoint = lease.getOwnerSwitchesSinceCheckpoint();
String oldOwner = lease.getLeaseOwner();
if (oldOwner != null && !oldOwner.equals(newOwner)) {
ownerSwitchesSinceCheckpoint++;
result.put(OWNER_SWITCHES_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(1L),
AttributeAction.ADD));
}
result.put(OWNER_SWITCHES_KEY,
new AttributeValueUpdate(DynamoUtils.createAttributeValue(ownerSwitchesSinceCheckpoint),
AttributeAction.PUT));
return result;
}

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.leases.impl;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.amazonaws.util.json.JSONObject;
@ -31,11 +32,8 @@ public class Lease {
*
* Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two
* values will be very large. We will consider leases to be expired if they are more than a year old.
*
* 365 days per year * 24 hours per day * 60 minutes per hour * 60 seconds per minute * 1000000000
* nanoseconds/second
*/
private static final long MAX_ABS_AGE_NANOS = 365 * 24 * 60 * 60 * 1000000000L;
private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365);
private String leaseKey;
private String leaseOwner;

View file

@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.leases.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -160,14 +161,14 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
@Override
public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException {
long sleepTimeRemaining = timeoutSeconds * 1000;
long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds);
while (!leaseTableExists()) {
if (sleepTimeRemaining <= 0) {
return false;
}
long timeToSleepMillis = Math.min(1000 * secondsBetweenPolls, sleepTimeRemaining);
long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining);
sleepTimeRemaining -= sleep(timeToSleepMillis);
}
@ -385,7 +386,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
verifyNotNull(owner, "owner cannot be null");
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Taking lease with shardId %s from %s to %s",
LOG.debug(String.format("Taking lease with leaseKey %s from %s to %s",
lease.getLeaseKey(),
lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(),
owner));
@ -428,7 +429,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
verifyNotNull(lease, "lease cannot be null");
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Voiding lease with shardId %s owned by %s",
LOG.debug(String.format("Evicting lease with leaseKey %s owned by %s",
lease.getLeaseKey(),
lease.getLeaseOwner()));
}
@ -485,7 +486,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
verifyNotNull(lease, "lease cannot be null");
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Deleting lease with shardId %s", lease.getLeaseKey()));
LOG.debug(String.format("Deleting lease with leaseKey %s", lease.getLeaseKey()));
}
DeleteItemRequest deleteRequest = new DeleteItemRequest();

View file

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -57,7 +58,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = leaseDurationMillis * 1000000L;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
}
/**

View file

@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -68,7 +69,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
public LeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = leaseDurationMillis * 1000000;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
}
/**

View file

@ -80,16 +80,16 @@ public class MetricsHelper {
public static void addSuccessAndLatency(String prefix, long startTimeMillis, boolean success) {
addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success);
}
public static void addSuccessAndLatencyPerShard (
String shardId,
String prefix,
long startTimeMillis,
String shardId,
String prefix,
long startTimeMillis,
boolean success) {
IMetricsScope scope = getMetricsScope();
String realPrefix = prefix == null ? "" : prefix + SEP;
if (shardId != null) {
scope.addDimension("ShardId", shardId);
}
@ -103,10 +103,9 @@ public class MetricsHelper {
public static void endScope() {
IMetricsScope scope = getMetricsScope();
if (scope != null) {
Integer refCount = referenceCount.get();
refCount--;
referenceCount.set(referenceCount.get() - 1);
if (refCount == 0) {
if (referenceCount.get() == 0) {
scope.end();
currentScope.remove();
}