From 1861f12db7748f240e4963a9cf82b9f11eae7760 Mon Sep 17 00:00:00 2001 From: "Dosani, Adnan" Date: Fri, 22 May 2015 02:09:47 -0700 Subject: [PATCH] Version 1.3.0 of the Amazon Kinesis Client Library A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch. --- META-INF/MANIFEST.MF | 4 ++-- README.md | 3 +++ pom.xml | 4 ++-- .../lib/worker/KinesisClientLibConfiguration.java | 2 +- .../lib/worker/KinesisDataFetcher.java | 9 ++------- .../clientlibrary/lib/worker/ProcessTask.java | 14 ++++++++++++-- .../kinesis/leases/impl/KinesisClientLease.java | 2 +- .../leases/impl/KinesisClientLeaseSerializer.java | 8 ++------ .../services/kinesis/leases/impl/Lease.java | 6 ++---- .../kinesis/leases/impl/LeaseManager.java | 11 ++++++----- .../kinesis/leases/impl/LeaseRenewer.java | 3 ++- .../services/kinesis/leases/impl/LeaseTaker.java | 3 ++- .../kinesis/metrics/impl/MetricsHelper.java | 15 +++++++-------- 13 files changed, 44 insertions(+), 40 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index c7a8dc48..165ab34a 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.2.1 +Bundle-Version: 1.3.0 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", @@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6", com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0", org.apache.httpcomponents.httpcore;bundle-version="4.3.2", org.apache.httpcomponents.httpclient;bundle-version="4.3.4" - com.amazonaws.sdk;bundle-version="1.9.16", + com.amazonaws.sdk;bundle-version="1.9.37", Export-Package: com.amazonaws.services.kinesis, com.amazonaws.services.kinesis.clientlibrary, com.amazonaws.services.kinesis.clientlibrary.config, diff --git a/README.md b/README.md index b2e1bcf0..db248be0 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.3.0 (May 22, 2015) +* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch. + ### Release 1.2.1 (January 26, 2015) * **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker. diff --git a/pom.xml b/pom.xml index 4f8ccf63..0eef743b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.2.1 + 1.3.0 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. https://aws.amazon.com/kinesis @@ -23,7 +23,7 @@ - 1.9.16 + 1.9.37 diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 74a3f2cd..8f08ebb6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.3.0"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 7b23c32f..d890ac16 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,13 +14,10 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; @@ -57,17 +54,15 @@ class KinesisDataFetcher { * @param maxRecords Max records to fetch * @return list of records of up to maxRecords size */ - public List getRecords(int maxRecords) { + public GetRecordsResult getRecords(int maxRecords) { if (!isInitialized) { throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization."); } - List records = null; GetRecordsResult response = null; if (nextIterator != null) { try { response = kinesisProxy.get(nextIterator, maxRecords); - records = response.getRecords(); nextIterator = response.getNextShardIterator(); } catch (ResourceNotFoundException e) { LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); @@ -80,7 +75,7 @@ class KinesisDataFetcher { isShardEndReached = true; } - return records; + return response; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index d3f72d64..2886c299 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; @@ -37,6 +38,8 @@ class ProcessTask implements ITask { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; + private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest"; + private static final Log LOG = LogFactory.getLog(ProcessTask.class); private final ShardInfo shardInfo; @@ -93,7 +96,14 @@ class ProcessTask implements ITask { boolean shardEndReached = true; return new TaskResult(null, shardEndReached); } - List records = getRecords(); + final GetRecordsResult getRecordsResult = getRecords(); + + if (getRecordsResult.getMillisBehindLatest() != null) { + scope.addData(MILLIS_BEHIND_LATEST_METRIC, getRecordsResult.getMillisBehindLatest(), + StandardUnit.Milliseconds); + } + + final List records = getRecordsResult.getRecords(); if (records.isEmpty()) { LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); @@ -180,7 +190,7 @@ class ProcessTask implements ITask { * @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any * records to the client code yet */ - private List getRecords() throws KinesisClientLibException { + private GetRecordsResult getRecords() throws KinesisClientLibException { int maxRecords = streamConfig.getMaxRecords(); try { return dataFetcher.getRecords(maxRecords); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index a24dd928..bd1c097d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -49,7 +49,7 @@ public class KinesisClientLease extends Lease { } KinesisClientLease casted = (KinesisClientLease) other; - // Do not update ownerSwitchesSinceCheckpoint here - that field is maintained by the leasing library. + setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint); setCheckpoint(casted.checkpoint); setParentShardIds(casted.parentShardIds); } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 55f8abc8..28e55d19 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -95,16 +95,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer getDynamoTakeLeaseUpdate(KinesisClientLease lease, String newOwner) { Map result = baseSerializer.getDynamoTakeLeaseUpdate(lease, newOwner); - Long ownerSwitchesSinceCheckpoint = lease.getOwnerSwitchesSinceCheckpoint(); String oldOwner = lease.getLeaseOwner(); if (oldOwner != null && !oldOwner.equals(newOwner)) { - ownerSwitchesSinceCheckpoint++; + result.put(OWNER_SWITCHES_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(1L), + AttributeAction.ADD)); } - result.put(OWNER_SWITCHES_KEY, - new AttributeValueUpdate(DynamoUtils.createAttributeValue(ownerSwitchesSinceCheckpoint), - AttributeAction.PUT)); - return result; } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java index b5f1a5ae..3cb365fb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.UUID; +import java.util.concurrent.TimeUnit; import com.amazonaws.util.json.JSONObject; @@ -31,11 +32,8 @@ public class Lease { * * Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two * values will be very large. We will consider leases to be expired if they are more than a year old. - * - * 365 days per year * 24 hours per day * 60 minutes per hour * 60 seconds per minute * 1000000000 - * nanoseconds/second */ - private static final long MAX_ABS_AGE_NANOS = 365 * 24 * 60 * 60 * 1000000000L; + private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365); private String leaseKey; private String leaseOwner; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index bb06fb4f..e2970def 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -160,14 +161,14 @@ public class LeaseManager implements ILeaseManager { @Override public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException { - long sleepTimeRemaining = timeoutSeconds * 1000; + long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds); while (!leaseTableExists()) { if (sleepTimeRemaining <= 0) { return false; } - long timeToSleepMillis = Math.min(1000 * secondsBetweenPolls, sleepTimeRemaining); + long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining); sleepTimeRemaining -= sleep(timeToSleepMillis); } @@ -385,7 +386,7 @@ public class LeaseManager implements ILeaseManager { verifyNotNull(owner, "owner cannot be null"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Taking lease with shardId %s from %s to %s", + LOG.debug(String.format("Taking lease with leaseKey %s from %s to %s", lease.getLeaseKey(), lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(), owner)); @@ -428,7 +429,7 @@ public class LeaseManager implements ILeaseManager { verifyNotNull(lease, "lease cannot be null"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Voiding lease with shardId %s owned by %s", + LOG.debug(String.format("Evicting lease with leaseKey %s owned by %s", lease.getLeaseKey(), lease.getLeaseOwner())); } @@ -485,7 +486,7 @@ public class LeaseManager implements ILeaseManager { verifyNotNull(lease, "lease cannot be null"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Deleting lease with shardId %s", lease.getLeaseKey())); + LOG.debug(String.format("Deleting lease with leaseKey %s", lease.getLeaseKey())); } DeleteItemRequest deleteRequest = new DeleteItemRequest(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index 7f640074..e36deeb6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +58,7 @@ public class LeaseRenewer implements ILeaseRenewer { public LeaseRenewer(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; - this.leaseDurationNanos = leaseDurationMillis * 1000000L; + this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java index 4617f927..7ae97f79 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java @@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -68,7 +69,7 @@ public class LeaseTaker implements ILeaseTaker { public LeaseTaker(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; - this.leaseDurationNanos = leaseDurationMillis * 1000000; + this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java index f1f44b0a..a4d9d9d2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java @@ -80,16 +80,16 @@ public class MetricsHelper { public static void addSuccessAndLatency(String prefix, long startTimeMillis, boolean success) { addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success); } - + public static void addSuccessAndLatencyPerShard ( - String shardId, - String prefix, - long startTimeMillis, + String shardId, + String prefix, + long startTimeMillis, boolean success) { IMetricsScope scope = getMetricsScope(); String realPrefix = prefix == null ? "" : prefix + SEP; - + if (shardId != null) { scope.addDimension("ShardId", shardId); } @@ -103,10 +103,9 @@ public class MetricsHelper { public static void endScope() { IMetricsScope scope = getMetricsScope(); if (scope != null) { - Integer refCount = referenceCount.get(); - refCount--; + referenceCount.set(referenceCount.get() - 1); - if (refCount == 0) { + if (referenceCount.get() == 0) { scope.end(); currentScope.remove(); }