diff --git a/src/clientlibrary/config/config.go b/src/clientlibrary/config/config.go
index 8f3322c..eef22b6 100644
--- a/src/clientlibrary/config/config.go
+++ b/src/clientlibrary/config/config.go
@@ -5,17 +5,17 @@ import (
"math"
"strings"
"time"
+
+ "github.com/aws/aws-sdk-go/aws"
)
const (
- EPSILON_MS = 25
-
// LATEST start after the most recent data record (fetch new data).
- LATEST = InitialPositionInStream(1)
+ LATEST InitialPositionInStream = iota + 1
// TRIM_HORIZON start from the oldest available data record
- TRIM_HORIZON = LATEST + 1
+ TRIM_HORIZON
// AT_TIMESTAMP start from the record at or after the specified server-side Timestamp.
- AT_TIMESTAMP = TRIM_HORIZON + 1
+ AT_TIMESTAMP
// The location in the shard from which the KinesisClientLibrary will start fetching records from
// when the application starts for the first time and there is no checkpoint for the shard.
@@ -119,6 +119,7 @@ type (
}
// Configuration for the Kinesis Client Library.
+ // Note: There is no need to configure credential provider. Credential can be get from InstanceProfile.
KinesisClientLibConfiguration struct {
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
ApplicationName string
@@ -132,12 +133,6 @@ type (
// WorkerID used to distinguish different workers/processes of a Kinesis application
WorkerID string
- // KinesisEndpoint endpoint
- KinesisEndpoint string
-
- // DynamoDB endpoint
- DynamoDBEndpoint string
-
// InitialPositionInStream specifies the Position in the stream where a new application should start from
InitialPositionInStream InitialPositionInStream
@@ -209,12 +204,19 @@ type (
// Worker should skip syncing shards and leases at startup if leases are present
// This is useful for optimizing deployments to large fleets working on a stable stream.
SkipShardSyncAtWorkerInitializationIfLeasesExist bool
-
- // The max number of threads in the worker thread pool to getRecords.
- WorkerThreadPoolSize int
}
)
+var positionMap = map[InitialPositionInStream]*string{
+ LATEST: aws.String("LATEST"),
+ TRIM_HORIZON: aws.String("TRIM_HORIZON"),
+ AT_TIMESTAMP: aws.String("AT_TIMESTAMP"),
+}
+
+func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string {
+ return positionMap[pos]
+}
+
func empty(s string) bool {
return len(strings.TrimSpace(s)) == 0
}
diff --git a/src/clientlibrary/config/config_test.go b/src/clientlibrary/config/config_test.go
index 7d72137..88bc75d 100644
--- a/src/clientlibrary/config/config_test.go
+++ b/src/clientlibrary/config/config_test.go
@@ -7,7 +7,7 @@ import (
)
func TestConfig(t *testing.T) {
- kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "workerId").
+ kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "us-west-2", "workerId").
WithFailoverTimeMillis(500).
WithMaxRecords(100).
WithInitialPositionInStream(TRIM_HORIZON).
@@ -15,8 +15,7 @@ func TestConfig(t *testing.T) {
WithCallProcessRecordsEvenForEmptyRecordList(true).
WithTaskBackoffTimeMillis(10).
WithMetricsBufferTimeMillis(500).
- WithMetricsMaxQueueSize(200).
- WithRegionName("us-west-2")
+ WithMetricsMaxQueueSize(200)
assert.Equal(t, "appName", kclConfig.ApplicationName)
assert.Equal(t, 500, kclConfig.FailoverTimeMillis)
diff --git a/src/clientlibrary/config/kcl-config.go b/src/clientlibrary/config/kcl-config.go
index edc63db..bfba4aa 100644
--- a/src/clientlibrary/config/kcl-config.go
+++ b/src/clientlibrary/config/kcl-config.go
@@ -6,10 +6,10 @@ import (
)
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
-func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration {
+func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration {
checkIsValueNotEmpty("ApplicationName", applicationName)
checkIsValueNotEmpty("StreamName", streamName)
- checkIsValueNotEmpty("ApplicationName", applicationName)
+ checkIsValueNotEmpty("RegionName", regionName)
if empty(workerID) {
workerID = utils.MustNewUUID()
@@ -17,32 +17,30 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki
// populate the KCL configuration with default values
return &KinesisClientLibConfiguration{
- ApplicationName: applicationName,
- TableName: applicationName,
- StreamName: streamName,
- WorkerID: workerID,
- KinesisEndpoint: "",
- InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
- InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
- FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
- MaxRecords: DEFAULT_MAX_RECORDS,
- IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
- CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
- ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
- ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
- CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
- TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
- MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS,
- MetricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE,
- ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
- RegionName: "",
+ ApplicationName: applicationName,
+ TableName: applicationName,
+ StreamName: streamName,
+ RegionName: regionName,
+ WorkerID: workerID,
+ InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
+ InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
+ FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
+ MaxRecords: DEFAULT_MAX_RECORDS,
+ IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
+ CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
+ ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
+ ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
+ CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
+ TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
+ MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS,
+ MetricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE,
+ ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS,
MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER,
MaxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
- WorkerThreadPoolSize: 1,
}
}
@@ -52,11 +50,6 @@ func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *Kinesis
return c
}
-func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
- c.KinesisEndpoint = kinesisEndpoint
- return c
-}
-
func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration {
c.InitialPositionInStream = initialPositionInStream
c.InitialPositionInStreamExtended = *newInitialPosition(initialPositionInStream)
@@ -87,6 +80,14 @@ func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisC
return c
}
+// WithMaxLeasesForWorker configures maximum lease this worker can handles. It determines how maximun number of shards
+// this worker can handle.
+func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisClientLibConfiguration {
+ checkIsValuePositive("MaxLeasesForWorker", n)
+ c.MaxLeasesForWorker = n
+ return c
+}
+
/**
* Controls how long the KCL will sleep if no records are returned from Kinesis
*
@@ -137,17 +138,3 @@ func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueS
c.MetricsMaxQueueSize = metricsMaxQueueSize
return c
}
-
-// WithRegionName configures region for the stream
-func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration {
- checkIsValueNotEmpty("RegionName", regionName)
- c.RegionName = regionName
- return c
-}
-
-// WithWorkerThreadPoolSize configures worker thread pool size
-func (c *KinesisClientLibConfiguration) WithWorkerThreadPoolSize(n int) *KinesisClientLibConfiguration {
- checkIsValuePositive("WorkerThreadPoolSize", n)
- c.WorkerThreadPoolSize = n
- return c
-}
diff --git a/src/clientlibrary/interfaces/inputs.go b/src/clientlibrary/interfaces/inputs.go
index 1b21999..27590c3 100644
--- a/src/clientlibrary/interfaces/inputs.go
+++ b/src/clientlibrary/interfaces/inputs.go
@@ -7,31 +7,56 @@ import (
)
const (
- REQUESTED = ShutdownReason(1)
- TERMINATE = REQUESTED + 1
- ZOMBIE = TERMINATE + 1
+ /**
+ * Indicates that the entire application is being shutdown, and if desired the record processor will be given a
+ * final chance to checkpoint. This state will not trigger a direct call to
+ * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but
+ * instead depend on a different interface for backward compatibility.
+ */
+ REQUESTED ShutdownReason = iota + 1
+ /**
+ * Terminate processing for this RecordProcessor (resharding use case).
+ * Indicates that the shard is closed and all records from the shard have been delivered to the application.
+ * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records
+ * from this shard and processing of child shards can be started.
+ */
+ TERMINATE
+ /**
+ * Processing will be moved to a different record processor (fail over, load balancing use cases).
+ * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
+ * processing data).
+ */
+ ZOMBIE
)
// Containers for the parameters to the IRecordProcessor
type (
+ /**
+ * Reason the RecordProcessor is being shutdown.
+ * Used to distinguish between a fail-over vs. a termination (shard is closed and all records have been delivered).
+ * In case of a fail over, applications should NOT checkpoint as part of shutdown,
+ * since another record processor may have already started processing records for that shard.
+ * In case of termination (resharding use case), applications SHOULD checkpoint their progress to indicate
+ * that they have successfully processed all the records (processing of child shards can then begin).
+ */
ShutdownReason int
InitializationInput struct {
- shardId string
- extendedSequenceNumber *ExtendedSequenceNumber
- pendingCheckpointSequenceNumber *ExtendedSequenceNumber
+ ShardId string
+ ExtendedSequenceNumber *ExtendedSequenceNumber
+ PendingCheckpointSequenceNumber *ExtendedSequenceNumber
}
ProcessRecordsInput struct {
- cacheEntryTime *time.Time
- cacheExitTime *time.Time
- records []*ks.Record
- checkpointer *IRecordProcessorCheckpointer
- millisBehindLatest int64
+ CacheEntryTime *time.Time
+ CacheExitTime *time.Time
+ Records []*ks.Record
+ Checkpointer IRecordProcessorCheckpointer
+ MillisBehindLatest int64
}
ShutdownInput struct {
- shutdownReason ShutdownReason
- checkpointer *IRecordProcessorCheckpointer
+ ShutdownReason ShutdownReason
+ Checkpointer IRecordProcessorCheckpointer
}
)
diff --git a/src/clientlibrary/interfaces/record-processor-checkpointer.go b/src/clientlibrary/interfaces/record-processor-checkpointer.go
index 296fd6a..ffea0e8 100644
--- a/src/clientlibrary/interfaces/record-processor-checkpointer.go
+++ b/src/clientlibrary/interfaces/record-processor-checkpointer.go
@@ -1,12 +1,8 @@
package interfaces
-import (
- ks "github.com/aws/aws-sdk-go/service/kinesis"
-)
-
type (
IPreparedCheckpointer interface {
- getPendingCheckpoint() ExtendedSequenceNumber
+ GetPendingCheckpoint() *ExtendedSequenceNumber
/**
* This method will record a pending checkpoint.
@@ -24,7 +20,7 @@ type (
* i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest
* sequence number seen by the associated record processor.
*/
- checkpoint() error
+ Checkpoint() error
}
/**
@@ -33,46 +29,6 @@ type (
* checkpoint their progress.
*/
IRecordProcessorCheckpointer interface {
-
- /**
- * This method will checkpoint the progress at the last data record that was delivered to the record processor.
- * Upon fail over (after a successful checkpoint() call), the new/replacement RecordProcessor instance
- * will receive data records whose sequenceNumber > checkpoint position (for each partition key).
- * In steady state, applications should checkpoint periodically (e.g. once every 5 minutes).
- * Calling this API too frequently can slow down the application (because it puts pressure on the underlying
- * checkpoint storage layer).
- *
- * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
- * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have
- * started processing some of these records already.
- * The application should abort processing via this RecordProcessor instance.
- * @error InvalidStateError Can't store checkpoint.
- * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
- * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
- * backoff and retry.
- */
- checkpoint() error
-
- /**
- * This method will checkpoint the progress at the provided record. This method is analogous to
- * {@link #checkpoint()} but provides the ability to specify the record at which to
- * checkpoint.
- *
- * @param record A record at which to checkpoint in this shard. Upon failover,
- * the Kinesis Client Library will start fetching records after this record's sequence number.
- * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
- * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have
- * started processing some of these records already.
- * The application should abort processing via this RecordProcessor instance.
- * @error InvalidStateError Can't store checkpoint.
- * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
- * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
- * backoff and retry.
- */
- checkpointByRecord(record *ks.Record) error
-
/**
* This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
* {@link #checkpoint()} but provides the ability to specify the sequence number at which to
@@ -94,87 +50,10 @@ type (
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
- checkpointBySequenceNumber(sequenceNumber string) error
+ Checkpoint(sequenceNumber *string) error
/**
- * This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
- * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
- * but provides the ability to specify the sequence and subsequence numbers at which to checkpoint.
- *
- * @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, the Kinesis
- * Client Library will start fetching records after the given sequence and subsequence numbers.
- * @param subSequenceNumber A subsequence number at which to checkpoint within this shard. Upon failover, the
- * Kinesis Client Library will start fetching records after the given sequence and subsequence numbers.
- * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
- * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have
- * started processing some of these records already.
- * The application should abort processing via this RecordProcessor instance.
- * @error InvalidStateError Can't store checkpoint.
- * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
- * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
- * backoff and retry.
- * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
- * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
- * greatest sequence number seen by the associated record processor.
- * 2.) It is not a valid sequence number for a record in this shard.
- */
- checkpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) error
-
- /**
- * This method will record a pending checkpoint at the last data record that was delivered to the record processor.
- * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
- * IRecordProcessor for this shard will be informed of the prepared sequence number
- *
- * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
- * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
- * Use the sequence number passed in to init() to behave idempotently.
- *
- * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
- *
- * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
- * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have
- * started processing some of these records already.
- * The application should abort processing via this RecordProcessor instance.
- * @error InvalidStateError Can't store pending checkpoint.
- * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
- * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
- * application can backoff and retry.
- */
- prepareCheckpoint() (*IPreparedCheckpointer, error)
-
- /**
- * This method will record a pending checkpoint at the at the provided record. This method is analogous to
- * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
- *
- * @param record A record at which to prepare checkpoint in this shard.
- *
- * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
- * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
- * Use the sequence number and application state passed in to init() to behave idempotently.
- *
- * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
- *
- * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
- * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have
- * started processing some of these records already.
- * The application should abort processing via this RecordProcessor instance.
- * @error InvalidStateError Can't store pending checkpoint.
- * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
- * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
- * application can backoff and retry.
- * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
- * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
- * greatest sequence number seen by the associated record processor.
- * 2.) It is not a valid sequence number for a record in this shard.
- */
- prepareCheckpointByRecord(record *ks.Record) (*IPreparedCheckpointer, error)
-
- /**
- * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
- * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
+ * This method will record a pending checkpoint at the provided sequenceNumber.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
@@ -194,32 +73,6 @@ type (
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
- prepareCheckpointBySequenceNumber(sequenceNumber string) (*IPreparedCheckpointer, error)
-
- /**
- * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
- * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
- * but provides the ability to specify the sequence number at which to checkpoint
- *
- * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
- * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
- *
- * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
- *
- * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
- * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have
- * started processing some of these records already.
- * The application should abort processing via this RecordProcessor instance.
- * @error InvalidStateError Can't store pending checkpoint.
- * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
- * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
- * application can backoff and retry.
- * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
- * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
- * greatest sequence number seen by the associated record processor.
- * 2.) It is not a valid sequence number for a record in this shard.
- */
- prepareCheckpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) (*IPreparedCheckpointer, error)
+ PrepareCheckpoint(sequenceNumber *string) (IPreparedCheckpointer, error)
}
)
diff --git a/src/clientlibrary/interfaces/record-processor.go b/src/clientlibrary/interfaces/record-processor.go
index f704d0e..d64414c 100644
--- a/src/clientlibrary/interfaces/record-processor.go
+++ b/src/clientlibrary/interfaces/record-processor.go
@@ -1,40 +1,54 @@
package interfaces
-// IRecordProcessor is the interface for some callback functions invoked by KCL will
-// The main task of using KCL is to provide implementation on IRecordProcessor interface.
-// Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
-type IRecordProcessor interface {
- /**
- * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
- * (via processRecords).
- *
- * @param initializationInput Provides information related to initialization
- */
- initialize(initializationInput InitializationInput)
+type (
+ // IRecordProcessor is the interface for some callback functions invoked by KCL will
+ // The main task of using KCL is to provide implementation on IRecordProcessor interface.
+ // Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
+ IRecordProcessor interface {
+ /**
+ * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
+ * (via processRecords).
+ *
+ * @param initializationInput Provides information related to initialization
+ */
+ Initialize(initializationInput *InitializationInput)
- /**
- * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
- * application.
- * Upon fail over, the new instance will get records with sequence number > checkpoint position
- * for each partition key.
- *
- * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
- * to them (eg checkpointing).
- */
- processRecords(processRecordsInput ProcessRecordsInput)
+ /**
+ * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
+ * application.
+ * Upon fail over, the new instance will get records with sequence number > checkpoint position
+ * for each partition key.
+ *
+ * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
+ * to them (eg checkpointing).
+ */
+ ProcessRecords(processRecordsInput *ProcessRecordsInput)
- /**
- * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
- * RecordProcessor instance.
- *
- *
Warning
- *
- * When the value of {@link ShutdownInput#getShutdownReason()} is
- * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
- * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
- *
- * @param shutdownInput
- * Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
- */
- shutdown(shutdownInput ShutdownInput)
-}
+ /**
+ * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
+ * RecordProcessor instance.
+ *
+ * Warning
+ *
+ * When the value of {@link ShutdownInput#getShutdownReason()} is
+ * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
+ * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
+ *
+ * @param shutdownInput
+ * Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
+ */
+ Shutdown(shutdownInput *ShutdownInput)
+ }
+
+ // IRecordProcessorFactory is interface for creating IRecordProcessor. Each Worker can have multiple threads
+ // for processing shard. Client can choose either creating one processor per shard or sharing them.
+ IRecordProcessorFactory interface {
+
+ /**
+ * Returns a record processor to be used for processing data records for a (assigned) shard.
+ *
+ * @return Returns a processor object.
+ */
+ CreateProcessor() IRecordProcessor
+ }
+)
diff --git a/src/clientlibrary/interfaces/sequence-number.go b/src/clientlibrary/interfaces/sequence-number.go
index 80ac68f..f9c01ad 100644
--- a/src/clientlibrary/interfaces/sequence-number.go
+++ b/src/clientlibrary/interfaces/sequence-number.go
@@ -6,6 +6,6 @@ package interfaces
// sub-sequence number, in addition to the regular sequence number of the Kinesis record. The sub-sequence number
// is used to checkpoint within an aggregated record.
type ExtendedSequenceNumber struct {
- sequenceNumber string
- subSequenceNumber int64
+ SequenceNumber *string
+ SubSequenceNumber int64
}
diff --git a/src/clientlibrary/lib/checkpoint/checkpoint.go b/src/clientlibrary/lib/checkpoint/checkpoint.go
deleted file mode 100644
index 1f480d8..0000000
--- a/src/clientlibrary/lib/checkpoint/checkpoint.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package checkpoint
-
-import (
- . "clientlibrary/interfaces"
-)
-
-const (
- // TRIM_HORIZON starts from the first available record in the shard.
- TRIM_HORIZON = SentinelCheckpoint(iota + 1)
- // LATEST starts from the latest record in the shard.
- LATEST
- // SHARD_END We've completely processed all records in this shard.
- SHARD_END
- // AT_TIMESTAMP starts from the record at or after the specified server-side timestamp.
- AT_TIMESTAMP
-)
-
-type (
- SentinelCheckpoint int
-
- // Checkpoint: a class encapsulating the 2 pieces of state stored in a checkpoint.
- Checkpoint struct {
- checkpoint *ExtendedSequenceNumber
- pendingCheckpoint *ExtendedSequenceNumber
- }
-)
diff --git a/src/clientlibrary/lib/worker/worker.go b/src/clientlibrary/lib/worker/worker.go
deleted file mode 100644
index 4df0094..0000000
--- a/src/clientlibrary/lib/worker/worker.go
+++ /dev/null
@@ -1 +0,0 @@
-package worker
diff --git a/src/clientlibrary/metrics/cloudwatch.go b/src/clientlibrary/metrics/cloudwatch.go
new file mode 100644
index 0000000..f5a76d6
--- /dev/null
+++ b/src/clientlibrary/metrics/cloudwatch.go
@@ -0,0 +1,274 @@
+package metrics
+
+import (
+ "sync"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/cloudwatch"
+ "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
+ log "github.com/sirupsen/logrus"
+)
+
+type CloudWatchMonitoringService struct {
+ Namespace string
+ KinesisStream string
+ WorkerID string
+ // What granularity we should send metrics to CW at. Note setting this to 1 will cost quite a bit of money
+ // At the time of writing (March 2018) about US$200 per month
+ ResolutionSec int
+ svc cloudwatchiface.CloudWatchAPI
+ shardMetrics map[string]*cloudWatchMetrics
+}
+
+type cloudWatchMetrics struct {
+ processedRecords int64
+ processedBytes int64
+ behindLatestMillis []float64
+ leasesHeld int64
+ leaseRenewals int64
+ getRecordsTime []float64
+ processRecordsTime []float64
+ sync.Mutex
+}
+
+func (cw *CloudWatchMonitoringService) Init() error {
+ if cw.ResolutionSec == 0 {
+ cw.ResolutionSec = 60
+ }
+
+ session, err := session.NewSessionWithOptions(
+ session.Options{
+ SharedConfigState: session.SharedConfigEnable,
+ },
+ )
+ if err != nil {
+ return err
+ }
+
+ cw.svc = cloudwatch.New(session)
+ cw.shardMetrics = make(map[string]*cloudWatchMetrics)
+ return nil
+}
+
+func (cw *CloudWatchMonitoringService) flushDaemon() {
+ previousFlushTime := time.Now()
+ resolutionDuration := time.Duration(cw.ResolutionSec) * time.Second
+ for {
+ time.Sleep(resolutionDuration - time.Now().Sub(previousFlushTime))
+ err := cw.flush()
+ if err != nil {
+ log.Errorln("Error sending metrics to CloudWatch", err)
+ }
+ previousFlushTime = time.Now()
+ }
+}
+
+func (cw *CloudWatchMonitoringService) flush() error {
+ for shard, metric := range cw.shardMetrics {
+ metric.Lock()
+ defaultDimensions := []*cloudwatch.Dimension{
+ &cloudwatch.Dimension{
+ Name: aws.String("shard"),
+ Value: &shard,
+ },
+ &cloudwatch.Dimension{
+ Name: aws.String("KinesisStreamName"),
+ Value: &cw.KinesisStream,
+ },
+ }
+ leaseDimensions := make([]*cloudwatch.Dimension, len(defaultDimensions))
+ copy(defaultDimensions, leaseDimensions)
+ leaseDimensions = append(leaseDimensions, &cloudwatch.Dimension{
+ Name: aws.String("WorkerID"),
+ Value: &cw.WorkerID,
+ })
+ metricTimestamp := time.Now()
+ _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
+ Namespace: aws.String(cw.Namespace),
+ MetricData: []*cloudwatch.MetricDatum{
+ &cloudwatch.MetricDatum{
+ Dimensions: defaultDimensions,
+ MetricName: aws.String("RecordsProcessed"),
+ Unit: aws.String("Count"),
+ Timestamp: &metricTimestamp,
+ Value: aws.Float64(float64(metric.processedRecords)),
+ },
+ &cloudwatch.MetricDatum{
+ Dimensions: defaultDimensions,
+ MetricName: aws.String("DataBytesProcessed"),
+ Unit: aws.String("Byte"),
+ Timestamp: &metricTimestamp,
+ Value: aws.Float64(float64(metric.processedBytes)),
+ },
+ &cloudwatch.MetricDatum{
+ Dimensions: defaultDimensions,
+ MetricName: aws.String("MillisBehindLatest"),
+ Unit: aws.String("Milliseconds"),
+ Timestamp: &metricTimestamp,
+ StatisticValues: &cloudwatch.StatisticSet{
+ SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))),
+ Sum: sumFloat64(metric.behindLatestMillis),
+ Maximum: maxFloat64(metric.behindLatestMillis),
+ Minimum: minFloat64(metric.behindLatestMillis),
+ },
+ },
+ &cloudwatch.MetricDatum{
+ Dimensions: defaultDimensions,
+ MetricName: aws.String("KinesisDataFetcher.getRecords.Time"),
+ Unit: aws.String("Milliseconds"),
+ Timestamp: &metricTimestamp,
+ StatisticValues: &cloudwatch.StatisticSet{
+ SampleCount: aws.Float64(float64(len(metric.getRecordsTime))),
+ Sum: sumFloat64(metric.getRecordsTime),
+ Maximum: maxFloat64(metric.getRecordsTime),
+ Minimum: minFloat64(metric.getRecordsTime),
+ },
+ },
+ &cloudwatch.MetricDatum{
+ Dimensions: defaultDimensions,
+ MetricName: aws.String("RecordProcessor.processRecords.Time"),
+ Unit: aws.String("Milliseconds"),
+ Timestamp: &metricTimestamp,
+ StatisticValues: &cloudwatch.StatisticSet{
+ SampleCount: aws.Float64(float64(len(metric.processRecordsTime))),
+ Sum: sumFloat64(metric.processRecordsTime),
+ Maximum: maxFloat64(metric.processRecordsTime),
+ Minimum: minFloat64(metric.processRecordsTime),
+ },
+ },
+ &cloudwatch.MetricDatum{
+ Dimensions: leaseDimensions,
+ MetricName: aws.String("RenewLease.Success"),
+ Unit: aws.String("Count"),
+ Timestamp: &metricTimestamp,
+ Value: aws.Float64(float64(metric.leaseRenewals)),
+ },
+ &cloudwatch.MetricDatum{
+ Dimensions: leaseDimensions,
+ MetricName: aws.String("CurrentLeases"),
+ Unit: aws.String("Count"),
+ Timestamp: &metricTimestamp,
+ Value: aws.Float64(float64(metric.leasesHeld)),
+ },
+ },
+ })
+ if err == nil {
+ metric.processedRecords = 0
+ metric.processedBytes = 0
+ metric.behindLatestMillis = []float64{}
+ metric.leaseRenewals = 0
+ metric.getRecordsTime = []float64{}
+ metric.processRecordsTime = []float64{}
+ }
+ metric.Unlock()
+ return err
+ }
+ return nil
+}
+
+func (cw *CloudWatchMonitoringService) IncrRecordsProcessed(shard string, count int) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].processedRecords += int64(count)
+}
+
+func (cw *CloudWatchMonitoringService) IncrBytesProcessed(shard string, count int64) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].processedBytes += count
+}
+
+func (cw *CloudWatchMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].behindLatestMillis = append(cw.shardMetrics[shard].behindLatestMillis, millSeconds)
+}
+
+func (cw *CloudWatchMonitoringService) LeaseGained(shard string) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].leasesHeld++
+}
+
+func (cw *CloudWatchMonitoringService) LeaseLost(shard string) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].leasesHeld--
+}
+
+func (cw *CloudWatchMonitoringService) LeaseRenewed(shard string) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].leaseRenewals++
+}
+
+func (cw *CloudWatchMonitoringService) RecordGetRecordsTime(shard string, time float64) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].getRecordsTime = append(cw.shardMetrics[shard].getRecordsTime, time)
+}
+func (cw *CloudWatchMonitoringService) RecordProcessRecordsTime(shard string, time float64) {
+ if _, ok := cw.shardMetrics[shard]; !ok {
+ cw.shardMetrics[shard] = &cloudWatchMetrics{}
+ }
+ cw.shardMetrics[shard].Lock()
+ defer cw.shardMetrics[shard].Unlock()
+ cw.shardMetrics[shard].processRecordsTime = append(cw.shardMetrics[shard].processRecordsTime, time)
+}
+
+func sumFloat64(slice []float64) *float64 {
+ sum := float64(0)
+ for _, num := range slice {
+ sum += num
+ }
+ return &sum
+}
+
+func maxFloat64(slice []float64) *float64 {
+ if len(slice) < 1 {
+ return aws.Float64(0)
+ }
+ max := slice[0]
+ for _, num := range slice {
+ if num > max {
+ max = num
+ }
+ }
+ return &max
+}
+
+func minFloat64(slice []float64) *float64 {
+ if len(slice) < 1 {
+ return aws.Float64(0)
+ }
+ min := slice[0]
+ for _, num := range slice {
+ if num < min {
+ min = num
+ }
+ }
+ return &min
+}
diff --git a/src/clientlibrary/metrics/interfaces.go b/src/clientlibrary/metrics/interfaces.go
new file mode 100644
index 0000000..141e644
--- /dev/null
+++ b/src/clientlibrary/metrics/interfaces.go
@@ -0,0 +1,66 @@
+package metrics
+
+import (
+ "fmt"
+)
+
+// MonitoringConfiguration allows you to configure how record processing metrics are exposed
+type MonitoringConfiguration struct {
+ MonitoringService string // Type of monitoring to expose. Supported types are "prometheus"
+ Prometheus PrometheusMonitoringService
+ CloudWatch CloudWatchMonitoringService
+ service MonitoringService
+}
+
+type MonitoringService interface {
+ Init() error
+ IncrRecordsProcessed(string, int)
+ IncrBytesProcessed(string, int64)
+ MillisBehindLatest(string, float64)
+ LeaseGained(string)
+ LeaseLost(string)
+ LeaseRenewed(string)
+ RecordGetRecordsTime(string, float64)
+ RecordProcessRecordsTime(string, float64)
+}
+
+func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID string) error {
+ if m.MonitoringService == "" {
+ m.service = &noopMonitoringService{}
+ return nil
+ }
+
+ switch m.MonitoringService {
+ case "prometheus":
+ m.Prometheus.Namespace = nameSpace
+ m.Prometheus.KinesisStream = streamName
+ m.Prometheus.WorkerID = workerID
+ m.service = &m.Prometheus
+ case "cloudwatch":
+ m.CloudWatch.KinesisStream = streamName
+ m.CloudWatch.WorkerID = workerID
+ m.service = &m.CloudWatch
+ default:
+ return fmt.Errorf("Invalid monitoring service type %s", m.MonitoringService)
+ }
+ return m.service.Init()
+}
+
+func (m *MonitoringConfiguration) GetMonitoringService() MonitoringService {
+ return m.service
+}
+
+type noopMonitoringService struct{}
+
+func (n *noopMonitoringService) Init() error {
+ return nil
+}
+
+func (n *noopMonitoringService) IncrRecordsProcessed(shard string, count int) {}
+func (n *noopMonitoringService) IncrBytesProcessed(shard string, count int64) {}
+func (n *noopMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {}
+func (n *noopMonitoringService) LeaseGained(shard string) {}
+func (n *noopMonitoringService) LeaseLost(shard string) {}
+func (n *noopMonitoringService) LeaseRenewed(shard string) {}
+func (n *noopMonitoringService) RecordGetRecordsTime(shard string, time float64) {}
+func (n *noopMonitoringService) RecordProcessRecordsTime(shard string, time float64) {}
diff --git a/src/clientlibrary/metrics/prometheus.go b/src/clientlibrary/metrics/prometheus.go
new file mode 100644
index 0000000..4ec13fd
--- /dev/null
+++ b/src/clientlibrary/metrics/prometheus.go
@@ -0,0 +1,113 @@
+package metrics
+
+import (
+ "net/http"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ log "github.com/sirupsen/logrus"
+)
+
+type PrometheusMonitoringService struct {
+ ListenAddress string
+
+ Namespace string
+ KinesisStream string
+ WorkerID string
+ processedRecords *prometheus.CounterVec
+ processedBytes *prometheus.CounterVec
+ behindLatestMillis *prometheus.GaugeVec
+ leasesHeld *prometheus.GaugeVec
+ leaseRenewals *prometheus.CounterVec
+ getRecordsTime *prometheus.HistogramVec
+ processRecordsTime *prometheus.HistogramVec
+}
+
+func (p *PrometheusMonitoringService) Init() error {
+ p.processedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: p.Namespace + `_processed_bytes`,
+ Help: "Number of bytes processed",
+ }, []string{"kinesisStream", "shard"})
+ p.processedRecords = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: p.Namespace + `_processed_records`,
+ Help: "Number of records processed",
+ }, []string{"kinesisStream", "shard"})
+ p.behindLatestMillis = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: p.Namespace + `_behind_latest_millis`,
+ Help: "The amount of milliseconds processing is behind",
+ }, []string{"kinesisStream", "shard"})
+ p.leasesHeld = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: p.Namespace + `_leases_held`,
+ Help: "The number of leases held by the worker",
+ }, []string{"kinesisStream", "shard", "workerID"})
+ p.leaseRenewals = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: p.Namespace + `_lease_renewals`,
+ Help: "The number of successful lease renewals",
+ }, []string{"kinesisStream", "shard", "workerID"})
+ p.getRecordsTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Name: p.Namespace + `_get_records_duration_milliseconds`,
+ Help: "The time taken to fetch records and process them",
+ }, []string{"kinesisStream", "shard"})
+ p.processRecordsTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Name: p.Namespace + `_process_records_duration_milliseconds`,
+ Help: "The time taken to process records",
+ }, []string{"kinesisStream", "shard"})
+
+ metrics := []prometheus.Collector{
+ p.processedBytes,
+ p.processedRecords,
+ p.behindLatestMillis,
+ p.leasesHeld,
+ p.leaseRenewals,
+ p.getRecordsTime,
+ p.processRecordsTime,
+ }
+ for _, metric := range metrics {
+ err := prometheus.Register(metric)
+ if err != nil {
+ return err
+ }
+ }
+
+ http.Handle("/metrics", promhttp.Handler())
+ go func() {
+ log.Debugf("Starting Prometheus listener on %s", p.ListenAddress)
+ err := http.ListenAndServe(p.ListenAddress, nil)
+ if err != nil {
+ log.Errorln("Error starting Prometheus metrics endpoint", err)
+ }
+ }()
+ return nil
+}
+
+func (p *PrometheusMonitoringService) IncrRecordsProcessed(shard string, count int) {
+ p.processedRecords.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count))
+}
+
+func (p *PrometheusMonitoringService) IncrBytesProcessed(shard string, count int64) {
+ p.processedBytes.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count))
+}
+
+func (p *PrometheusMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {
+ p.behindLatestMillis.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Set(millSeconds)
+}
+
+func (p *PrometheusMonitoringService) LeaseGained(shard string) {
+ p.leasesHeld.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream, "workerID": p.WorkerID}).Inc()
+}
+
+func (p *PrometheusMonitoringService) LeaseLost(shard string) {
+ p.leasesHeld.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream, "workerID": p.WorkerID}).Dec()
+}
+
+func (p *PrometheusMonitoringService) LeaseRenewed(shard string) {
+ p.leaseRenewals.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream, "workerID": p.WorkerID}).Inc()
+}
+
+func (p *PrometheusMonitoringService) RecordGetRecordsTime(shard string, time float64) {
+ p.getRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time)
+}
+
+func (p *PrometheusMonitoringService) RecordProcessRecordsTime(shard string, time float64) {
+ p.processRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time)
+}
diff --git a/src/clientlibrary/utils/random.go b/src/clientlibrary/utils/random.go
new file mode 100644
index 0000000..ea0299a
--- /dev/null
+++ b/src/clientlibrary/utils/random.go
@@ -0,0 +1,30 @@
+package utils
+
+import (
+ "math/rand"
+)
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+const (
+ letterIdxBits = 6 // 6 bits to represent a letter index
+ letterIdxMask = 1<= 0; {
+ if remain == 0 {
+ cache, remain = rand.Int63(), letterIdxMax
+ }
+ if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
+ b[i] = letterBytes[idx]
+ i--
+ }
+ cache >>= letterIdxBits
+ remain--
+ }
+
+ return string(b)
+}
diff --git a/src/clientlibrary/worker/checkpointer.go b/src/clientlibrary/worker/checkpointer.go
new file mode 100644
index 0000000..2ca4dda
--- /dev/null
+++ b/src/clientlibrary/worker/checkpointer.go
@@ -0,0 +1,276 @@
+package worker
+
+import (
+ "errors"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/service/dynamodb"
+ "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
+ "github.com/matryer/try"
+ log "github.com/sirupsen/logrus"
+
+ "clientlibrary/config"
+)
+
+const (
+ // ErrLeaseNotAquired is returned when we failed to get a lock on the shard
+ ErrLeaseNotAquired = "Lease is already held by another node"
+ // ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table
+ ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created"
+)
+
+// Checkpointer handles checkpointing when a record has been processed
+type Checkpointer interface {
+ Init() error
+ GetLease(*shardStatus, string) error
+ CheckpointSequence(*shardStatus) error
+ FetchCheckpoint(*shardStatus) error
+}
+
+// ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
+var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard")
+
+// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
+type DynamoCheckpoint struct {
+ TableName string
+ leaseTableReadCapacity int64
+ leaseTableWriteCapacity int64
+
+ LeaseDuration int
+ svc dynamodbiface.DynamoDBAPI
+ kclConfig *config.KinesisClientLibConfiguration
+ Retries int
+}
+
+func NewDynamoCheckpoint(dynamo dynamodbiface.DynamoDBAPI, kclConfig *config.KinesisClientLibConfiguration) Checkpointer {
+ checkpointer := &DynamoCheckpoint{
+ TableName: kclConfig.TableName,
+ leaseTableReadCapacity: int64(kclConfig.InitialLeaseTableReadCapacity),
+ leaseTableWriteCapacity: int64(kclConfig.InitialLeaseTableWriteCapacity),
+ LeaseDuration: kclConfig.FailoverTimeMillis,
+ svc: dynamo,
+ kclConfig: kclConfig,
+ Retries: 5,
+ }
+ return checkpointer
+}
+
+// Init initialises the DynamoDB Checkpoint
+func (checkpointer *DynamoCheckpoint) Init() error {
+ if !checkpointer.doesTableExist() {
+ return checkpointer.createTable()
+ }
+ return nil
+}
+
+// GetLease attempts to gain a lock on the given shard
+func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo string) error {
+ newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
+ newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
+ currentCheckpoint, err := checkpointer.getItem(shard.ID)
+ if err != nil {
+ return err
+ }
+
+ assignedVar, assignedToOk := currentCheckpoint["AssignedTo"]
+ leaseVar, leaseTimeoutOk := currentCheckpoint["LeaseTimeout"]
+ var conditionalExpression string
+ var expressionAttributeValues map[string]*dynamodb.AttributeValue
+
+ if !leaseTimeoutOk || !assignedToOk {
+ conditionalExpression = "attribute_not_exists(AssignedTo)"
+ } else {
+ assignedTo := *assignedVar.S
+ leaseTimeout := *leaseVar.S
+
+ currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
+ if err != nil {
+ return err
+ }
+ if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo {
+ return errors.New(ErrLeaseNotAquired)
+ }
+ log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
+ conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
+ expressionAttributeValues = map[string]*dynamodb.AttributeValue{
+ ":id": {
+ S: &shard.ID,
+ },
+ ":assigned_to": {
+ S: &assignedTo,
+ },
+ ":lease_timeout": {
+ S: &leaseTimeout,
+ },
+ }
+ }
+
+ marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
+ "ShardID": {
+ S: &shard.ID,
+ },
+ "AssignedTo": {
+ S: &newAssignTo,
+ },
+ "LeaseTimeout": {
+ S: &newLeaseTimeoutString,
+ },
+ }
+
+ if shard.Checkpoint != "" {
+ marshalledCheckpoint["Checkpoint"] = &dynamodb.AttributeValue{
+ S: &shard.Checkpoint,
+ }
+ }
+
+ err = checkpointer.conditionalUpdate(conditionalExpression, expressionAttributeValues, marshalledCheckpoint)
+ if err != nil {
+ if awsErr, ok := err.(awserr.Error); ok {
+ if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
+ return errors.New(ErrLeaseNotAquired)
+ }
+ }
+ return err
+ }
+
+ shard.mux.Lock()
+ shard.AssignedTo = newAssignTo
+ shard.LeaseTimeout = newLeaseTimeout
+ shard.mux.Unlock()
+
+ return nil
+}
+
+// CheckpointSequence writes a checkpoint at the designated sequence ID
+func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error {
+ leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
+ marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
+ "ShardID": {
+ S: &shard.ID,
+ },
+ "SequenceID": {
+ S: &shard.Checkpoint,
+ },
+ "AssignedTo": {
+ S: &shard.AssignedTo,
+ },
+ "LeaseTimeout": {
+ S: &leaseTimeout,
+ },
+ }
+ return checkpointer.saveItem(marshalledCheckpoint)
+}
+
+// FetchCheckpoint retrieves the checkpoint for the given shard
+func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error {
+ checkpoint, err := checkpointer.getItem(shard.ID)
+ if err != nil {
+ return err
+ }
+
+ sequenceID, ok := checkpoint["SequenceID"]
+ if !ok {
+ return ErrSequenceIDNotFound
+ }
+ log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
+ shard.mux.Lock()
+ defer shard.mux.Unlock()
+ shard.Checkpoint = *sequenceID.S
+
+ if assignedTo, ok := checkpoint["Assignedto"]; ok {
+ shard.AssignedTo = *assignedTo.S
+ }
+ return nil
+}
+
+func (checkpointer *DynamoCheckpoint) createTable() error {
+ input := &dynamodb.CreateTableInput{
+ AttributeDefinitions: []*dynamodb.AttributeDefinition{
+ {
+ AttributeName: aws.String("ShardID"),
+ AttributeType: aws.String("S"),
+ },
+ },
+ KeySchema: []*dynamodb.KeySchemaElement{
+ {
+ AttributeName: aws.String("ShardID"),
+ KeyType: aws.String("HASH"),
+ },
+ },
+ ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
+ ReadCapacityUnits: aws.Int64(checkpointer.leaseTableReadCapacity),
+ WriteCapacityUnits: aws.Int64(checkpointer.leaseTableWriteCapacity),
+ },
+ TableName: aws.String(checkpointer.TableName),
+ }
+ _, err := checkpointer.svc.CreateTable(input)
+ return err
+}
+
+func (checkpointer *DynamoCheckpoint) doesTableExist() bool {
+ input := &dynamodb.DescribeTableInput{
+ TableName: aws.String(checkpointer.TableName),
+ }
+ _, err := checkpointer.svc.DescribeTable(input)
+ return (err == nil)
+}
+
+func (checkpointer *DynamoCheckpoint) saveItem(item map[string]*dynamodb.AttributeValue) error {
+ return checkpointer.putItem(&dynamodb.PutItemInput{
+ TableName: aws.String(checkpointer.TableName),
+ Item: item,
+ })
+}
+
+func (checkpointer *DynamoCheckpoint) conditionalUpdate(conditionExpression string, expressionAttributeValues map[string]*dynamodb.AttributeValue, item map[string]*dynamodb.AttributeValue) error {
+ return checkpointer.putItem(&dynamodb.PutItemInput{
+ ConditionExpression: aws.String(conditionExpression),
+ TableName: aws.String(checkpointer.TableName),
+ Item: item,
+ ExpressionAttributeValues: expressionAttributeValues,
+ })
+}
+
+func (checkpointer *DynamoCheckpoint) putItem(input *dynamodb.PutItemInput) error {
+ return try.Do(func(attempt int) (bool, error) {
+ _, err := checkpointer.svc.PutItem(input)
+ if awsErr, ok := err.(awserr.Error); ok {
+ if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
+ awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
+ attempt < checkpointer.Retries {
+ // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
+ time.Sleep(time.Duration(2^attempt*100) * time.Millisecond)
+ return true, err
+ }
+ }
+ return false, err
+ })
+}
+
+func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynamodb.AttributeValue, error) {
+ var item *dynamodb.GetItemOutput
+ err := try.Do(func(attempt int) (bool, error) {
+ var err error
+ item, err = checkpointer.svc.GetItem(&dynamodb.GetItemInput{
+ TableName: aws.String(checkpointer.TableName),
+ Key: map[string]*dynamodb.AttributeValue{
+ "ShardID": {
+ S: aws.String(shardID),
+ },
+ },
+ })
+ if awsErr, ok := err.(awserr.Error); ok {
+ if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
+ awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
+ attempt < checkpointer.Retries {
+ // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
+ time.Sleep(time.Duration(2^attempt*100) * time.Millisecond)
+ return true, err
+ }
+ }
+ return false, err
+ })
+ return item.Item, err
+}
diff --git a/src/clientlibrary/worker/record-processor-checkpointer.go b/src/clientlibrary/worker/record-processor-checkpointer.go
new file mode 100644
index 0000000..4f624f2
--- /dev/null
+++ b/src/clientlibrary/worker/record-processor-checkpointer.go
@@ -0,0 +1,56 @@
+package worker
+
+import (
+ "github.com/aws/aws-sdk-go/aws"
+
+ kcl "clientlibrary/interfaces"
+)
+
+type (
+
+ /* Objects of this class are prepared to checkpoint at a specific sequence number. They use an
+ * IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go
+ * backwards' validation as a normal checkpoint.
+ */
+ PreparedCheckpointer struct {
+ pendingCheckpointSequenceNumber *kcl.ExtendedSequenceNumber
+ checkpointer kcl.IRecordProcessorCheckpointer
+ }
+
+ /**
+ * This class is used to enable RecordProcessors to checkpoint their progress.
+ * The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application
+ * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment.
+ */
+ RecordProcessorCheckpointer struct {
+ shard *shardStatus
+ checkpoint Checkpointer
+ }
+)
+
+func NewRecordProcessorCheckpoint(shard *shardStatus, checkpoint Checkpointer) kcl.IRecordProcessorCheckpointer {
+ return &RecordProcessorCheckpointer{
+ shard: shard,
+ checkpoint: checkpoint,
+ }
+}
+
+func (pc *PreparedCheckpointer) GetPendingCheckpoint() *kcl.ExtendedSequenceNumber {
+ return pc.pendingCheckpointSequenceNumber
+}
+
+func (pc *PreparedCheckpointer) Checkpoint() error {
+ return pc.checkpointer.Checkpoint(pc.pendingCheckpointSequenceNumber.SequenceNumber)
+}
+
+func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
+ rc.shard.mux.Lock()
+ rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
+ rc.shard.mux.Unlock()
+ return rc.checkpoint.CheckpointSequence(rc.shard)
+}
+
+func (rc *RecordProcessorCheckpointer) PrepareCheckpoint(sequenceNumber *string) (kcl.IPreparedCheckpointer, error) {
+ return &PreparedCheckpointer{}, nil
+
+}
diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/clientlibrary/worker/shard-consumer.go
new file mode 100644
index 0000000..905b48b
--- /dev/null
+++ b/src/clientlibrary/worker/shard-consumer.go
@@ -0,0 +1,195 @@
+package worker
+
+import (
+ log "github.com/sirupsen/logrus"
+ "sync"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/service/kinesis"
+ "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
+
+ "clientlibrary/config"
+ kcl "clientlibrary/interfaces"
+ "clientlibrary/metrics"
+)
+
+const (
+ // This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all
+ // parent shards have been completed.
+ WAITING_ON_PARENT_SHARDS ShardConsumerState = iota + 1
+
+ // This state is responsible for initializing the record processor with the shard information.
+ INITIALIZING
+
+ //
+ PROCESSING
+
+ SHUTDOWN_REQUESTED
+
+ SHUTTING_DOWN
+
+ SHUTDOWN_COMPLETE
+
+ // ErrCodeKMSThrottlingException is defined in the API Reference https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.GetRecords
+ // But it's not a constant?
+ ErrCodeKMSThrottlingException = "KMSThrottlingException"
+)
+
+type ShardConsumerState int
+
+// ShardConsumer is responsible for consuming data records of a (specified) shard.
+// Note: ShardConsumer only deal with one shard.
+type ShardConsumer struct {
+ streamName string
+ shard *shardStatus
+ kc kinesisiface.KinesisAPI
+ checkpointer Checkpointer
+ recordProcessor kcl.IRecordProcessor
+ kclConfig *config.KinesisClientLibConfiguration
+ stop *chan struct{}
+ waitGroup *sync.WaitGroup
+ consumerID string
+ mService metrics.MonitoringService
+ state ShardConsumerState
+}
+
+func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
+ err := sc.checkpointer.FetchCheckpoint(shard)
+ if err != nil && err != ErrSequenceIDNotFound {
+ return nil, err
+ }
+
+ // If there isn't any checkpoint for the shard, use the configuration value.
+ if shard.Checkpoint == "" {
+ initPos := sc.kclConfig.InitialPositionInStream
+ shardIterArgs := &kinesis.GetShardIteratorInput{
+ ShardId: &shard.ID,
+ ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos),
+ StreamName: &sc.streamName,
+ }
+ iterResp, err := sc.kc.GetShardIterator(shardIterArgs)
+ if err != nil {
+ return nil, err
+ }
+ return iterResp.ShardIterator, nil
+ }
+
+ shardIterArgs := &kinesis.GetShardIteratorInput{
+ ShardId: &shard.ID,
+ ShardIteratorType: aws.String("AFTER_SEQUENCE_NUMBER"),
+ StartingSequenceNumber: &shard.Checkpoint,
+ StreamName: &sc.streamName,
+ }
+ iterResp, err := sc.kc.GetShardIterator(shardIterArgs)
+ if err != nil {
+ return nil, err
+ }
+ return iterResp.ShardIterator, nil
+}
+
+func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
+ defer sc.waitGroup.Done()
+
+ shardIterator, err := sc.getShardIterator(shard)
+ if err != nil {
+ log.Errorf("Unable to get shard iterator for %s: %v", shard.ID, err)
+ return err
+ }
+
+ recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer)
+ var retriedErrors int
+
+ for {
+ getRecordsStartTime := time.Now()
+ if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
+ log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
+ err = sc.checkpointer.GetLease(shard, sc.consumerID)
+ if err != nil {
+ if err.Error() == ErrLeaseNotAquired {
+ shard.setLeaseOwner("")
+ sc.mService.LeaseLost(shard.ID)
+ log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
+ return nil
+ }
+ log.Fatal(err)
+ }
+ }
+
+ log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.StringValue(shardIterator))
+ getRecordsArgs := &kinesis.GetRecordsInput{
+ Limit: aws.Int64(int64(sc.kclConfig.MaxRecords)),
+ ShardIterator: shardIterator,
+ }
+ // Get records from stream and retry as needed
+ getResp, err := sc.kc.GetRecords(getRecordsArgs)
+ if err != nil {
+ if awsErr, ok := err.(awserr.Error); ok {
+ if awsErr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException || awsErr.Code() == ErrCodeKMSThrottlingException {
+ log.Errorf("Error getting records from shard %v: %v", shard.ID, err)
+ retriedErrors++
+ // exponential backoff
+ time.Sleep(time.Duration(2^retriedErrors*100) * time.Millisecond)
+ continue
+ }
+ }
+ log.Fatalf("Error getting records from Kinesis that cannot be retried: %s\nRequest: %s", err, getRecordsArgs)
+ }
+ retriedErrors = 0
+
+ // IRecordProcessorCheckpointer
+ input := &kcl.ProcessRecordsInput{
+ Records: getResp.Records,
+ MillisBehindLatest: aws.Int64Value(getResp.MillisBehindLatest),
+ Checkpointer: recordCheckpointer,
+ }
+
+ recordLength := len(input.Records)
+ recordBytes := int64(0)
+ log.Debugf("Received %d records", recordLength)
+
+ for _, r := range getResp.Records {
+ recordBytes += int64(len(r.Data))
+ }
+
+ if recordLength > 0 || sc.kclConfig.CallProcessRecordsEvenForEmptyRecordList {
+ processRecordsStartTime := time.Now()
+
+ // Delivery the events to the record processor
+ sc.recordProcessor.ProcessRecords(input)
+
+ // Convert from nanoseconds to milliseconds
+ processedRecordsTiming := time.Since(processRecordsStartTime) / 1000000
+ sc.mService.RecordProcessRecordsTime(shard.ID, float64(processedRecordsTiming))
+ }
+
+ // Idle between each read, the user is responsible for checkpoint the progress
+ time.Sleep(time.Duration(sc.kclConfig.IdleTimeBetweenReadsInMillis) * time.Millisecond)
+
+ sc.mService.IncrRecordsProcessed(shard.ID, recordLength)
+ sc.mService.IncrBytesProcessed(shard.ID, recordBytes)
+ sc.mService.MillisBehindLatest(shard.ID, float64(*getResp.MillisBehindLatest))
+
+ // Convert from nanoseconds to milliseconds
+ getRecordsTime := time.Since(getRecordsStartTime) / 1000000
+ sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime))
+
+ // The shard has been closed, so no new records can be read from it
+ if getResp.NextShardIterator == nil {
+ log.Infof("Shard %s closed", shard.ID)
+ shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.TERMINATE, Checkpointer: recordCheckpointer}
+ sc.recordProcessor.Shutdown(shutdownInput)
+ return nil
+ }
+ shardIterator = getResp.NextShardIterator
+
+ select {
+ case <-*sc.stop:
+ shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
+ sc.recordProcessor.Shutdown(shutdownInput)
+ return nil
+ case <-time.After(1 * time.Nanosecond):
+ }
+ }
+}
diff --git a/src/clientlibrary/worker/worker.go b/src/clientlibrary/worker/worker.go
new file mode 100644
index 0000000..39ed3d1
--- /dev/null
+++ b/src/clientlibrary/worker/worker.go
@@ -0,0 +1,289 @@
+package worker
+
+import (
+ "errors"
+ log "github.com/sirupsen/logrus"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/dynamodb"
+ "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
+ "github.com/aws/aws-sdk-go/service/kinesis"
+ "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
+
+ "clientlibrary/config"
+ kcl "clientlibrary/interfaces"
+ "clientlibrary/metrics"
+)
+
+type shardStatus struct {
+ ID string
+ Checkpoint string
+ AssignedTo string
+ mux *sync.Mutex
+ LeaseTimeout time.Time
+}
+
+func (ss *shardStatus) getLeaseOwner() string {
+ ss.mux.Lock()
+ defer ss.mux.Unlock()
+ return ss.AssignedTo
+}
+
+func (ss *shardStatus) setLeaseOwner(owner string) {
+ ss.mux.Lock()
+ defer ss.mux.Unlock()
+ ss.AssignedTo = owner
+}
+
+/**
+ * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
+ * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
+ * the shards).
+ */
+type Worker struct {
+ streamName string
+ regionName string
+ workerID string
+
+ processorFactory kcl.IRecordProcessorFactory
+ kclConfig *config.KinesisClientLibConfiguration
+ kc kinesisiface.KinesisAPI
+ dynamo dynamodbiface.DynamoDBAPI
+ checkpointer Checkpointer
+
+ stop *chan struct{}
+ waitGroup *sync.WaitGroup
+ sigs *chan os.Signal
+
+ shardStatus map[string]*shardStatus
+
+ metricsConfig *metrics.MonitoringConfiguration
+ mService metrics.MonitoringService
+}
+
+// NewWorker constructs a Worker instance for processing Kinesis stream data.
+func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisClientLibConfiguration, metricsConfig *metrics.MonitoringConfiguration) *Worker {
+ w := &Worker{
+ streamName: kclConfig.StreamName,
+ regionName: kclConfig.RegionName,
+ workerID: kclConfig.WorkerID,
+ processorFactory: factory,
+ kclConfig: kclConfig,
+ metricsConfig: metricsConfig,
+ }
+
+ // create session for Kinesis
+ log.Info("Creating Kinesis session")
+ s := session.New(&aws.Config{Region: aws.String(w.regionName)})
+ w.kc = kinesis.New(s)
+
+ log.Info("Creating DynamoDB session")
+ s = session.New(&aws.Config{Region: aws.String(w.regionName)})
+ w.dynamo = dynamodb.New(s)
+ w.checkpointer = NewDynamoCheckpoint(w.dynamo, kclConfig)
+
+ if w.metricsConfig == nil {
+ w.metricsConfig = &metrics.MonitoringConfiguration{MonitoringService: ""}
+ }
+ return w
+}
+
+// Run starts consuming data from the stream, and pass it to the application record processors.
+func (w *Worker) Start() error {
+ if err := w.initialize(); err != nil {
+ log.Errorf("Failed to start Worker: %+v", err)
+ return err
+ }
+
+ log.Info("Initialization complete. Starting worker event loop.")
+
+ // entering event loop
+ go w.eventLoop()
+ return nil
+}
+
+// Shutdown signals worker to shutdown. Worker will try initiating shutdown of all record processors.
+func (w *Worker) Shutdown() {
+ log.Info("Worker shutdown in requested.")
+
+ close(*w.stop)
+ w.waitGroup.Wait()
+
+ log.Info("Worker loop is complete. Exiting from worker.")
+}
+
+// Publish to write some data into stream. This function is mainly used for testing purpose.
+func (w *Worker) Publish(streamName, partitionKey string, data []byte) error {
+ _, err := w.kc.PutRecord(&kinesis.PutRecordInput{
+ Data: data,
+ StreamName: aws.String(streamName),
+ PartitionKey: aws.String(partitionKey),
+ })
+ if err != nil {
+ log.Errorf("Error in publishing data to %s/%s. Error: %+v", streamName, partitionKey, err)
+ }
+ return err
+}
+
+// initialize
+func (w *Worker) initialize() error {
+ log.Info("Worker initialization in progress...")
+
+ err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID)
+ if err != nil {
+ log.Errorf("Failed to start monitoring service: %s", err)
+ }
+ w.mService = w.metricsConfig.GetMonitoringService()
+
+ log.Info("Initializing Checkpointer")
+ if err := w.checkpointer.Init(); err != nil {
+ log.Errorf("Failed to start Checkpointer: %+v", err)
+ return err
+ }
+
+ w.shardStatus = make(map[string]*shardStatus)
+
+ sigs := make(chan os.Signal, 1)
+ w.sigs = &sigs
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+
+ stopChan := make(chan struct{})
+ w.stop = &stopChan
+
+ wg := sync.WaitGroup{}
+ w.waitGroup = &wg
+
+ err = w.getShardIDs("")
+ if err != nil {
+ log.Errorf("Error getting Kinesis shards: %s", err)
+ return err
+ }
+
+ return nil
+}
+
+// newShardConsumer to create a shard consumer instance
+func (w *Worker) newShardConsumer(shard *shardStatus) *ShardConsumer {
+ return &ShardConsumer{
+ streamName: w.streamName,
+ shard: shard,
+ kc: w.kc,
+ checkpointer: w.checkpointer,
+ recordProcessor: w.processorFactory.CreateProcessor(),
+ kclConfig: w.kclConfig,
+ consumerID: w.workerID,
+ stop: w.stop,
+ waitGroup: w.waitGroup,
+ mService: w.mService,
+ state: WAITING_ON_PARENT_SHARDS,
+ }
+}
+
+// eventLoop
+func (w *Worker) eventLoop() {
+ for {
+ err := w.getShardIDs("")
+ if err != nil {
+ log.Errorf("Error getting Kinesis shards: %v", err)
+ // Back-off?
+ time.Sleep(500 * time.Millisecond)
+ }
+ log.Infof("Found %d shards", len(w.shardStatus))
+
+ // Count the number of leases hold by this worker
+ counter := 0
+ for _, shard := range w.shardStatus {
+ if shard.getLeaseOwner() == w.workerID {
+ counter++
+ }
+ }
+
+ // max number of lease has not been reached
+ if counter < w.kclConfig.MaxLeasesForWorker {
+ for _, shard := range w.shardStatus {
+ // We already own this shard so carry on
+ if shard.getLeaseOwner() == w.workerID {
+ continue
+ }
+
+ err := w.checkpointer.FetchCheckpoint(shard)
+ if err != nil {
+ if err != ErrSequenceIDNotFound {
+ log.Fatal(err)
+ }
+ }
+
+ err = w.checkpointer.GetLease(shard, w.workerID)
+ if err != nil {
+ if err.Error() == ErrLeaseNotAquired {
+ continue
+ }
+ log.Fatal(err)
+ }
+
+ w.mService.LeaseGained(shard.ID)
+
+ log.Infof("Start Shard Consumer for shard: %v", shard.ID)
+ sc := w.newShardConsumer(shard)
+ go sc.getRecords(shard)
+ w.waitGroup.Add(1)
+ }
+ }
+
+ select {
+ case sig := <-*w.sigs:
+ log.Infof("Received signal %s. Exiting", sig)
+ w.Shutdown()
+ return
+ case <-*w.stop:
+ log.Info("Shutting down")
+ return
+ case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond):
+ }
+ }
+}
+
+// List all ACTIVE shard and store them into shardStatus table
+func (w *Worker) getShardIDs(startShardID string) error {
+ args := &kinesis.DescribeStreamInput{
+ StreamName: aws.String(w.streamName),
+ }
+ if startShardID != "" {
+ args.ExclusiveStartShardId = aws.String(startShardID)
+ }
+ streamDesc, err := w.kc.DescribeStream(args)
+ if err != nil {
+ return err
+ }
+
+ if *streamDesc.StreamDescription.StreamStatus != "ACTIVE" {
+ return errors.New("Stream not active")
+ }
+
+ var lastShardID string
+ for _, s := range streamDesc.StreamDescription.Shards {
+ if _, ok := w.shardStatus[*s.ShardId]; !ok {
+ log.Debugf("Found shard with id %s", *s.ShardId)
+ w.shardStatus[*s.ShardId] = &shardStatus{
+ ID: *s.ShardId,
+ mux: &sync.Mutex{},
+ }
+ }
+ lastShardID = *s.ShardId
+ }
+
+ if *streamDesc.StreamDescription.HasMoreShards {
+ err := w.getShardIDs(lastShardID)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/src/clientlibrary/worker/worker_test.go b/src/clientlibrary/worker/worker_test.go
new file mode 100644
index 0000000..ebcbc3d
--- /dev/null
+++ b/src/clientlibrary/worker/worker_test.go
@@ -0,0 +1,108 @@
+package worker
+
+import (
+ "os"
+ "testing"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ cfg "clientlibrary/config"
+ kc "clientlibrary/interfaces"
+ "clientlibrary/utils"
+ "github.com/stretchr/testify/assert"
+)
+
+const (
+ streamName = "kcl-test"
+ regionName = "us-west-2"
+ workerID = "test-worker"
+)
+
+const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
+
+func TestWorker(t *testing.T) {
+ os.Setenv("AWS_ACCESS_KEY_ID", "your aws access key id")
+ os.Setenv("AWS_SECRET_ACCESS_KEY", "your aws secret access key")
+ defer os.Unsetenv("AWS_ACCESS_KEY_ID")
+ defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
+ kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
+ WithInitialPositionInStream(cfg.LATEST).
+ WithMaxRecords(40).
+ WithMaxLeasesForWorker(1).
+ WithShardSyncIntervalMillis(5000)
+
+ log.SetOutput(os.Stdout)
+ log.SetLevel(log.DebugLevel)
+
+ assert.Equal(t, regionName, kclConfig.RegionName)
+ assert.Equal(t, streamName, kclConfig.StreamName)
+
+ worker := NewWorker(recordProcessorFactory(t), kclConfig, nil)
+ assert.Equal(t, regionName, worker.regionName)
+ assert.Equal(t, streamName, worker.streamName)
+
+ err := worker.Start()
+ assert.Nil(t, err)
+
+ // Put some data into stream.
+ for i := 0; i < 100; i++ {
+ // Use random string as partition key to ensure even distribution across shards
+ err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr))
+ if err != nil {
+ t.Errorf("Errorin Publish. %+v", err)
+ }
+ }
+
+ time.Sleep(10 * time.Second)
+ worker.Shutdown()
+}
+
+// Record processor factory is used to create RecordProcessor
+func recordProcessorFactory(t *testing.T) kc.IRecordProcessorFactory {
+ return &dumpRecordProcessorFactory{t: t}
+}
+
+// simple record processor and dump everything
+type dumpRecordProcessorFactory struct {
+ t *testing.T
+}
+
+func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
+ return &dumpRecordProcessor{
+ t: d.t,
+ }
+}
+
+// Create a dump record processor for printing out all data from record.
+type dumpRecordProcessor struct {
+ t *testing.T
+}
+
+func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
+ dd.t.Logf("sharId=%v", input.ShardId)
+}
+
+func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
+ dd.t.Log("Processing Records...")
+
+ // don't process empty record
+ if len(input.Records) == 0 {
+ return
+ }
+
+ for _, v := range input.Records {
+ dd.t.Logf("Record = %s", v.Data)
+ assert.Equal(dd.t, specstr, string(v.Data))
+ }
+
+ dd.t.Logf("Checkpoint it and MillisBehindLatest = %v", input.MillisBehindLatest)
+ // checkpoint it after processing this batch
+ lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
+ input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
+}
+
+func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
+ dd.t.Logf("Shutdown Reason = %v", input.ShutdownReason)
+
+}
diff --git a/src/leases/impl/lease.go b/src/leases/impl/lease.go
index b87ecf3..394475f 100644
--- a/src/leases/impl/lease.go
+++ b/src/leases/impl/lease.go
@@ -16,8 +16,11 @@ const (
// processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will
// take and hold the lease.
type Lease struct {
- leaseKey string
- leaseOwner string
+ // shard-id
+ leaseKey string
+ // worker#
+ leaseOwner string
+ // ccounter incremented periodically
leaseCounter int64
// This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not
diff --git a/support/scripts/functions.sh b/support/scripts/functions.sh
index b7265ea..845e3c3 100644
--- a/support/scripts/functions.sh
+++ b/support/scripts/functions.sh
@@ -14,6 +14,7 @@ local_go_pkgs() {
grep -Fv '/tmp/' | \
grep -Fv '/run/' | \
grep -Fv '/tests/' | \
+ grep -Fv '/gokini/' | \
sed -r 's|(.+)/[^/]+\.go$|\1|g' | \
sort -u
}