Version 1.4.0 of the Amazon Kinesis Client Library
This commit is contained in:
parent
1861f12db7
commit
4dfc17d04a
34 changed files with 4309 additions and 213 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.3.0
|
||||
Bundle-Version: 1.4.0
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
|
|||
12
README.md
12
README.md
|
|
@ -22,10 +22,20 @@ The **Amazon Kinesis Client Library for Java** (Amazon KCL) enables Java develop
|
|||
|
||||
After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true`
|
||||
|
||||
## Integration with the Kinesis Producer Library
|
||||
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
|
||||
|
||||
## Amazon KCL support for other languages
|
||||
To make it easier for developers to write record processors in other languages, we have implemented a Java-based daemon, called MultiLangDaemon, that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one-to-one correspondence among record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables the KCL to be language agnostic, while providing identical features and a similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.4.0 (June 2, 2015)
|
||||
* Integration with the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**
|
||||
* Automatically de-aggregate records put into the Kinesis stream using the KPL.
|
||||
* Support checkpointing at the individual user record level when multiple user records are aggregated into one Kinesis record using the KPL.
|
||||
|
||||
See [Consumer De-aggregation with the KCL][kinesis-guide-consumer-deaggregation] for details.
|
||||
|
||||
### Release 1.3.0 (May 22, 2015)
|
||||
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
|
||||
|
||||
|
|
@ -47,6 +57,8 @@ To make it easier for developers to write record processors in other languages,
|
|||
[kinesis-guide-begin]: http://docs.aws.amazon.com/kinesis/latest/dev/before-you-begin.html
|
||||
[kinesis-guide-create]: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html
|
||||
[kinesis-guide-applications]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-app.html
|
||||
[kinesis-guide-kpl]: http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html
|
||||
[kinesis-guide-consumer-deaggregation]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html
|
||||
[kclpy]: https://github.com/awslabs/amazon-kinesis-client-python
|
||||
[multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java
|
||||
|
||||
|
|
|
|||
12
pom.xml
12
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.3.0</version>
|
||||
<version>1.4.0</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
@ -32,6 +32,16 @@
|
|||
<artifactId>aws-java-sdk</artifactId>
|
||||
<version>${aws-java-sdk.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
<version>2.6.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-lang</groupId>
|
||||
<artifactId>commons-lang</artifactId>
|
||||
<version>2.6</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<developers>
|
||||
|
|
|
|||
|
|
@ -44,5 +44,4 @@ public class BlockedOnParentShardException extends KinesisClientLibRetryableExce
|
|||
super(message, e);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.interfaces;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
|
||||
/**
|
||||
* Interface for checkpoint trackers.
|
||||
|
|
@ -22,17 +23,17 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibE
|
|||
public interface ICheckpoint {
|
||||
|
||||
/**
|
||||
* Record a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||
* Upon failover, record processing is resumed from this point.
|
||||
* Record a checkpoint for a shard (e.g. sequence and subsequence numbers of last record processed
|
||||
* by application). Upon failover, record processing is resumed from this point.
|
||||
*
|
||||
* @param shardId Checkpoint is specified for this shard.
|
||||
* @param checkpointValue Value of the checkpoint (e.g. Kinesis sequence number)
|
||||
* @param checkpointValue Value of the checkpoint (e.g. Kinesis sequence number and subsequence number)
|
||||
* @param concurrencyToken Used with conditional writes to prevent stale updates
|
||||
* (e.g. if there was a fail over to a different record processor, we don't want to
|
||||
* overwrite its checkpoint)
|
||||
* @throws KinesisClientLibException Thrown if we were unable to save the checkpoint
|
||||
*/
|
||||
void setCheckpoint(String shardId, String checkpointValue, String concurrencyToken)
|
||||
void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
|
||||
throws KinesisClientLibException;
|
||||
|
||||
/**
|
||||
|
|
@ -43,6 +44,6 @@ public interface ICheckpoint {
|
|||
* @return Current checkpoint for this shard, null if there is no record for this shard.
|
||||
* @throws KinesisClientLibException Thrown if we are unable to fetch the checkpoint
|
||||
*/
|
||||
String getCheckpoint(String shardId) throws KinesisClientLibException;
|
||||
ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,10 +14,11 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.interfaces;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
/**
|
||||
* Used by RecordProcessors when they want to checkpoint their progress.
|
||||
|
|
@ -47,6 +48,26 @@ public interface IRecordProcessorCheckpointer {
|
|||
void checkpoint()
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
||||
|
||||
/**
|
||||
* This method will checkpoint the progress at the provided record. This method is analogous to
|
||||
* {@link #checkpoint()} but provides the ability to specify the record at which to
|
||||
* checkpoint.
|
||||
*
|
||||
* @param record A record at which to checkpoint in this shard. Upon failover,
|
||||
* the Kinesis Client Library will start fetching records after this record's sequence number.
|
||||
* @throws ThrottlingException Can't store checkpoint. Can be caused by checkpointing too frequently.
|
||||
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
|
||||
* @throws ShutdownException The record processor instance has been shut down. Another instance may have
|
||||
* started processing some of these records already.
|
||||
* The application should abort processing via this RecordProcessor instance.
|
||||
* @throws InvalidStateException Can't store checkpoint.
|
||||
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
|
||||
* @throws KinesisClientLibDependencyException Encountered an issue when storing the checkpoint. The application can
|
||||
* backoff and retry.
|
||||
*/
|
||||
void checkpoint(Record record)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
|
||||
|
||||
/**
|
||||
* This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
|
||||
* {@link #checkpoint()} but provides the ability to specify the sequence number at which to
|
||||
|
|
@ -72,4 +93,31 @@ public interface IRecordProcessorCheckpointer {
|
|||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||
IllegalArgumentException;
|
||||
|
||||
|
||||
/**
|
||||
* This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
|
||||
* aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
|
||||
* but provides the ability to specify the sequence and subsequence numbers at which to checkpoint.
|
||||
*
|
||||
* @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, the Kinesis
|
||||
* Client Library will start fetching records after the given sequence and subsequence numbers.
|
||||
* @param subSequenceNumber A subsequence number at which to checkpoint within this shard. Upon failover, the
|
||||
* Kinesis Client Library will start fetching records after the given sequence and subsequence numbers.
|
||||
* @throws ThrottlingException Can't store checkpoint. Can be caused by checkpointing too frequently.
|
||||
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
|
||||
* @throws ShutdownException The record processor instance has been shut down. Another instance may have
|
||||
* started processing some of these records already.
|
||||
* The application should abort processing via this RecordProcessor instance.
|
||||
* @throws InvalidStateException Can't store checkpoint.
|
||||
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
|
||||
* @throws KinesisClientLibDependencyException Encountered an issue when storing the checkpoint. The application can
|
||||
* backoff and retry.
|
||||
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
|
||||
* 1.) It appears to be out of range, i.e. it is smaller than the last checkpoint value, or larger than the
|
||||
* greatest sequence number seen by the associated record processor.
|
||||
* 2.) It is not a valid sequence number for a record in this shard.
|
||||
*/
|
||||
void checkpoint(String sequenceNumber, long subSequenceNumber)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||
IllegalArgumentException;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
|
||||
/**
|
||||
* The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon
|
||||
* Kinesis.
|
||||
*/
|
||||
public interface IRecordProcessor {
|
||||
|
||||
/**
|
||||
* Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
|
||||
* (via processRecords).
|
||||
*
|
||||
* @param initializationInput Provides information related to initialization
|
||||
*/
|
||||
void initialize(InitializationInput initializationInput);
|
||||
|
||||
/**
|
||||
* Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
|
||||
* application.
|
||||
* Upon fail over, the new instance will get records with sequence number > checkpoint position
|
||||
* for each partition key.
|
||||
*
|
||||
* @param processRecordsInput Provides the records to be processed as well as information and capabilities related
|
||||
* to them (eg checkpointing).
|
||||
*/
|
||||
void processRecords(ProcessRecordsInput processRecordsInput);
|
||||
|
||||
/**
|
||||
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
|
||||
* RecordProcessor instance.
|
||||
*
|
||||
* @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
|
||||
* processor.
|
||||
*/
|
||||
void shutdown(ShutdownInput shutdownInput);
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
|
||||
|
||||
|
||||
/**
|
||||
* The Amazon Kinesis Client Library will use this to instantiate a record processor per shard.
|
||||
* Clients may choose to create separate instantiations, or re-use instantiations.
|
||||
*/
|
||||
public interface IRecordProcessorFactory {
|
||||
|
||||
/**
|
||||
* Returns a record processor to be used for processing data records for an (assigned) shard.
|
||||
*
|
||||
* @return Returns a processor object.
|
||||
*/
|
||||
IRecordProcessor createProcessor();
|
||||
|
||||
}
|
||||
|
|
@ -18,7 +18,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
|
||||
|
|
@ -65,8 +65,8 @@ class BlockOnParentShardTask implements ITask {
|
|||
for (String shardId : shardInfo.getParentShardIds()) {
|
||||
KinesisClientLease lease = leaseManager.getLease(shardId);
|
||||
if (lease != null) {
|
||||
String checkpoint = lease.getCheckpoint();
|
||||
if ((checkpoint == null) || (!checkpoint.equals(SentinelCheckpoint.SHARD_END.toString()))) {
|
||||
ExtendedSequenceNumber checkpoint = lease.getCheckpoint();
|
||||
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
|
||||
LOG.debug("Shard " + shardId + " is not yet done. Its current checkpoint is " + checkpoint);
|
||||
blockedOnParentShard = true;
|
||||
exception = new BlockedOnParentShardException("Parent shard not yet done");
|
||||
|
|
|
|||
|
|
@ -18,7 +18,9 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||
|
||||
/**
|
||||
* Task for initializing shard position and invoking the RecordProcessor initialize() API.
|
||||
|
|
@ -55,7 +57,7 @@ class InitializeTask implements ITask {
|
|||
/*
|
||||
* Initializes the data fetcher (position in shard) and invokes the RecordProcessor initialize() API.
|
||||
* (non-Javadoc)
|
||||
*
|
||||
*
|
||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
|
||||
*/
|
||||
@Override
|
||||
|
|
@ -65,13 +67,17 @@ class InitializeTask implements ITask {
|
|||
|
||||
try {
|
||||
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
|
||||
String initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());
|
||||
dataFetcher.initialize(initialCheckpoint);
|
||||
ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());
|
||||
|
||||
dataFetcher.initialize(initialCheckpoint.getSequenceNumber());
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
|
||||
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);
|
||||
try {
|
||||
LOG.debug("Calling the record processor initialize().");
|
||||
recordProcessor.initialize(shardInfo.getShardId());
|
||||
final InitializationInput initializationInput = new InitializationInput()
|
||||
.withShardId(shardInfo.getShardId())
|
||||
.withExtendedSequenceNumber(initialCheckpoint);
|
||||
recordProcessor.initialize(initializationInput);
|
||||
LOG.debug("Record processor initialize() completed.");
|
||||
} catch (Exception e) {
|
||||
applicationException = true;
|
||||
|
|
@ -99,7 +105,7 @@ class InitializeTask implements ITask {
|
|||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
*
|
||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
|
||||
*/
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.3.0";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.4.0";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
|
|||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
|
|
@ -90,7 +91,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
|||
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
|
||||
* @throws DependencyException if DynamoDB update fails in an unexpected way
|
||||
*/
|
||||
boolean setCheckpoint(String shardId, String checkpoint, UUID concurrencyToken)
|
||||
boolean setCheckpoint(String shardId, ExtendedSequenceNumber checkpoint, UUID concurrencyToken)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
KinesisClientLease lease = getCurrentlyHeldLease(shardId);
|
||||
if (lease == null) {
|
||||
|
|
@ -111,7 +112,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setCheckpoint(String shardId, String checkpointValue, String concurrencyToken)
|
||||
public void setCheckpoint(String shardId, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
|
||||
throws KinesisClientLibException {
|
||||
try {
|
||||
boolean wasSuccessful = setCheckpoint(shardId, checkpointValue, UUID.fromString(concurrencyToken));
|
||||
|
|
@ -133,7 +134,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public String getCheckpoint(String shardId) throws KinesisClientLibException {
|
||||
public ExtendedSequenceNumber getCheckpoint(String shardId) throws KinesisClientLibException {
|
||||
try {
|
||||
return leaseManager.getLease(shardId).getCheckpoint();
|
||||
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
|||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
|
||||
/**
|
||||
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
|
||||
*/
|
||||
|
|
@ -37,7 +39,7 @@ class KinesisDataFetcher {
|
|||
private boolean isInitialized;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param kinesisProxy Kinesis proxy
|
||||
* @param shardId shardId (we'll fetch data for this shard)
|
||||
* @param checkpoint used to get current checkpoint from which to start fetching records
|
||||
|
|
@ -50,7 +52,7 @@ class KinesisDataFetcher {
|
|||
|
||||
/**
|
||||
* Get records from the current position in the stream (up to maxRecords).
|
||||
*
|
||||
*
|
||||
* @param maxRecords Max records to fetch
|
||||
* @return list of records of up to maxRecords size
|
||||
*/
|
||||
|
|
@ -79,32 +81,28 @@ class KinesisDataFetcher {
|
|||
}
|
||||
|
||||
/**
|
||||
* Initializes this KinesisDataFetcher's iterator based on the checkpoint.
|
||||
* @param initialCheckpoint Current checkpoint for this shard.
|
||||
*
|
||||
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
|
||||
* @param initialCheckpoint Current checkpoint sequence number for this shard.
|
||||
*
|
||||
*/
|
||||
public void initialize(String initialCheckpoint) {
|
||||
|
||||
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint);
|
||||
advanceIteratorAfter(initialCheckpoint);
|
||||
advanceIteratorTo(initialCheckpoint);
|
||||
isInitialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Advances this KinesisDataFetcher's internal iterator to be after the passed-in sequence number.
|
||||
*
|
||||
* @param sequenceNumber advance the iterator to the first record after this sequence number.
|
||||
*/
|
||||
private void advanceIteratorAfterSequenceNumber(String sequenceNumber) {
|
||||
nextIterator = getIterator(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
|
||||
public void initialize(ExtendedSequenceNumber initialCheckpoint) {
|
||||
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
|
||||
advanceIteratorTo(initialCheckpoint.getSequenceNumber());
|
||||
isInitialized = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Advances this KinesisDataFetcher's internal iterator to be after the passed-in sequence number.
|
||||
*
|
||||
* @param sequenceNumber advance the iterator to the first record after this sequence number.
|
||||
* Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number.
|
||||
*
|
||||
* @param sequenceNumber advance the iterator to the record at this sequence number.
|
||||
*/
|
||||
void advanceIteratorAfter(String sequenceNumber) {
|
||||
void advanceIteratorTo(String sequenceNumber) {
|
||||
if (sequenceNumber == null) {
|
||||
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
|
||||
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
|
||||
|
|
@ -114,24 +112,24 @@ class KinesisDataFetcher {
|
|||
} else if (sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString())) {
|
||||
nextIterator = null;
|
||||
} else {
|
||||
advanceIteratorAfterSequenceNumber(sequenceNumber);
|
||||
nextIterator = getIterator(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), sequenceNumber);
|
||||
}
|
||||
if (nextIterator == null) {
|
||||
isShardEndReached = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param iteratorType
|
||||
* @param sequenceNumber
|
||||
*
|
||||
*
|
||||
* @return iterator or null if we catch a ResourceNotFound exception
|
||||
*/
|
||||
private String getIterator(String iteratorType, String sequenceNumber) {
|
||||
String iterator = null;
|
||||
try {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Calling getIterator for " + shardId + ", iterator type " + iteratorType
|
||||
LOG.debug("Calling getIterator for " + shardId + ", iterator type " + iteratorType
|
||||
+ " and sequence number " + sequenceNumber);
|
||||
}
|
||||
iterator = kinesisProxy.getIterator(shardId, iteratorType, sequenceNumber);
|
||||
|
|
|
|||
|
|
@ -15,20 +15,27 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
||||
/**
|
||||
* Task for fetching data records and invoking processRecords() on the record processor instance.
|
||||
|
|
@ -49,6 +56,7 @@ class ProcessTask implements ITask {
|
|||
private final TaskType taskType = TaskType.PROCESS;
|
||||
private final StreamConfig streamConfig;
|
||||
private final long backoffTimeMillis;
|
||||
private final Shard shard;
|
||||
|
||||
/**
|
||||
* @param shardInfo contains information about the shard
|
||||
|
|
@ -72,11 +80,20 @@ class ProcessTask implements ITask {
|
|||
this.dataFetcher = dataFetcher;
|
||||
this.streamConfig = streamConfig;
|
||||
this.backoffTimeMillis = backoffTimeMillis;
|
||||
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
|
||||
if (kinesisProxy instanceof IKinesisProxyExtended) {
|
||||
this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId());
|
||||
} else {
|
||||
LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records "
|
||||
+ "in the event of resharding will not be dropped during deaggregation of Amazon "
|
||||
+ "Kinesis records.");
|
||||
this.shard = null;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
*
|
||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
|
||||
*/
|
||||
// CHECKSTYLE:OFF CyclomaticComplexity
|
||||
|
|
@ -96,6 +113,7 @@ class ProcessTask implements ITask {
|
|||
boolean shardEndReached = true;
|
||||
return new TaskResult(null, shardEndReached);
|
||||
}
|
||||
|
||||
final GetRecordsResult getRecordsResult = getRecords();
|
||||
|
||||
if (getRecordsResult.getMillisBehindLatest() != null) {
|
||||
|
|
@ -122,22 +140,41 @@ class ProcessTask implements ITask {
|
|||
}
|
||||
}
|
||||
|
||||
if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
|
||||
int numKinesisRecords = records.size();
|
||||
int numUserRecords = 0;
|
||||
List<UserRecord> subRecords = new ArrayList<>();
|
||||
|
||||
// If we got more records, record the max sequence number. Sleep if there are no records.
|
||||
if (!records.isEmpty()) {
|
||||
String maxSequenceNumber = getMaxSequenceNumber(scope, records);
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(maxSequenceNumber);
|
||||
// If we got more records, record the max extended sequence number. Sleep if there are no records.
|
||||
if (!records.isEmpty()) {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, numKinesisRecords, StandardUnit.Count);
|
||||
if (this.shard != null) {
|
||||
subRecords = UserRecord.deaggregate(records,
|
||||
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
|
||||
new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
|
||||
} else {
|
||||
subRecords = UserRecord.deaggregate(records);
|
||||
}
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
|
||||
filterAndGetMaxExtendedSequenceNumber(scope, subRecords,
|
||||
recordProcessorCheckpointer.getLastCheckpointValue()));
|
||||
numUserRecords = subRecords.size();
|
||||
}
|
||||
|
||||
if ((!subRecords.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
|
||||
try {
|
||||
LOG.debug("Calling application processRecords() with " + records.size() + " records from "
|
||||
+ shardInfo.getShardId());
|
||||
recordProcessor.processRecords(records, recordProcessorCheckpointer);
|
||||
LOG.debug("Calling application processRecords() with " + numKinesisRecords + " Kinesis records ("
|
||||
+ numUserRecords + " user records) from " + shardInfo.getShardId());
|
||||
@SuppressWarnings("unchecked")
|
||||
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
|
||||
.withRecords((List<Record>) (List<?>) subRecords)
|
||||
.withCheckpointer(recordProcessorCheckpointer);
|
||||
|
||||
recordProcessor.processRecords(processRecordsInput);
|
||||
} catch (Exception e) {
|
||||
LOG.error("ShardId " + shardInfo.getShardId()
|
||||
+ ": Application processRecords() threw an exception when processing shard ", e);
|
||||
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
|
||||
+ records);
|
||||
+ subRecords);
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException | KinesisClientLibException e) {
|
||||
|
|
@ -155,37 +192,44 @@ class ProcessTask implements ITask {
|
|||
return new TaskResult(exception);
|
||||
}
|
||||
|
||||
// CHECKSTYLE:ON CyclomaticComplexity
|
||||
|
||||
/**
|
||||
* Scans a list of records and returns the greatest sequence number from the records. Also emits metrics about the
|
||||
* records.
|
||||
*
|
||||
* Scans a list of records to filter out records up to and including the most recent checkpoint value and to get
|
||||
* the greatest extended sequence number from the retained records. Also emits metrics about the records.
|
||||
*
|
||||
* @param scope metrics scope to emit metrics into
|
||||
* @param records list of records to scan
|
||||
* @return greatest sequence number out of all the records.
|
||||
* @param records list of records to scan and change in-place as needed
|
||||
* @param lastCheckpointValue the most recent checkpoint value
|
||||
* @return the largest extended sequence number among the retained records
|
||||
*/
|
||||
private String getMaxSequenceNumber(IMetricsScope scope, List<Record> records) {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count);
|
||||
ListIterator<Record> recordIterator = records.listIterator();
|
||||
BigInteger maxSequenceNumber = BigInteger.ZERO;
|
||||
|
||||
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<UserRecord> records,
|
||||
final ExtendedSequenceNumber lastCheckpointValue) {
|
||||
ExtendedSequenceNumber largestExtendedSequenceNumber = lastCheckpointValue;
|
||||
ListIterator<UserRecord> recordIterator = records.listIterator();
|
||||
while (recordIterator.hasNext()) {
|
||||
Record record = recordIterator.next();
|
||||
BigInteger sequenceNumber = new BigInteger(record.getSequenceNumber());
|
||||
if (maxSequenceNumber.compareTo(sequenceNumber) < 0) {
|
||||
maxSequenceNumber = sequenceNumber;
|
||||
UserRecord record = recordIterator.next();
|
||||
ExtendedSequenceNumber extendedSequenceNumber
|
||||
= new ExtendedSequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber());
|
||||
|
||||
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
|
||||
recordIterator.remove();
|
||||
LOG.debug("removing record with ESN " + extendedSequenceNumber
|
||||
+ " because the ESN is <= checkpoint (" + lastCheckpointValue + ")");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (largestExtendedSequenceNumber == null
|
||||
|| largestExtendedSequenceNumber.compareTo(extendedSequenceNumber) < 0) {
|
||||
largestExtendedSequenceNumber = extendedSequenceNumber;
|
||||
}
|
||||
|
||||
scope.addData(DATA_BYTES_PROCESSED_METRIC, record.getData().limit(), StandardUnit.Bytes);
|
||||
}
|
||||
|
||||
return maxSequenceNumber.toString();
|
||||
return largestExtendedSequenceNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets records from Kinesis and retries once in the event of an ExpiredIteratorException.
|
||||
*
|
||||
*
|
||||
* @return list of data records from Kinesis
|
||||
* @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any
|
||||
* records to the client code yet
|
||||
|
|
@ -205,7 +249,8 @@ class ProcessTask implements ITask {
|
|||
* Advance the iterator to after the greatest processed sequence number (remembered by
|
||||
* recordProcessorCheckpointer).
|
||||
*/
|
||||
dataFetcher.advanceIteratorAfter(recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
|
||||
dataFetcher.advanceIteratorTo(
|
||||
recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber());
|
||||
|
||||
// Try a second time - if we fail this time, expose the failure.
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -24,7 +24,9 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
|
|||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
/**
|
||||
* This class is used to enable RecordProcessors to checkpoint their progress.
|
||||
|
|
@ -37,34 +39,29 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
|
||||
private ICheckpoint checkpoint;
|
||||
|
||||
private String largestPermittedCheckpointValue;
|
||||
private ExtendedSequenceNumber largestPermittedCheckpointValue;
|
||||
// Set to the last value set via checkpoint().
|
||||
// Sample use: verify application shutdown() invoked checkpoint() at the end of a shard.
|
||||
private String lastCheckpointValue;
|
||||
private ExtendedSequenceNumber lastCheckpointValue;
|
||||
|
||||
private ShardInfo shardInfo;
|
||||
|
||||
private SequenceNumberValidator sequenceNumberValidator;
|
||||
|
||||
private CheckpointValueComparator checkpointValueComparator;
|
||||
|
||||
private String sequenceNumberAtShardEnd;
|
||||
private ExtendedSequenceNumber sequenceNumberAtShardEnd;
|
||||
|
||||
/**
|
||||
* Only has package level access, since only the Amazon Kinesis Client Library should be creating these.
|
||||
*
|
||||
*
|
||||
* @param checkpoint Used to checkpoint progress of a RecordProcessor
|
||||
* @param validator Used for validating sequence numbers
|
||||
* @param comparator Used for checking the order of checkpoint values
|
||||
*/
|
||||
RecordProcessorCheckpointer(ShardInfo shardInfo,
|
||||
ICheckpoint checkpoint,
|
||||
SequenceNumberValidator validator,
|
||||
CheckpointValueComparator comparator) {
|
||||
SequenceNumberValidator validator) {
|
||||
this.shardInfo = shardInfo;
|
||||
this.checkpoint = checkpoint;
|
||||
this.sequenceNumberValidator = validator;
|
||||
this.checkpointValueComparator = comparator;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -80,6 +77,22 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
advancePosition(this.largestPermittedCheckpointValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void checkpoint(Record record)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||
IllegalArgumentException {
|
||||
if (record == null) {
|
||||
throw new IllegalArgumentException("Could not checkpoint a null record");
|
||||
} else if (record instanceof UserRecord) {
|
||||
checkpoint(record.getSequenceNumber(), ((UserRecord) record).getSubSequenceNumber());
|
||||
} else {
|
||||
checkpoint(record.getSequenceNumber(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
@ -87,6 +100,21 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
public synchronized void checkpoint(String sequenceNumber)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||
IllegalArgumentException {
|
||||
checkpoint(sequenceNumber, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void checkpoint(String sequenceNumber, long subSequenceNumber)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
|
||||
IllegalArgumentException {
|
||||
|
||||
if (subSequenceNumber < 0) {
|
||||
throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number "
|
||||
+ subSequenceNumber);
|
||||
}
|
||||
|
||||
// throws exception if sequence number shouldn't be checkpointed for this shard
|
||||
sequenceNumberValidator.validateSequenceNumber(sequenceNumber);
|
||||
|
|
@ -98,66 +126,68 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
* If there isn't a last checkpoint value, we only care about checking the upper bound.
|
||||
* If there is a last checkpoint value, we want to check both the lower and upper bound.
|
||||
*/
|
||||
if ((checkpointValueComparator.compare(lastCheckpointValue, sequenceNumber) <= 0)
|
||||
&& checkpointValueComparator.compare(sequenceNumber, largestPermittedCheckpointValue) <= 0) {
|
||||
ExtendedSequenceNumber newCheckpoint = new ExtendedSequenceNumber(sequenceNumber, subSequenceNumber);
|
||||
if ((lastCheckpointValue.compareTo(newCheckpoint) <= 0)
|
||||
&& newCheckpoint.compareTo(largestPermittedCheckpointValue) <= 0) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Checkpointing " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
|
||||
+ " at specific sequence number " + sequenceNumber);
|
||||
+ " at specific extended sequence number " + newCheckpoint);
|
||||
}
|
||||
this.advancePosition(sequenceNumber);
|
||||
this.advancePosition(newCheckpoint);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Could not checkpoint at sequence number " + sequenceNumber
|
||||
+ " it did not fall into acceptable range between the last sequence number checkpointed "
|
||||
+ this.lastCheckpointValue + " and the greatest sequence number passed to this record processor "
|
||||
+ this.largestPermittedCheckpointValue);
|
||||
throw new IllegalArgumentException(String.format(
|
||||
"Could not checkpoint at extended sequence number %s as it did not fall into acceptable range "
|
||||
+ "between the last checkpoint %s and the greatest extended sequence number passed to this "
|
||||
+ "record processor %s",
|
||||
newCheckpoint, this.lastCheckpointValue, this.largestPermittedCheckpointValue));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the lastCheckpointValue
|
||||
*/
|
||||
String getLastCheckpointValue() {
|
||||
ExtendedSequenceNumber getLastCheckpointValue() {
|
||||
return lastCheckpointValue;
|
||||
}
|
||||
|
||||
|
||||
synchronized void setInitialCheckpointValue(String initialCheckpoint) {
|
||||
synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckpoint) {
|
||||
lastCheckpointValue = initialCheckpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for testing.
|
||||
*
|
||||
* @return the sequenceNumber
|
||||
*
|
||||
* @return the largest permitted checkpoint
|
||||
*/
|
||||
synchronized String getLargestPermittedCheckpointValue() {
|
||||
synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() {
|
||||
return largestPermittedCheckpointValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param largestPermittedCheckpointValue the checkpoint value to set
|
||||
* @param largestPermittedCheckpointValue the checkpoint value to set
|
||||
*/
|
||||
synchronized void setLargestPermittedCheckpointValue(String checkpointValue) {
|
||||
this.largestPermittedCheckpointValue = checkpointValue;
|
||||
synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) {
|
||||
this.largestPermittedCheckpointValue = largestPermittedCheckpointValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to remember the last sequence number before SHARD_END to allow us to prevent the checkpointer from
|
||||
* checkpointing at the end of the shard twice (i.e. at the last sequence number and then again at SHARD_END).
|
||||
*
|
||||
* @param sequenceNumber
|
||||
* Used to remember the last extended sequence number before SHARD_END to allow us to prevent the checkpointer
|
||||
* from checkpointing at the end of the shard twice (i.e. at the last extended sequence number and then again
|
||||
* at SHARD_END).
|
||||
*
|
||||
* @param extendedSequenceNumber the last extended sequence number before SHARD_END
|
||||
*/
|
||||
synchronized void setSequenceNumberAtShardEnd(String sequenceNumber) {
|
||||
this.sequenceNumberAtShardEnd = sequenceNumber;
|
||||
synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) {
|
||||
this.sequenceNumberAtShardEnd = extendedSequenceNumber;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Internal API - has package level access only for testing purposes.
|
||||
*
|
||||
*
|
||||
* @param sequenceNumber
|
||||
*
|
||||
*
|
||||
* @throws KinesisClientLibDependencyException
|
||||
* @throws ThrottlingException
|
||||
* @throws ShutdownException
|
||||
|
|
@ -165,21 +195,26 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
*/
|
||||
void advancePosition(String sequenceNumber)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||
String checkpointValue = sequenceNumber;
|
||||
if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(sequenceNumber)) {
|
||||
advancePosition(new ExtendedSequenceNumber(sequenceNumber));
|
||||
}
|
||||
|
||||
void advancePosition(ExtendedSequenceNumber extendedSequenceNumber)
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||
ExtendedSequenceNumber checkpointToRecord = extendedSequenceNumber;
|
||||
if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(extendedSequenceNumber)) {
|
||||
// If we are about to checkpoint the very last sequence number for this shard, we might as well
|
||||
// just checkpoint at SHARD_END
|
||||
checkpointValue = SentinelCheckpoint.SHARD_END.toString();
|
||||
checkpointToRecord = ExtendedSequenceNumber.SHARD_END;
|
||||
}
|
||||
// Don't checkpoint a value we already successfully checkpointed
|
||||
if (sequenceNumber != null && !sequenceNumber.equals(lastCheckpointValue)) {
|
||||
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
|
||||
+ " checkpoint to " + checkpointValue);
|
||||
+ " checkpoint to " + checkpointToRecord);
|
||||
}
|
||||
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointValue, shardInfo.getConcurrencyToken());
|
||||
lastCheckpointValue = checkpointValue;
|
||||
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
|
||||
lastCheckpointValue = checkpointToRecord;
|
||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||
| KinesisClientLibDependencyException e) {
|
||||
throw e;
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import com.amazonaws.AmazonServiceException;
|
|||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
|
||||
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
|
|
@ -33,7 +34,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
|||
* which could prevent another shard consumer instance from processing the shard later on). This class also provides a
|
||||
* utility function {@link #isDigits(String)} which is used to check whether a string is all digits
|
||||
*/
|
||||
class SequenceNumberValidator {
|
||||
public class SequenceNumberValidator {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(SequenceNumberValidator.class);
|
||||
|
||||
|
|
@ -98,6 +99,15 @@ class SequenceNumberValidator {
|
|||
}
|
||||
}
|
||||
|
||||
void validateSequenceNumber(ExtendedSequenceNumber checkpoint)
|
||||
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
|
||||
validateSequenceNumber(checkpoint.getSequenceNumber());
|
||||
if (checkpoint.getSubSequenceNumber() < 0) {
|
||||
throw new IllegalArgumentException("SubSequence number must be non-negative, but was "
|
||||
+ checkpoint.getSubSequenceNumber());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the string is composed of only digits.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
|
|
@ -107,8 +107,7 @@ class ShardConsumer {
|
|||
checkpoint,
|
||||
new SequenceNumberValidator(streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
new CheckpointValueComparator());
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()));
|
||||
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
this.leaseManager = leaseManager;
|
||||
this.metricsFactory = metricsFactory;
|
||||
|
|
|
|||
|
|
@ -29,16 +29,16 @@ import java.util.Set;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
||||
/**
|
||||
* Helper class to sync leases with shards of the Kinesis stream.
|
||||
|
|
@ -364,7 +364,7 @@ class ShardSyncer {
|
|||
shardIdToNewLeaseMap,
|
||||
memoizationContext);
|
||||
if (isDescendant) {
|
||||
newLease.setCheckpoint(SentinelCheckpoint.TRIM_HORIZON.toString());
|
||||
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
} else {
|
||||
newLease.setCheckpoint(convertToCheckpoint(initialPosition));
|
||||
}
|
||||
|
|
@ -449,7 +449,7 @@ class ShardSyncer {
|
|||
}
|
||||
|
||||
if (descendantParentShardIds.contains(parentShardId)) {
|
||||
lease.setCheckpoint(SentinelCheckpoint.TRIM_HORIZON.toString());
|
||||
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
} else {
|
||||
lease.setCheckpoint(convertToCheckpoint(initialPosition));
|
||||
}
|
||||
|
|
@ -607,7 +607,7 @@ class ShardSyncer {
|
|||
Set<String> shardIdsOfClosedShards = new HashSet<>();
|
||||
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
|
||||
for (KinesisClientLease lease : currentLeases) {
|
||||
if (lease.getCheckpoint().equals(SentinelCheckpoint.SHARD_END.toString())) {
|
||||
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||
shardIdsOfClosedShards.add(lease.getLeaseKey());
|
||||
leasesOfClosedShards.add(lease);
|
||||
}
|
||||
|
|
@ -662,11 +662,11 @@ class ShardSyncer {
|
|||
}
|
||||
|
||||
if ((leaseForClosedShard != null)
|
||||
&& (leaseForClosedShard.getCheckpoint().equals(SentinelCheckpoint.SHARD_END.toString()))
|
||||
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
|
||||
&& (childShardLeases.size() == childShardIds.size())) {
|
||||
boolean okayToDelete = true;
|
||||
for (KinesisClientLease lease : childShardLeases) {
|
||||
if (lease.getCheckpoint().equals(SentinelCheckpoint.TRIM_HORIZON.toString())) {
|
||||
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
|
||||
okayToDelete = false;
|
||||
break;
|
||||
}
|
||||
|
|
@ -736,13 +736,13 @@ class ShardSyncer {
|
|||
return openShards;
|
||||
}
|
||||
|
||||
private static String convertToCheckpoint(InitialPositionInStream position) {
|
||||
String checkpoint = null;
|
||||
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStream position) {
|
||||
ExtendedSequenceNumber checkpoint = null;
|
||||
|
||||
if (position.equals(InitialPositionInStream.TRIM_HORIZON)) {
|
||||
checkpoint = SentinelCheckpoint.TRIM_HORIZON.toString();
|
||||
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
|
||||
} else if (position.equals(InitialPositionInStream.LATEST)) {
|
||||
checkpoint = SentinelCheckpoint.LATEST.toString();
|
||||
checkpoint = ExtendedSequenceNumber.LATEST;
|
||||
}
|
||||
|
||||
return checkpoint;
|
||||
|
|
|
|||
|
|
@ -17,9 +17,10 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
|
|
@ -81,17 +82,21 @@ class ShutdownTask implements ITask {
|
|||
if (reason == ShutdownReason.TERMINATE) {
|
||||
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
|
||||
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(SentinelCheckpoint.SHARD_END.toString());
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||
}
|
||||
|
||||
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
|
||||
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason);
|
||||
try {
|
||||
recordProcessor.shutdown(recordProcessorCheckpointer, reason);
|
||||
String lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
|
||||
final ShutdownInput shutdownInput = new ShutdownInput()
|
||||
.withShutdownReason(reason)
|
||||
.withCheckpointer(recordProcessorCheckpointer);
|
||||
recordProcessor.shutdown(shutdownInput);
|
||||
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
|
||||
|
||||
if (reason == ShutdownReason.TERMINATE) {
|
||||
if ((lastCheckpointValue == null)
|
||||
|| (!lastCheckpointValue.equals(SentinelCheckpoint.SHARD_END.toString()))) {
|
||||
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
|
||||
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
|
||||
+ shardInfo.getShardId());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
|
||||
/**
|
||||
* Adapts a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}
|
||||
* to V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}.
|
||||
*/
|
||||
class V1ToV2RecordProcessorAdapter implements IRecordProcessor {
|
||||
|
||||
private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor recordProcessor;
|
||||
|
||||
V1ToV2RecordProcessorAdapter(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor recordProcessor) {
|
||||
this.recordProcessor = recordProcessor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(InitializationInput initializationInput) {
|
||||
recordProcessor.initialize(initializationInput.getShardId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processRecords(ProcessRecordsInput processRecordsInput) {
|
||||
recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(ShutdownInput shutdownInput) {
|
||||
recordProcessor.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
|
||||
/**
|
||||
* Adapts a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
|
||||
* IRecordProcessorFactory} to V2
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory IRecordProcessorFactory}.
|
||||
*/
|
||||
class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory {
|
||||
|
||||
private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory factory;
|
||||
|
||||
V1ToV2RecordProcessorFactoryAdapter(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IRecordProcessor createProcessor() {
|
||||
return new V1ToV2RecordProcessorAdapter(factory.createProcessor());
|
||||
}
|
||||
}
|
||||
|
|
@ -35,8 +35,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
|||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||
|
|
@ -84,23 +84,26 @@ public class Worker implements Runnable {
|
|||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config) {
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config) {
|
||||
this(recordProcessorFactory, config, Executors.newCachedThreadPool());
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
|
||||
|
|
@ -116,7 +119,8 @@ public class Worker implements Runnable {
|
|||
* @param config Kinesis Client Library configuration
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
IMetricsFactory metricsFactory) {
|
||||
this(recordProcessorFactory, config, metricsFactory, Executors.newCachedThreadPool());
|
||||
|
|
@ -129,7 +133,8 @@ public class Worker implements Runnable {
|
|||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
|
|
@ -146,7 +151,8 @@ public class Worker implements Runnable {
|
|||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
|
|
@ -164,7 +170,8 @@ public class Worker implements Runnable {
|
|||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
|
|
@ -190,27 +197,34 @@ public class Worker implements Runnable {
|
|||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(
|
||||
recordProcessorFactory,
|
||||
config,
|
||||
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
|
||||
kinesisClient).getProxy(config.getStreamName()),
|
||||
config.getMaxRecords(),
|
||||
config.getIdleTimeBetweenReadsInMillis(),
|
||||
config.getApplicationName(),
|
||||
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
|
||||
new StreamConfig(
|
||||
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
|
||||
.getProxy(config.getStreamName()),
|
||||
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
|
||||
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(),
|
||||
dynamoDBClient),
|
||||
config.getWorkerIdentifier(),
|
||||
config.getFailoverTimeMillis(),
|
||||
config.getEpsilonMillis(),
|
||||
metricsFactory), metricsFactory, execService);
|
||||
config.getInitialPositionInStream(),
|
||||
config.getParentShardPollIntervalMillis(),
|
||||
config.getShardSyncIntervalMillis(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(),
|
||||
null,
|
||||
new KinesisClientLibLeaseCoordinator(
|
||||
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient),
|
||||
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
|
||||
metricsFactory),
|
||||
execService,
|
||||
metricsFactory,
|
||||
config.getTaskBackoffTimeMillis());
|
||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||
if (config.getRegionName() != null) {
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
|
|
@ -223,7 +237,7 @@ public class Worker implements Runnable {
|
|||
if (config.getKinesisEndpoint() != null) {
|
||||
kinesisClient.setEndpoint(config.getKinesisEndpoint());
|
||||
if (config.getRegionName() != null) {
|
||||
LOG.warn("Received configuration for both region name as " + config.getRegionName()
|
||||
LOG.warn("Received configuration for both region name as " + config.getRegionName()
|
||||
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
|
||||
+ ". Amazon Kinesis endpoint will overwrite region name.");
|
||||
LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
|
||||
|
|
@ -233,27 +247,6 @@ public class Worker implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param streamConfig Stream configuration
|
||||
* @param leaseCoordinator Lease coordinator (coordinates currently owned leases and checkpoints)
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
private Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
StreamConfig streamConfig,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(config.getApplicationName(), recordProcessorFactory, streamConfig, config.getInitialPositionInStream(),
|
||||
config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(), leaseCoordinator, leaseCoordinator, execService,
|
||||
metricsFactory, config.getTaskBackoffTimeMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param applicationName Name of the Kinesis application
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
|
|
@ -292,7 +285,7 @@ public class Worker implements Runnable {
|
|||
this.initialPosition = initialPositionInStream;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||
this.checkpointTracker = checkpoint;
|
||||
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
|
||||
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
|
||||
this.executorService = execService;
|
||||
this.leaseCoordinator = leaseCoordinator;
|
||||
|
|
@ -421,7 +414,7 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* NOTE: This method is internal/private to the Worker class. It has package
|
||||
* access solely for testing.
|
||||
*
|
||||
*
|
||||
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
|
||||
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
|
||||
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows.
|
||||
|
|
@ -475,7 +468,7 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* NOTE: This method is internal/private to the Worker class. It has package
|
||||
* access solely for testing.
|
||||
*
|
||||
*
|
||||
* @param shardInfo Kinesis shard info
|
||||
* @param factory RecordProcessor factory
|
||||
* @return ShardConsumer for the shard
|
||||
|
|
@ -572,14 +565,15 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* versions of the KCL that only have constructors taking "Client" objects.
|
||||
*
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
|
|
@ -594,7 +588,7 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* versions of the KCL that only have constructors taking "Client" objects.
|
||||
*
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
|
|
@ -603,7 +597,8 @@ public class Worker implements Runnable {
|
|||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
|
|
@ -620,7 +615,7 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* versions of the KCL that only have constructors taking "Client" objects.
|
||||
*
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
|
|
@ -629,7 +624,8 @@ public class Worker implements Runnable {
|
|||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
|
|
@ -642,4 +638,202 @@ public class Worker implements Runnable {
|
|||
metricsFactory,
|
||||
execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder to construct a Worker instance.
|
||||
*/
|
||||
public static class Builder {
|
||||
|
||||
private IRecordProcessorFactory recordProcessorFactory;
|
||||
private KinesisClientLibConfiguration config;
|
||||
private AmazonKinesis kinesisClient;
|
||||
private AmazonDynamoDB dynamoDBClient;
|
||||
private AmazonCloudWatch cloudWatchClient;
|
||||
private IMetricsFactory metricsFactory;
|
||||
private ExecutorService execService;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public Builder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a V1
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder recordProcessorFactory(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
|
||||
recordProcessorFactory) {
|
||||
this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a V2
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) {
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the Worker config.
|
||||
*
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder config(KinesisClientLibConfiguration config) {
|
||||
this.config = config;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the Kinesis client.
|
||||
*
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder kinesisClient(AmazonKinesis kinesisClient) {
|
||||
this.kinesisClient = kinesisClient;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the DynamoDB client.
|
||||
*
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
|
||||
this.dynamoDBClient = dynamoDBClient;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the Cloudwatch client.
|
||||
*
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) {
|
||||
this.cloudWatchClient = cloudWatchClient;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the metrics factory.
|
||||
*
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder metricsFactory(IMetricsFactory metricsFactory) {
|
||||
this.metricsFactory = metricsFactory;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the executor service for processing records.
|
||||
*
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder execService(ExecutorService execService) {
|
||||
this.execService = execService;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the Worker instance.
|
||||
*
|
||||
* @return a Worker instance.
|
||||
*/
|
||||
// CHECKSTYLE:OFF CyclomaticComplexity
|
||||
// CHECKSTYLE:OFF NPathComplexity
|
||||
public Worker build() {
|
||||
if (config == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"Kinesis Client Library configuration needs to be provided to build Worker");
|
||||
}
|
||||
if (recordProcessorFactory == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"A Record Processor Factory needs to be provided to build Worker");
|
||||
}
|
||||
if (execService == null) {
|
||||
execService = Executors.newCachedThreadPool();
|
||||
}
|
||||
if (kinesisClient == null) {
|
||||
kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
|
||||
config.getKinesisClientConfiguration());
|
||||
}
|
||||
if (dynamoDBClient == null) {
|
||||
dynamoDBClient = new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
|
||||
config.getDynamoDBClientConfiguration());
|
||||
}
|
||||
if (cloudWatchClient == null) {
|
||||
cloudWatchClient = new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
|
||||
config.getCloudWatchClientConfiguration());
|
||||
}
|
||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||
if (config.getRegionName() != null) {
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
cloudWatchClient.setRegion(region);
|
||||
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
|
||||
kinesisClient.setRegion(region);
|
||||
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
|
||||
dynamoDBClient.setRegion(region);
|
||||
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
|
||||
}
|
||||
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
||||
if (config.getKinesisEndpoint() != null) {
|
||||
kinesisClient.setEndpoint(config.getKinesisEndpoint());
|
||||
if (config.getRegionName() != null) {
|
||||
LOG.warn("Received configuration for both region name as " + config.getRegionName()
|
||||
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
|
||||
+ ". Amazon Kinesis endpoint will overwrite region name.");
|
||||
LOG.debug("The region of Amazon Kinesis client has been overwritten to "
|
||||
+ config.getKinesisEndpoint());
|
||||
} else {
|
||||
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
|
||||
}
|
||||
}
|
||||
if (metricsFactory == null) {
|
||||
metricsFactory = new CWMetricsFactory(cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize());
|
||||
}
|
||||
|
||||
return new Worker(config.getApplicationName(),
|
||||
recordProcessorFactory,
|
||||
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
|
||||
kinesisClient).getProxy(config.getStreamName()),
|
||||
config.getMaxRecords(),
|
||||
config.getIdleTimeBetweenReadsInMillis(),
|
||||
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
config.getInitialPositionInStream(),
|
||||
config.getParentShardPollIntervalMillis(),
|
||||
config.getShardSyncIntervalMillis(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(),
|
||||
null,
|
||||
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(),
|
||||
dynamoDBClient),
|
||||
config.getWorkerIdentifier(),
|
||||
config.getFailoverTimeMillis(),
|
||||
config.getEpsilonMillis(),
|
||||
metricsFactory),
|
||||
execService,
|
||||
metricsFactory,
|
||||
config.getTaskBackoffTimeMillis());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
||||
/**
|
||||
* Kinesis proxy interface extended with additional method(s). Operates on a
|
||||
* single stream (set up at initialization).
|
||||
*
|
||||
*/
|
||||
public interface IKinesisProxyExtended extends IKinesisProxy {
|
||||
|
||||
/**
|
||||
* Look up the Shard with the given shardId in the stream that this
|
||||
* proxy operates on.
|
||||
*
|
||||
* @param shardId
|
||||
* Identifier of the shard to fetch
|
||||
* @return the Shard with the given shardId (the KinesisProxy implementation returns null when no shard matches)
|
||||
*/
|
||||
Shard getShard(String shardId);
|
||||
}
|
||||
|
|
@ -19,6 +19,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -44,7 +45,7 @@ import com.amazonaws.services.kinesis.model.StreamStatus;
|
|||
/**
|
||||
* Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards).
|
||||
*/
|
||||
public class KinesisProxy implements IKinesisProxy {
|
||||
public class KinesisProxy implements IKinesisProxyExtended {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(KinesisProxy.class);
|
||||
|
||||
|
|
@ -53,6 +54,7 @@ public class KinesisProxy implements IKinesisProxy {
|
|||
|
||||
private AmazonKinesis client;
|
||||
private AWSCredentialsProvider credentialsProvider;
|
||||
private AtomicReference<List<Shard>> listOfShardsSinceLastGet = new AtomicReference();
|
||||
|
||||
private final String streamName;
|
||||
|
||||
|
|
@ -187,6 +189,26 @@ public class KinesisProxy implements IKinesisProxy {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Shard getShard(String shardId) {
|
||||
if (this.listOfShardsSinceLastGet.get() == null) {
|
||||
//Update this.listOfShardsSinceLastGet as needed.
|
||||
this.getShardList();
|
||||
}
|
||||
|
||||
for (Shard shard : listOfShardsSinceLastGet.get()) {
|
||||
if (shard.getShardId().equals(shardId)) {
|
||||
return shard;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.warn("Cannot find the shard given the shardId " + shardId);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
@ -212,7 +234,7 @@ public class KinesisProxy implements IKinesisProxy {
|
|||
lastShardId = shards.get(shards.size() - 1).getShardId();
|
||||
}
|
||||
} while (response.getStreamDescription().isHasMoreShards());
|
||||
|
||||
this.listOfShardsSinceLastGet.set(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,247 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
import java.math.BigInteger;
|
||||
|
||||
//import com.amazonaws.services.kinesis.clientlibrary.lib.worker.String;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
|
||||
/**
|
||||
* Represents a two-part sequence number for records aggregated by the Kinesis
|
||||
* Producer Library.
|
||||
*
|
||||
* <p>
|
||||
* The KPL combines multiple user records into a single Kinesis record. Each
|
||||
* user record therefore has an integer sub-sequence number, in addition to the
|
||||
* regular sequence number of the Kinesis record. The sub-sequence number is
|
||||
* used to checkpoint within an aggregated record.
|
||||
*
|
||||
* @author daphnliu
|
||||
*
|
||||
*/
|
||||
public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber> {
|
||||
private final String sequenceNumber;
|
||||
private final long subSequenceNumber;
|
||||
|
||||
// Define TRIM_HORIZON and LATEST to be less than all sequence numbers
|
||||
private static final BigInteger TRIM_HORIZON_BIG_INTEGER_VALUE = BigInteger.valueOf(-2);
|
||||
private static final BigInteger LATEST_BIG_INTEGER_VALUE = BigInteger.valueOf(-1);
|
||||
|
||||
/**
|
||||
* Special value for LATEST.
|
||||
*/
|
||||
public static final ExtendedSequenceNumber LATEST =
|
||||
new ExtendedSequenceNumber(SentinelCheckpoint.LATEST.toString());
|
||||
|
||||
/**
|
||||
* Special value for SHARD_END.
|
||||
*/
|
||||
public static final ExtendedSequenceNumber SHARD_END =
|
||||
new ExtendedSequenceNumber(SentinelCheckpoint.SHARD_END.toString());
|
||||
/**
|
||||
*
|
||||
* Special value for TRIM_HORIZON.
|
||||
*/
|
||||
public static final ExtendedSequenceNumber TRIM_HORIZON =
|
||||
new ExtendedSequenceNumber(SentinelCheckpoint.TRIM_HORIZON.toString());
|
||||
|
||||
/**
|
||||
* Construct an ExtendedSequenceNumber. The sub-sequence number defaults to
|
||||
* 0.
|
||||
*
|
||||
* @param sequenceNumber
|
||||
* Sequence number of the Kinesis record
|
||||
*/
|
||||
public ExtendedSequenceNumber(String sequenceNumber) {
|
||||
this(sequenceNumber, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an ExtendedSequenceNumber.
|
||||
*
|
||||
* @param sequenceNumber
|
||||
* Sequence number of the Kinesis record
|
||||
* @param subSequenceNumber
|
||||
* Sub-sequence number of the user record within the Kinesis
|
||||
* record
|
||||
*/
|
||||
public ExtendedSequenceNumber(String sequenceNumber, Long subSequenceNumber) {
|
||||
this.sequenceNumber = sequenceNumber;
|
||||
this.subSequenceNumber = subSequenceNumber == null ? 0 : subSequenceNumber.longValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares this with another ExtendedSequenceNumber using these rules.
|
||||
*
|
||||
* SHARD_END is considered greatest
|
||||
* TRIM_HORIZON and LATEST are considered less than sequence numbers
|
||||
* sequence numbers are given their big integer value
|
||||
*
|
||||
* @param extendedSequenceNumber The ExtendedSequenceNumber to compare against
|
||||
* @return returns negative/0/positive if this is less than/equal to/greater than extendedSequenceNumber
|
||||
*/
|
||||
@Override
|
||||
public int compareTo(ExtendedSequenceNumber extendedSequenceNumber) {
|
||||
String secondSequenceNumber = extendedSequenceNumber.getSequenceNumber();
|
||||
|
||||
if (!isDigitsOrSentinelValue(sequenceNumber) || !isDigitsOrSentinelValue(secondSequenceNumber)) {
|
||||
throw new IllegalArgumentException("Expected a sequence number or a sentinel checkpoint value but "
|
||||
+ "received: first=" + sequenceNumber + " and second=" + secondSequenceNumber);
|
||||
}
|
||||
|
||||
// SHARD_END is the greatest
|
||||
if (SentinelCheckpoint.SHARD_END.toString().equals(sequenceNumber)
|
||||
&& SentinelCheckpoint.SHARD_END.toString().equals(secondSequenceNumber)) {
|
||||
return 0;
|
||||
} else if (SentinelCheckpoint.SHARD_END.toString().equals(secondSequenceNumber)) {
|
||||
return -1;
|
||||
} else if (SentinelCheckpoint.SHARD_END.toString().equals(sequenceNumber)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Compare other sentinel values and serial numbers after converting them to a big integer value
|
||||
int result = bigIntegerValue(sequenceNumber).compareTo(bigIntegerValue(secondSequenceNumber));
|
||||
return result == 0 ? Long.compare(subSequenceNumber, extendedSequenceNumber.subSequenceNumber) : result;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return The sequence number of the Kinesis record.
|
||||
*/
|
||||
public String getSequenceNumber() {
|
||||
return sequenceNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return The sub-sequence number of the user record within the enclosing
|
||||
* Kinesis record.
|
||||
*/
|
||||
public long getSubSequenceNumber() {
|
||||
return subSequenceNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{");
|
||||
if (getSequenceNumber() != null) {
|
||||
sb.append("SequenceNumber: " + getSequenceNumber() + ",");
|
||||
}
|
||||
if (subSequenceNumber >= 0) {
|
||||
sb.append("SubsequenceNumber: " + getSubSequenceNumber());
|
||||
}
|
||||
sb.append("}");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
final int shift = 32;
|
||||
int hashCode = 1;
|
||||
hashCode = prime * hashCode + ((sequenceNumber == null) ? 0 : sequenceNumber.hashCode());
|
||||
hashCode = prime * hashCode + ((subSequenceNumber < 0)
|
||||
? 0
|
||||
: (int) (subSequenceNumber ^ (subSequenceNumber >>> shift)));
|
||||
return hashCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!(obj instanceof ExtendedSequenceNumber)) {
|
||||
return false;
|
||||
}
|
||||
ExtendedSequenceNumber other = (ExtendedSequenceNumber) obj;
|
||||
|
||||
if (!sequenceNumber.equals(other.getSequenceNumber())) {
|
||||
return false;
|
||||
}
|
||||
return subSequenceNumber == other.getSubSequenceNumber();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sequence numbers are converted, sentinels are given a value of -1. Note this method is only used after special
|
||||
* logic associated with SHARD_END and the case of comparing two sentinel values has already passed, so we map
|
||||
* sentinel values LATEST and TRIM_HORIZON to negative numbers so that they are considered less than sequence
|
||||
* numbers.
|
||||
*
|
||||
* @param sequenceNumber The string to convert to big integer value
|
||||
* @return a BigInteger value representation of the sequenceNumber
|
||||
*/
|
||||
private static BigInteger bigIntegerValue(String sequenceNumber) {
|
||||
if (isDigits(sequenceNumber)) {
|
||||
return new BigInteger(sequenceNumber);
|
||||
} else if (SentinelCheckpoint.LATEST.toString().equals(sequenceNumber)) {
|
||||
return LATEST_BIG_INTEGER_VALUE;
|
||||
} else if (SentinelCheckpoint.TRIM_HORIZON.toString().equals(sequenceNumber)) {
|
||||
return TRIM_HORIZON_BIG_INTEGER_VALUE;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected a string of digits, TRIM_HORIZON, or LATEST but received "
|
||||
+ sequenceNumber);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the string is all digits or one of the SentinelCheckpoint values.
|
||||
*
|
||||
* @param string
|
||||
* @return true if and only if the string is all digits or one of the SentinelCheckpoint values
|
||||
*/
|
||||
private static boolean isDigitsOrSentinelValue(String string) {
|
||||
return isDigits(string) || isSentinelValue(string);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the string is a SentinelCheckpoint value.
|
||||
*
|
||||
* @param string
|
||||
* @return true if and only if the string can be converted to a SentinelCheckpoint
|
||||
*/
|
||||
private static boolean isSentinelValue(String string) {
|
||||
try {
|
||||
SentinelCheckpoint.valueOf(string);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the string is composed of only digits.
|
||||
*
|
||||
* @param string
|
||||
* @return true for a string of all digits, false otherwise (including false for null and empty string)
|
||||
*/
|
||||
private static boolean isDigits(String string) {
|
||||
if (string == null || string.length() == 0) {
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < string.length(); ++i) {
|
||||
if (!Character.isDigit(string.charAt(i))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
/**
|
||||
* Container for the parameters to the IRecordProcessor's
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#initialize(InitializationInput
|
||||
* initializationInput) initialize} method.
|
||||
*/
|
||||
public class InitializationInput {
|
||||
|
||||
private String shardId;
|
||||
private ExtendedSequenceNumber extendedSequenceNumber;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public InitializationInput() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get shard Id.
|
||||
*
|
||||
* @return The record processor will be responsible for processing records of this shard.
|
||||
*/
|
||||
public String getShardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set shard Id.
|
||||
*
|
||||
* @param shardId The record processor will be responsible for processing records of this shard.
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public InitializationInput withShardId(String shardId) {
|
||||
this.shardId = shardId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get starting {@link ExtendedSequenceNumber}.
|
||||
*
|
||||
* @return The {@link ExtendedSequenceNumber} in the shard from which records will be delivered to this
|
||||
* record processor.
|
||||
*/
|
||||
public ExtendedSequenceNumber getExtendedSequenceNumber() {
|
||||
return extendedSequenceNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set starting {@link ExtendedSequenceNumber}.
|
||||
*
|
||||
* @param extendedSequenceNumber The {@link ExtendedSequenceNumber} in the shard from which records will be
|
||||
* delivered to this record processor.
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public InitializationInput withExtendedSequenceNumber(ExtendedSequenceNumber extendedSequenceNumber) {
|
||||
this.extendedSequenceNumber = extendedSequenceNumber;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load diff
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
import java.util.List;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
/**
|
||||
* Container for the parameters to the IRecordProcessor's
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(
|
||||
* ProcessRecordsInput processRecordsInput) processRecords} method.
|
||||
*/
|
||||
public class ProcessRecordsInput {
|
||||
|
||||
private List<Record> records;
|
||||
private IRecordProcessorCheckpointer checkpointer;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public ProcessRecordsInput() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get records.
|
||||
*
|
||||
* @return Data records to be processed
|
||||
*/
|
||||
public List<Record> getRecords() {
|
||||
return records;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set records.
|
||||
*
|
||||
* @param records Data records to be processed
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public ProcessRecordsInput withRecords(List<Record> records) {
|
||||
this.records = records;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Checkpointer.
|
||||
*
|
||||
* @return RecordProcessor should use this instance to checkpoint their progress.
|
||||
*/
|
||||
public IRecordProcessorCheckpointer getCheckpointer() {
|
||||
return checkpointer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Checkpointer.
|
||||
*
|
||||
* @param checkpointer RecordProcessor should use this instance to checkpoint their progress.
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public ProcessRecordsInput withCheckpointer(IRecordProcessorCheckpointer checkpointer) {
|
||||
this.checkpointer = checkpointer;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
|
||||
/**
|
||||
* Container for the parameters to the IRecordProcessor's
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput
|
||||
* shutdownInput) shutdown} method.
|
||||
*/
|
||||
public class ShutdownInput {
|
||||
|
||||
private ShutdownReason shutdownReason;
|
||||
private IRecordProcessorCheckpointer checkpointer;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public ShutdownInput() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get shutdown reason.
|
||||
*
|
||||
* @return Reason for the shutdown (ShutdownReason.TERMINATE indicates the shard is closed and there are no
|
||||
* more records to process. Shutdown.ZOMBIE indicates a fail over has occurred).
|
||||
*/
|
||||
public ShutdownReason getShutdownReason() {
|
||||
return shutdownReason;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set shutdown reason.
|
||||
*
|
||||
* @param shutdownReason Reason for the shutdown
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public ShutdownInput withShutdownReason(ShutdownReason shutdownReason) {
|
||||
this.shutdownReason = shutdownReason;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Checkpointer.
|
||||
*
|
||||
* @return The checkpointer object that the record processor should use to checkpoint
|
||||
*/
|
||||
public IRecordProcessorCheckpointer getCheckpointer() {
|
||||
return checkpointer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the checkpointer.
|
||||
*
|
||||
* @param checkpointer The checkpointer object that the record processor should use to checkpoint
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public ShutdownInput withCheckpointer(IRecordProcessorCheckpointer checkpointer) {
|
||||
this.checkpointer = checkpointer;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,298 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* This class represents a KPL user record.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class UserRecord extends Record {
|
||||
private static final Log LOG = LogFactory.getLog(UserRecord.class);
|
||||
|
||||
private static final byte[] AGGREGATED_RECORD_MAGIC = new byte[] {-13, -119, -102, -62 };
|
||||
private static final int DIGEST_SIZE = 16;
|
||||
private static final BigInteger SMALLEST_HASH_KEY = new BigInteger("0");
|
||||
// largest hash key = 2^128-1
|
||||
private static final BigInteger LARGEST_HASH_KEY = new BigInteger(StringUtils.repeat("FF", 16), 16);
|
||||
|
||||
private final long subSequenceNumber;
|
||||
private final String explicitHashKey;
|
||||
private final boolean aggregated;
|
||||
|
||||
/**
|
||||
* Create a User Record from a Kinesis Record.
|
||||
*
|
||||
* @param record Kinesis record
|
||||
*/
|
||||
public UserRecord(Record record) {
|
||||
this(false, record, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a User Record.
|
||||
*
|
||||
* @param aggregated whether the record is aggregated
|
||||
* @param record Kinesis record
|
||||
* @param subSequenceNumber subsequence number
|
||||
* @param explicitHashKey explicit hash key
|
||||
*/
|
||||
protected UserRecord(boolean aggregated, Record record, long subSequenceNumber, String explicitHashKey) {
|
||||
if (subSequenceNumber < 0) {
|
||||
throw new IllegalArgumentException("Cannot have an invalid, negative subsequence number");
|
||||
}
|
||||
|
||||
this.aggregated = aggregated;
|
||||
this.subSequenceNumber = subSequenceNumber;
|
||||
this.explicitHashKey = explicitHashKey;
|
||||
|
||||
this.setSequenceNumber(record.getSequenceNumber());
|
||||
this.setData(record.getData());
|
||||
this.setPartitionKey(record.getPartitionKey());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return subSequenceNumber of this UserRecord.
|
||||
*/
|
||||
public long getSubSequenceNumber() {
|
||||
return subSequenceNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return explicitHashKey of this UserRecord.
|
||||
*/
|
||||
public String getExplicitHashKey() {
|
||||
return explicitHashKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a boolean indicating whether this UserRecord is aggregated.
|
||||
*/
|
||||
public boolean isAggregated() {
|
||||
return aggregated;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the String representation of this UserRecord.
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UserRecord [subSequenceNumber=" + subSequenceNumber + ", explicitHashKey=" + explicitHashKey
|
||||
+ ", aggregated=" + aggregated + ", getSequenceNumber()=" + getSequenceNumber() + ", getData()="
|
||||
+ getData() + ", getPartitionKey()=" + getPartitionKey() + "]";
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (aggregated ? 1231 : 1237);
|
||||
result = prime * result + ((explicitHashKey == null) ? 0 : explicitHashKey.hashCode());
|
||||
result = prime * result + (int) (subSequenceNumber ^ (subSequenceNumber >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!super.equals(obj)) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
UserRecord other = (UserRecord) obj;
|
||||
if (aggregated != other.aggregated) {
|
||||
return false;
|
||||
}
|
||||
if (explicitHashKey == null) {
|
||||
if (other.explicitHashKey != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!explicitHashKey.equals(other.explicitHashKey)) {
|
||||
return false;
|
||||
}
|
||||
if (subSequenceNumber != other.subSequenceNumber) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static byte[] md5(byte[] data) {
|
||||
try {
|
||||
MessageDigest d = MessageDigest.getInstance("MD5");
|
||||
return d.digest(data);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method deaggregates the given list of Amazon Kinesis records into a
|
||||
* list of KPL user records. This method will then return the resulting list
|
||||
* of KPL user records.
|
||||
*
|
||||
* @param records
|
||||
* A list of Amazon Kinesis records, each possibly aggregated.
|
||||
* @return A resulting list of deaggregated KPL user records.
|
||||
*/
|
||||
public static List<UserRecord> deaggregate(List<Record> records) {
|
||||
return deaggregate(records, SMALLEST_HASH_KEY, LARGEST_HASH_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method deaggregates the given list of Amazon Kinesis records into a
|
||||
* list of KPL user records. Any KPL user records whose explicit hash key or
|
||||
* partition key falls outside the range of the startingHashKey and the
|
||||
* endingHashKey are discarded from the resulting list. This method will
|
||||
* then return the resulting list of KPL user records.
|
||||
*
|
||||
* @param records
|
||||
* A list of Amazon Kinesis records, each possibly aggregated.
|
||||
* @param startingHashKey
|
||||
* A BigInteger representing the starting hash key that the
|
||||
* explicit hash keys or partition keys of retained resulting KPL
|
||||
* user records must be greater than or equal to.
|
||||
* @param endingHashKey
|
||||
* A BigInteger representing the ending hash key that the the
|
||||
* explicit hash keys or partition keys of retained resulting KPL
|
||||
* user records must be smaller than or equal to.
|
||||
* @return A resulting list of KPL user records whose explicit hash keys or
|
||||
* partition keys fall within the range of the startingHashKey and
|
||||
* the endingHashKey.
|
||||
*/
|
||||
public static List<UserRecord> deaggregate(List<Record> records, BigInteger startingHashKey,
|
||||
BigInteger endingHashKey) {
|
||||
List<UserRecord> result = new ArrayList<>();
|
||||
byte[] magic = new byte[AGGREGATED_RECORD_MAGIC.length];
|
||||
byte[] digest = new byte[DIGEST_SIZE];
|
||||
|
||||
for (Record r : records) {
|
||||
boolean isAggregated = true;
|
||||
long subSeqNum = 0;
|
||||
ByteBuffer bb = r.getData();
|
||||
|
||||
if (bb.remaining() >= magic.length) {
|
||||
bb.get(magic);
|
||||
} else {
|
||||
isAggregated = false;
|
||||
}
|
||||
|
||||
if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= DIGEST_SIZE) {
|
||||
isAggregated = false;
|
||||
}
|
||||
|
||||
if (isAggregated) {
|
||||
int oldLimit = bb.limit();
|
||||
bb.limit(oldLimit - DIGEST_SIZE);
|
||||
byte[] messageData = new byte[bb.remaining()];
|
||||
bb.get(messageData);
|
||||
bb.limit(oldLimit);
|
||||
bb.get(digest);
|
||||
byte[] calculatedDigest = md5(messageData);
|
||||
|
||||
if (!Arrays.equals(digest, calculatedDigest)) {
|
||||
isAggregated = false;
|
||||
} else {
|
||||
try {
|
||||
Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
|
||||
List<String> pks = ar.getPartitionKeyTableList();
|
||||
List<String> ehks = ar.getExplicitHashKeyTableList();
|
||||
try {
|
||||
int recordsInCurrRecord = 0;
|
||||
for (Messages.Record mr : ar.getRecordsList()) {
|
||||
String explicitHashKey = null;
|
||||
String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
|
||||
if (mr.hasExplicitHashKeyIndex()) {
|
||||
explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
|
||||
}
|
||||
|
||||
BigInteger effectiveHashKey = explicitHashKey != null
|
||||
? new BigInteger(explicitHashKey)
|
||||
: new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
|
||||
|
||||
if (effectiveHashKey.compareTo(startingHashKey) < 0
|
||||
|| effectiveHashKey.compareTo(endingHashKey) > 0) {
|
||||
for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
|
||||
result.remove(result.size() - 1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
++recordsInCurrRecord;
|
||||
Record record = new Record()
|
||||
.withData(ByteBuffer.wrap(mr.getData().toByteArray()))
|
||||
.withPartitionKey(partitionKey)
|
||||
.withSequenceNumber(r.getSequenceNumber());
|
||||
result.add(new UserRecord(true, record, subSeqNum++, explicitHashKey));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Unexpected exception during deaggregation, record was:\n");
|
||||
sb.append("PKS:\n");
|
||||
for (String s : pks) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
sb.append("EHKS: \n");
|
||||
for (String s : ehks) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
for (Messages.Record mr : ar.getRecordsList()) {
|
||||
sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
|
||||
.append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
|
||||
.append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
|
||||
.append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
|
||||
}
|
||||
sb.append("Sequence number: ").append(r.getSequenceNumber()).append("\n")
|
||||
.append("Raw data: ")
|
||||
.append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
|
||||
LOG.error(sb.toString(), e);
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
isAggregated = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isAggregated) {
|
||||
bb.rewind();
|
||||
result.add(new UserRecord(r));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -15,15 +15,18 @@
|
|||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
|
||||
/**
|
||||
* A Lease subclass containing KinesisClientLibrary related fields for checkpoints.
|
||||
*/
|
||||
public class KinesisClientLease extends Lease {
|
||||
|
||||
private String checkpoint;
|
||||
private ExtendedSequenceNumber checkpoint;
|
||||
private Long ownerSwitchesSinceCheckpoint = 0L;
|
||||
private Set<String> parentShardIds = new HashSet<String>();
|
||||
|
||||
|
|
@ -58,7 +61,7 @@ public class KinesisClientLease extends Lease {
|
|||
* @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after
|
||||
* the old worker's last checkpoint.
|
||||
*/
|
||||
public String getCheckpoint() {
|
||||
public ExtendedSequenceNumber getCheckpoint() {
|
||||
return checkpoint;
|
||||
}
|
||||
|
||||
|
|
@ -81,7 +84,7 @@ public class KinesisClientLease extends Lease {
|
|||
*
|
||||
* @param checkpoint may not be null
|
||||
*/
|
||||
public void setCheckpoint(String checkpoint) {
|
||||
public void setCheckpoint(ExtendedSequenceNumber checkpoint) {
|
||||
verifyNotNull(checkpoint, "Checkpoint should not be null");
|
||||
|
||||
this.checkpoint = checkpoint;
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
|
|
@ -74,9 +75,9 @@ public class KinesisClientLeaseManager extends LeaseManager<KinesisClientLease>
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public String getCheckpoint(String shardId)
|
||||
public ExtendedSequenceNumber getCheckpoint(String shardId)
|
||||
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
||||
String checkpoint = null;
|
||||
ExtendedSequenceNumber checkpoint = null;
|
||||
KinesisClientLease lease = getLease(shardId);
|
||||
if (lease != null) {
|
||||
checkpoint = lease.getCheckpoint();
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import java.lang.Long;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
|
|
@ -23,6 +24,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValue;
|
|||
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
|
||||
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
|
||||
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseSerializer;
|
||||
import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
|
||||
|
||||
|
|
@ -32,7 +34,8 @@ import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
|
|||
public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisClientLease> {
|
||||
|
||||
private static final String OWNER_SWITCHES_KEY = "ownerSwitchesSinceCheckpoint";
|
||||
private static final String CHECKPOINT_KEY = "checkpoint";
|
||||
private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint";
|
||||
private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber";
|
||||
public final String PARENT_SHARD_ID_KEY = "parentShardId";
|
||||
|
||||
private final LeaseSerializer baseSerializer = new LeaseSerializer(KinesisClientLease.class);
|
||||
|
|
@ -42,7 +45,8 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
Map<String, AttributeValue> result = baseSerializer.toDynamoRecord(lease);
|
||||
|
||||
result.put(OWNER_SWITCHES_KEY, DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()));
|
||||
result.put(CHECKPOINT_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint()));
|
||||
result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint().getSequenceNumber()));
|
||||
result.put(CHECKPOINT_SUBSEQUENCE_NUMBER_KEY, DynamoUtils.createAttributeValue(lease.getCheckpoint().getSubSequenceNumber()));
|
||||
if (lease.getParentShardIds() != null && !lease.getParentShardIds().isEmpty()) {
|
||||
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.getParentShardIds()));
|
||||
}
|
||||
|
|
@ -55,7 +59,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
KinesisClientLease result = (KinesisClientLease) baseSerializer.fromDynamoRecord(dynamoRecord);
|
||||
|
||||
result.setOwnerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY));
|
||||
result.setCheckpoint(DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_KEY));
|
||||
result.setCheckpoint(
|
||||
new ExtendedSequenceNumber(
|
||||
DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_SEQUENCE_NUMBER_KEY),
|
||||
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
|
||||
);
|
||||
result.setParentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
|
||||
|
||||
return result;
|
||||
|
|
@ -113,7 +121,9 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
|
|||
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(KinesisClientLease lease) {
|
||||
Map<String, AttributeValueUpdate> result = baseSerializer.getDynamoUpdateLeaseUpdate(lease);
|
||||
|
||||
result.put(CHECKPOINT_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getCheckpoint()),
|
||||
result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getCheckpoint().getSequenceNumber()),
|
||||
AttributeAction.PUT));
|
||||
result.put(CHECKPOINT_SUBSEQUENCE_NUMBER_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getCheckpoint().getSubSequenceNumber()),
|
||||
AttributeAction.PUT));
|
||||
result.put(OWNER_SWITCHES_KEY,
|
||||
new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getOwnerSwitchesSinceCheckpoint()),
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.leases.interfaces;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
|
|
@ -35,7 +36,7 @@ public interface IKinesisClientLeaseManager extends ILeaseManager<KinesisClientL
|
|||
* @throws InvalidStateException if lease table does not exist
|
||||
* @throws DependencyException if DynamoDB update fails in an unexpected way
|
||||
*/
|
||||
public abstract String getCheckpoint(String shardId)
|
||||
public abstract ExtendedSequenceNumber getCheckpoint(String shardId)
|
||||
throws ProvisionedThroughputException, InvalidStateException, DependencyException;
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue