Version 1.6.0 of the Amazon Kinesis Client Library
This commit is contained in:
parent
b596a0223e
commit
97e606ffeb
7 changed files with 78 additions and 38 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.5.1
|
||||
Bundle-Version: 1.6.0
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
|||
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
|
||||
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
|
||||
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
|
||||
com.amazonaws.sdk;bundle-version="1.9.37",
|
||||
com.amazonaws.sdk;bundle-version="1.10.8",
|
||||
Export-Package: com.amazonaws.services.kinesis,
|
||||
com.amazonaws.services.kinesis.clientlibrary,
|
||||
com.amazonaws.services.kinesis.clientlibrary.config,
|
||||
|
|
|
|||
|
|
@ -29,6 +29,9 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
|
|||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.6.0 (July 31, 2015)
|
||||
* Restores compatibility with [dynamodb-streams-kinesis-adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) (which was broken in 1.4.0).
|
||||
|
||||
### Release 1.5.1 (July 20, 2015)
|
||||
* KCL maven artifact 1.5.0 does not work with JDK 7. This release addresses this issue.
|
||||
|
||||
|
|
|
|||
4
pom.xml
4
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.5.1</version>
|
||||
<version>1.6.0</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
@ -23,7 +23,7 @@
|
|||
</licenses>
|
||||
|
||||
<properties>
|
||||
<aws-java-sdk.version>1.9.37</aws-java-sdk.version>
|
||||
<aws-java-sdk.version>1.10.8</aws-java-sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.5.1";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.0";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
|
@ -98,7 +97,9 @@ class ProcessTask implements ITask {
|
|||
*
|
||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
|
||||
*/
|
||||
|
||||
// CHECKSTYLE:OFF CyclomaticComplexity
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public TaskResult call() {
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
|
|
@ -117,9 +118,11 @@ class ProcessTask implements ITask {
|
|||
}
|
||||
|
||||
final GetRecordsResult getRecordsResult = getRecordsResult();
|
||||
final List<Record> records = getRecordsResult.getRecords();
|
||||
|
||||
if (records.isEmpty()) {
|
||||
List<Record> records = getRecordsResult.getRecords();
|
||||
|
||||
if (!records.isEmpty()) {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
} else {
|
||||
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
|
||||
|
||||
long sleepTimeMillis =
|
||||
|
|
@ -135,35 +138,30 @@ class ProcessTask implements ITask {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
int numKinesisRecords = records.size();
|
||||
int numUserRecords = 0;
|
||||
List<UserRecord> subRecords = new ArrayList<>();
|
||||
|
||||
// If we got more records, record the max extended sequence number. Sleep if there are no records.
|
||||
if (!records.isEmpty()) {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, numKinesisRecords, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
|
||||
// We deaggregate if and only if we got actual Kinesis records, i.e.
|
||||
// not instances of some subclass thereof.
|
||||
if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
|
||||
if (this.shard != null) {
|
||||
subRecords = UserRecord.deaggregate(records,
|
||||
records = (List<Record>) (List<?>) UserRecord.deaggregate(records,
|
||||
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
|
||||
new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
|
||||
} else {
|
||||
subRecords = UserRecord.deaggregate(records);
|
||||
records = (List<Record>) (List<?>) UserRecord.deaggregate(records);
|
||||
}
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
|
||||
filterAndGetMaxExtendedSequenceNumber(scope, subRecords,
|
||||
recordProcessorCheckpointer.getLastCheckpointValue()));
|
||||
numUserRecords = subRecords.size();
|
||||
}
|
||||
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
|
||||
filterAndGetMaxExtendedSequenceNumber(scope, records,
|
||||
recordProcessorCheckpointer.getLastCheckpointValue()));
|
||||
|
||||
if ((!subRecords.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
|
||||
LOG.debug("Calling application processRecords() with " + numKinesisRecords + " Kinesis records ("
|
||||
+ numUserRecords + " user records) from " + shardInfo.getShardId());
|
||||
@SuppressWarnings("unchecked")
|
||||
if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
|
||||
LOG.debug("Calling application processRecords() with " + records.size()
|
||||
+ " records from " + shardInfo.getShardId());
|
||||
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
|
||||
.withRecords((List<Record>) (List<?>) subRecords)
|
||||
.withCheckpointer(recordProcessorCheckpointer)
|
||||
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
|
||||
.withRecords(records)
|
||||
.withCheckpointer(recordProcessorCheckpointer)
|
||||
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
|
||||
|
||||
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
|
||||
try {
|
||||
|
|
@ -172,7 +170,7 @@ class ProcessTask implements ITask {
|
|||
LOG.error("ShardId " + shardInfo.getShardId()
|
||||
+ ": Application processRecords() threw an exception when processing shard ", e);
|
||||
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
|
||||
+ subRecords);
|
||||
+ records);
|
||||
} finally {
|
||||
MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
|
||||
recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
|
||||
|
|
@ -208,14 +206,17 @@ class ProcessTask implements ITask {
|
|||
* @param lastCheckpointValue the most recent checkpoint value
|
||||
* @return the largest extended sequence number among the retained records
|
||||
*/
|
||||
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<UserRecord> records,
|
||||
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records,
|
||||
final ExtendedSequenceNumber lastCheckpointValue) {
|
||||
ExtendedSequenceNumber largestExtendedSequenceNumber = lastCheckpointValue;
|
||||
ListIterator<UserRecord> recordIterator = records.listIterator();
|
||||
ListIterator<Record> recordIterator = records.listIterator();
|
||||
while (recordIterator.hasNext()) {
|
||||
UserRecord record = recordIterator.next();
|
||||
ExtendedSequenceNumber extendedSequenceNumber
|
||||
= new ExtendedSequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber());
|
||||
Record record = recordIterator.next();
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(
|
||||
record.getSequenceNumber(),
|
||||
record instanceof UserRecord
|
||||
? ((UserRecord) record).getSubSequenceNumber()
|
||||
: null);
|
||||
|
||||
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
|
||||
recordIterator.remove();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.utils;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
/**
|
||||
* Custom thread factory that sets thread names based on the specified prefix.
|
||||
*/
|
||||
public class NamedThreadFactory implements ThreadFactory {
|
||||
|
||||
private String threadPrefix;
|
||||
private ThreadFactory defaultFactory = Executors.defaultThreadFactory();
|
||||
private AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
/**
|
||||
* Construct a thread factory that uses the specified parameter as the thread prefix.
|
||||
*
|
||||
* @param threadPrefix the prefix with witch all created threads will be named
|
||||
*/
|
||||
public NamedThreadFactory(String threadPrefix) {
|
||||
this.threadPrefix = threadPrefix;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = defaultFactory.newThread(r);
|
||||
thread.setName(threadPrefix + counter.incrementAndGet());
|
||||
return thread;
|
||||
}
|
||||
}
|
||||
|
|
@ -19,11 +19,13 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.utils.NamedThreadFactory;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||
|
|
@ -56,6 +58,8 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
// Time to wait for in-flight Runnables to finish when calling .stop();
|
||||
private static final long STOP_WAIT_TIME_MILLIS = 2000L;
|
||||
|
||||
private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-");
|
||||
|
||||
private final ILeaseRenewer<T> leaseRenewer;
|
||||
private final ILeaseTaker<T> leaseTaker;
|
||||
private final long renewerIntervalMillis;
|
||||
|
|
@ -64,7 +68,7 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
protected final IMetricsFactory metricsFactory;
|
||||
|
||||
private ScheduledExecutorService threadpool;
|
||||
private boolean running = false;
|
||||
private volatile boolean running = false;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -148,7 +152,7 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
leaseRenewer.initialize();
|
||||
|
||||
// 2 because we know we'll have at most 2 concurrent tasks at a time.
|
||||
threadpool = Executors.newScheduledThreadPool(2);
|
||||
threadpool = Executors.newScheduledThreadPool(2, THREAD_FACTORY);
|
||||
|
||||
// Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
|
||||
threadpool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
|
|
|
|||
Loading…
Reference in a new issue