KCL: Implement Worker

This change implements the worker, the core part of KCL.
It exposes exactly the same interface as Amazon's KCL. Internally,
it reuses code from GoKini in order to get the library
functional quickly.

This is a working version. The test code worker_test.go
shows how to use this library.

The dynamic resharding feature is out of scope for M4.

Test:

1. A Kinesis stream named "kcl-test" has been created under the
photon-infra account.
2. Download your AWS credentials from the IAM user page.
3. Modify worker_test.go to fill in your AWS credentials.
4. Run hmake test.
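
For orientation, a minimal configuration sketch against the new constructor and
builder methods in this change (stream and region are the test values above; the
actual worker bootstrap lives in worker_test.go and is not repeated here):

package main

import (
    "fmt"

    "clientlibrary/config"
)

func main() {
    // New signature: application name, stream name, region, worker ID.
    // An empty worker ID is replaced by a generated UUID.
    kclConfig := config.NewKinesisClientLibConfig("appName", "kcl-test", "us-west-2", "").
        WithInitialPositionInStream(config.TRIM_HORIZON).
        WithMaxRecords(100).
        WithMaxLeasesForWorker(10)

    fmt.Println(kclConfig.ApplicationName, kclConfig.RegionName, kclConfig.WorkerID)
}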

Jira CNA-637

Change-Id: I886d255bab9adaf7a13bca11bfda51bedaacaaed
Tao Jiang 2018-04-17 09:25:41 -07:00
parent 1969713863
commit a323d2fd51
20 changed files with 1554 additions and 290 deletions

View file

@@ -5,17 +5,17 @@ import (
     "math"
     "strings"
     "time"
+
+    "github.com/aws/aws-sdk-go/aws"
 )

 const (
-    EPSILON_MS = 25
     // LATEST start after the most recent data record (fetch new data).
-    LATEST = InitialPositionInStream(1)
+    LATEST InitialPositionInStream = iota + 1
     // TRIM_HORIZON start from the oldest available data record
-    TRIM_HORIZON = LATEST + 1
+    TRIM_HORIZON
     // AT_TIMESTAMP start from the record at or after the specified server-side Timestamp.
-    AT_TIMESTAMP = TRIM_HORIZON + 1
+    AT_TIMESTAMP

     // The location in the shard from which the KinesisClientLibrary will start fetching records from
     // when the application starts for the first time and there is no checkpoint for the shard.
@@ -119,6 +119,7 @@ type (
     }

     // Configuration for the Kinesis Client Library.
+    // Note: There is no need to configure a credential provider. Credentials can be obtained from the InstanceProfile.
     KinesisClientLibConfiguration struct {
         // ApplicationName is the name of the application. Kinesis allows multiple applications to consume the same stream.
         ApplicationName string
@@ -132,12 +133,6 @@ type (
         // WorkerID used to distinguish different workers/processes of a Kinesis application
         WorkerID string

-        // KinesisEndpoint endpoint
-        KinesisEndpoint string
-
-        // DynamoDB endpoint
-        DynamoDBEndpoint string
-
         // InitialPositionInStream specifies the position in the stream where a new application should start from
         InitialPositionInStream InitialPositionInStream
@@ -209,12 +204,19 @@ type (
         // Worker should skip syncing shards and leases at startup if leases are present
         // This is useful for optimizing deployments to large fleets working on a stable stream.
         SkipShardSyncAtWorkerInitializationIfLeasesExist bool
-
-        // The max number of threads in the worker thread pool to getRecords.
-        WorkerThreadPoolSize int
     }
 )

+var positionMap = map[InitialPositionInStream]*string{
+    LATEST:       aws.String("LATEST"),
+    TRIM_HORIZON: aws.String("TRIM_HORIZON"),
+    AT_TIMESTAMP: aws.String("AT_TIMESTAMP"),
+}
+
+func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string {
+    return positionMap[pos]
+}
+
 func empty(s string) bool {
     return len(strings.TrimSpace(s)) == 0
 }
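
Note: the new iota-based constants map onto the Kinesis shard-iterator type
strings through the helper above. A quick sanity check (sketch; the package
main wrapper is added only for illustration):

package main

import (
    "fmt"

    "clientlibrary/config"
    "github.com/aws/aws-sdk-go/aws"
)

func main() {
    // Prints "LATEST TRIM_HORIZON AT_TIMESTAMP" via the new positionMap helper.
    fmt.Println(
        aws.StringValue(config.InitalPositionInStreamToShardIteratorType(config.LATEST)),
        aws.StringValue(config.InitalPositionInStreamToShardIteratorType(config.TRIM_HORIZON)),
        aws.StringValue(config.InitalPositionInStreamToShardIteratorType(config.AT_TIMESTAMP)),
    )
}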

View file

@@ -7,7 +7,7 @@ import (
 )

 func TestConfig(t *testing.T) {
-    kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "workerId").
+    kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "us-west-2", "workerId").
         WithFailoverTimeMillis(500).
         WithMaxRecords(100).
         WithInitialPositionInStream(TRIM_HORIZON).
@@ -15,8 +15,7 @@ func TestConfig(t *testing.T) {
         WithCallProcessRecordsEvenForEmptyRecordList(true).
         WithTaskBackoffTimeMillis(10).
         WithMetricsBufferTimeMillis(500).
-        WithMetricsMaxQueueSize(200).
-        WithRegionName("us-west-2")
+        WithMetricsMaxQueueSize(200)

     assert.Equal(t, "appName", kclConfig.ApplicationName)
     assert.Equal(t, 500, kclConfig.FailoverTimeMillis)

View file

@@ -6,10 +6,10 @@ import (
 )

 // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
-func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration {
+func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration {
     checkIsValueNotEmpty("ApplicationName", applicationName)
     checkIsValueNotEmpty("StreamName", streamName)
-    checkIsValueNotEmpty("ApplicationName", applicationName)
+    checkIsValueNotEmpty("RegionName", regionName)

     if empty(workerID) {
         workerID = utils.MustNewUUID()
@@ -20,8 +20,8 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki
         ApplicationName: applicationName,
         TableName:       applicationName,
         StreamName:      streamName,
+        RegionName:      regionName,
         WorkerID:        workerID,
-        KinesisEndpoint: "",
         InitialPositionInStream:         DEFAULT_INITIAL_POSITION_IN_STREAM,
         InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
         FailoverTimeMillis:              DEFAULT_FAILOVER_TIME_MILLIS,
@@ -35,14 +35,12 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki
         MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS,
         MetricsMaxQueueSize:     DEFAULT_METRICS_MAX_QUEUE_SIZE,
         ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
-        RegionName: "",
         ShutdownGraceMillis:            DEFAULT_SHUTDOWN_GRACE_MILLIS,
         MaxLeasesForWorker:             DEFAULT_MAX_LEASES_FOR_WORKER,
         MaxLeasesToStealAtOneTime:      DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
         InitialLeaseTableReadCapacity:  DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
         InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
         SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
-        WorkerThreadPoolSize: 1,
     }
 }
@@ -52,11 +50,6 @@ func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *Kinesis
     return c
 }

-func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
-    c.KinesisEndpoint = kinesisEndpoint
-    return c
-}
-
 func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration {
     c.InitialPositionInStream = initialPositionInStream
     c.InitialPositionInStreamExtended = *newInitialPosition(initialPositionInStream)
@@ -87,6 +80,14 @@ func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisC
     return c
 }

+// WithMaxLeasesForWorker configures the maximum number of leases this worker can hold, which in turn
+// determines the maximum number of shards this worker can handle.
+func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisClientLibConfiguration {
+    checkIsValuePositive("MaxLeasesForWorker", n)
+    c.MaxLeasesForWorker = n
+    return c
+}
+
 /**
  * Controls how long the KCL will sleep if no records are returned from Kinesis
  *
@@ -137,17 +138,3 @@ func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueS
     c.MetricsMaxQueueSize = metricsMaxQueueSize
     return c
 }
-
-// WithRegionName configures the region for the stream
-func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration {
-    checkIsValueNotEmpty("RegionName", regionName)
-    c.RegionName = regionName
-    return c
-}
-
-// WithWorkerThreadPoolSize configures the worker thread pool size
-func (c *KinesisClientLibConfiguration) WithWorkerThreadPoolSize(n int) *KinesisClientLibConfiguration {
-    checkIsValuePositive("WorkerThreadPoolSize", n)
-    c.WorkerThreadPoolSize = n
-    return c
-}

View file

@@ -7,31 +7,56 @@ import (
 )

 const (
-    REQUESTED = ShutdownReason(1)
-    TERMINATE = REQUESTED + 1
-    ZOMBIE = TERMINATE + 1
+    /**
+     * Indicates that the entire application is being shutdown, and if desired the record processor will be given a
+     * final chance to checkpoint. This state will not trigger a direct call to
+     * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but
+     * instead depend on a different interface for backward compatibility.
+     */
+    REQUESTED ShutdownReason = iota + 1
+
+    /**
+     * Terminate processing for this RecordProcessor (resharding use case).
+     * Indicates that the shard is closed and all records from the shard have been delivered to the application.
+     * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records
+     * from this shard and processing of child shards can be started.
+     */
+    TERMINATE
+
+    /**
+     * Processing will be moved to a different record processor (fail over, load balancing use cases).
+     * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
+     * processing data).
+     */
+    ZOMBIE
 )

 // Containers for the parameters to the IRecordProcessor
 type (
+    /**
+     * Reason the RecordProcessor is being shutdown.
+     * Used to distinguish between a fail-over vs. a termination (shard is closed and all records have been delivered).
+     * In case of a fail over, applications should NOT checkpoint as part of shutdown,
+     * since another record processor may have already started processing records for that shard.
+     * In case of termination (resharding use case), applications SHOULD checkpoint their progress to indicate
+     * that they have successfully processed all the records (processing of child shards can then begin).
+     */
     ShutdownReason int

     InitializationInput struct {
-        shardId string
-        extendedSequenceNumber *ExtendedSequenceNumber
-        pendingCheckpointSequenceNumber *ExtendedSequenceNumber
+        ShardId                         string
+        ExtendedSequenceNumber          *ExtendedSequenceNumber
+        PendingCheckpointSequenceNumber *ExtendedSequenceNumber
     }

     ProcessRecordsInput struct {
-        cacheEntryTime *time.Time
-        cacheExitTime *time.Time
-        records []*ks.Record
-        checkpointer *IRecordProcessorCheckpointer
-        millisBehindLatest int64
+        CacheEntryTime     *time.Time
+        CacheExitTime      *time.Time
+        Records            []*ks.Record
+        Checkpointer       IRecordProcessorCheckpointer
+        MillisBehindLatest int64
     }

     ShutdownInput struct {
-        shutdownReason ShutdownReason
-        checkpointer *IRecordProcessorCheckpointer
+        ShutdownReason ShutdownReason
+        Checkpointer   IRecordProcessorCheckpointer
     }
 )
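
As a concrete reading of the semantics above, a processor's Shutdown hook might
branch on the reason like this (a method sketch written as if inside the
interfaces package; myProcessor and its lastSeq bookkeeping are illustrative,
not part of this change):

type myProcessor struct {
    lastSeq *string // updated from the last record seen in ProcessRecords
}

func (p *myProcessor) Shutdown(input *ShutdownInput) {
    switch input.ShutdownReason {
    case TERMINATE:
        // Shard is closed and fully delivered: checkpoint so child-shard processing can begin.
        input.Checkpointer.Checkpoint(p.lastSeq)
    case ZOMBIE:
        // Another record processor may already own the shard: do NOT checkpoint.
    case REQUESTED:
        // The whole application is shutting down: a final checkpoint is allowed here.
    }
}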

View file

@@ -1,12 +1,8 @@
 package interfaces

-import (
-    ks "github.com/aws/aws-sdk-go/service/kinesis"
-)
-
 type (
     IPreparedCheckpointer interface {
-        getPendingCheckpoint() ExtendedSequenceNumber
+        GetPendingCheckpoint() *ExtendedSequenceNumber

         /**
          * This method will record a pending checkpoint.
@@ -24,7 +20,7 @@ type (
          * i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest
          * sequence number seen by the associated record processor.
          */
-        checkpoint() error
+        Checkpoint() error
     }

     /**
@@ -33,46 +29,6 @@ type (
      * checkpoint their progress.
      */
     IRecordProcessorCheckpointer interface {
-        /**
-         * This method will checkpoint the progress at the last data record that was delivered to the record processor.
-         * Upon fail over (after a successful checkpoint() call), the new/replacement RecordProcessor instance
-         * will receive data records whose sequenceNumber > checkpoint position (for each partition key).
-         * In steady state, applications should checkpoint periodically (e.g. once every 5 minutes).
-         * Calling this API too frequently can slow down the application (because it puts pressure on the underlying
-         * checkpoint storage layer).
-         *
-         * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
-         *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
-         * @error ShutdownError The record processor instance has been shutdown. Another instance may have
-         *        started processing some of these records already.
-         *        The application should abort processing via this RecordProcessor instance.
-         * @error InvalidStateError Can't store checkpoint.
-         *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
-         * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
-         *        backoff and retry.
-         */
-        checkpoint() error
-
-        /**
-         * This method will checkpoint the progress at the provided record. This method is analogous to
-         * {@link #checkpoint()} but provides the ability to specify the record at which to
-         * checkpoint.
-         *
-         * @param record A record at which to checkpoint in this shard. Upon failover,
-         *        the Kinesis Client Library will start fetching records after this record's sequence number.
-         * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
-         *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
-         * @error ShutdownError The record processor instance has been shutdown. Another instance may have
-         *        started processing some of these records already.
-         *        The application should abort processing via this RecordProcessor instance.
-         * @error InvalidStateError Can't store checkpoint.
-         *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
-         * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
-         *        backoff and retry.
-         */
-        checkpointByRecord(record *ks.Record) error
-
         /**
          * This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
          * {@link #checkpoint()} but provides the ability to specify the sequence number at which to
@@ -94,87 +50,10 @@ type (
          * greatest sequence number seen by the associated record processor.
          * 2.) It is not a valid sequence number for a record in this shard.
          */
-        checkpointBySequenceNumber(sequenceNumber string) error
+        Checkpoint(sequenceNumber *string) error

         /**
-         * This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
-         * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
-         * but provides the ability to specify the sequence and subsequence numbers at which to checkpoint.
-         *
-         * @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, the Kinesis
-         *        Client Library will start fetching records after the given sequence and subsequence numbers.
-         * @param subSequenceNumber A subsequence number at which to checkpoint within this shard. Upon failover, the
-         *        Kinesis Client Library will start fetching records after the given sequence and subsequence numbers.
-         * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
-         *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
-         * @error ShutdownError The record processor instance has been shutdown. Another instance may have
-         *        started processing some of these records already.
-         *        The application should abort processing via this RecordProcessor instance.
-         * @error InvalidStateError Can't store checkpoint.
-         *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
-         * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
-         *        backoff and retry.
-         * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
-         *        1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
-         *        greatest sequence number seen by the associated record processor.
-         *        2.) It is not a valid sequence number for a record in this shard.
-         */
-        checkpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) error
-
-        /**
-         * This method will record a pending checkpoint at the last data record that was delivered to the record processor.
-         * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
-         * IRecordProcessor for this shard will be informed of the prepared sequence number
-         *
-         * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
-         * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
-         * Use the sequence number passed in to init() to behave idempotently.
-         *
-         * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
-         *
-         * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
-         *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
-         * @error ShutdownError The record processor instance has been shutdown. Another instance may have
-         *        started processing some of these records already.
-         *        The application should abort processing via this RecordProcessor instance.
-         * @error InvalidStateError Can't store pending checkpoint.
-         *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
-         * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
-         *        application can backoff and retry.
-         */
-        prepareCheckpoint() (*IPreparedCheckpointer, error)
-
-        /**
-         * This method will record a pending checkpoint at the provided record. This method is analogous to
-         * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
-         *
-         * @param record A record at which to prepare checkpoint in this shard.
-         *
-         * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
-         * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
-         * Use the sequence number and application state passed in to init() to behave idempotently.
-         *
-         * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
-         *
-         * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
-         *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
-         * @error ShutdownError The record processor instance has been shutdown. Another instance may have
-         *        started processing some of these records already.
-         *        The application should abort processing via this RecordProcessor instance.
-         * @error InvalidStateError Can't store pending checkpoint.
-         *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
-         * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
-         *        application can backoff and retry.
-         * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
-         *        1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
-         *        greatest sequence number seen by the associated record processor.
-         *        2.) It is not a valid sequence number for a record in this shard.
-         */
-        prepareCheckpointByRecord(record *ks.Record) (*IPreparedCheckpointer, error)
-
-        /**
-         * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
-         * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
+        /**
+         * This method will record a pending checkpoint at the provided sequenceNumber.
          *
          * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
@@ -194,32 +73,6 @@ type (
          * greatest sequence number seen by the associated record processor.
          * 2.) It is not a valid sequence number for a record in this shard.
          */
-        prepareCheckpointBySequenceNumber(sequenceNumber string) (*IPreparedCheckpointer, error)
+        PrepareCheckpoint(sequenceNumber *string) (IPreparedCheckpointer, error)
-
-        /**
-         * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
-         * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
-         * but provides the ability to specify the sequence number at which to checkpoint
-         *
-         * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
-         * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
-         *
-         * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
-         *
-         * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
-         *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
-         * @error ShutdownError The record processor instance has been shutdown. Another instance may have
-         *        started processing some of these records already.
-         *        The application should abort processing via this RecordProcessor instance.
-         * @error InvalidStateError Can't store pending checkpoint.
-         *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
-         * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
-         *        application can backoff and retry.
-         * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
-         *        1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
-         *        greatest sequence number seen by the associated record processor.
-         *        2.) It is not a valid sequence number for a record in this shard.
-         */
-        prepareCheckpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) (*IPreparedCheckpointer, error)
     }
 )

View file

@@ -1,16 +1,17 @@
 package interfaces

+type (
     // IRecordProcessor is the interface for the callback functions invoked by KCL.
     // The main task of using KCL is to provide an implementation of the IRecordProcessor interface.
     // Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
-type IRecordProcessor interface {
+    IRecordProcessor interface {
         /**
          * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
          * (via processRecords).
          *
          * @param initializationInput Provides information related to initialization
          */
-        initialize(initializationInput InitializationInput)
+        Initialize(initializationInput *InitializationInput)

         /**
          * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
@@ -21,7 +22,7 @@ type IRecordProcessor interface {
          * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
          *        to them (eg checkpointing).
          */
-        processRecords(processRecordsInput ProcessRecordsInput)
+        ProcessRecords(processRecordsInput *ProcessRecordsInput)

         /**
          * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
@@ -36,5 +37,18 @@ type IRecordProcessor interface {
          * @param shutdownInput
          *        Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
          */
-        shutdown(shutdownInput ShutdownInput)
+        Shutdown(shutdownInput *ShutdownInput)
     }
+
+    // IRecordProcessorFactory is the interface for creating IRecordProcessor instances. Each Worker can have
+    // multiple threads for processing shards. Clients can choose either to create one processor per shard or to
+    // share processors between shards.
+    IRecordProcessorFactory interface {
+        /**
+         * Returns a record processor to be used for processing data records for an (assigned) shard.
+         *
+         * @return Returns a processor object.
+         */
+        CreateProcessor() IRecordProcessor
+    }
+)
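
Putting the two interfaces together, a minimal client-side implementation could
look like this (a sketch; the printProcessor type and its logging are
illustrative, not part of this change):

package main

import (
    "log"

    kcl "clientlibrary/interfaces"
)

type printProcessor struct{}

func (p *printProcessor) Initialize(input *kcl.InitializationInput) {
    log.Printf("initializing shard %s", input.ShardId)
}

func (p *printProcessor) ProcessRecords(input *kcl.ProcessRecordsInput) {
    for _, r := range input.Records {
        log.Printf("record %s: %d bytes", *r.SequenceNumber, len(r.Data))
    }
}

func (p *printProcessor) Shutdown(input *kcl.ShutdownInput) {
    log.Printf("shutting down, reason %v", input.ShutdownReason)
}

type printProcessorFactory struct{}

// CreateProcessor satisfies IRecordProcessorFactory; here one processor is created per shard.
func (f *printProcessorFactory) CreateProcessor() kcl.IRecordProcessor {
    return &printProcessor{}
}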

View file

@@ -6,6 +6,6 @@ package interfaces
 // sub-sequence number, in addition to the regular sequence number of the Kinesis record. The sub-sequence number
 // is used to checkpoint within an aggregated record.
 type ExtendedSequenceNumber struct {
-    sequenceNumber string
-    subSequenceNumber int64
+    SequenceNumber    *string
+    SubSequenceNumber int64
 }

View file

@@ -1,26 +0,0 @@
-package checkpoint
-
-import (
-    . "clientlibrary/interfaces"
-)
-
-const (
-    // TRIM_HORIZON starts from the first available record in the shard.
-    TRIM_HORIZON = SentinelCheckpoint(iota + 1)
-    // LATEST starts from the latest record in the shard.
-    LATEST
-    // SHARD_END We've completely processed all records in this shard.
-    SHARD_END
-    // AT_TIMESTAMP starts from the record at or after the specified server-side timestamp.
-    AT_TIMESTAMP
-)
-
-type (
-    SentinelCheckpoint int
-
-    // Checkpoint: a class encapsulating the 2 pieces of state stored in a checkpoint.
-    Checkpoint struct {
-        checkpoint        *ExtendedSequenceNumber
-        pendingCheckpoint *ExtendedSequenceNumber
-    }
-)

View file

@@ -1 +0,0 @@
-package worker

View file

@ -0,0 +1,274 @@
package metrics
import (
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
log "github.com/sirupsen/logrus"
)
type CloudWatchMonitoringService struct {
Namespace string
KinesisStream string
WorkerID string
// What granularity we should send metrics to CW at. Note setting this to 1 will cost quite a bit of money
// At the time of writing (March 2018) about US$200 per month
ResolutionSec int
svc cloudwatchiface.CloudWatchAPI
shardMetrics map[string]*cloudWatchMetrics
}
type cloudWatchMetrics struct {
processedRecords int64
processedBytes int64
behindLatestMillis []float64
leasesHeld int64
leaseRenewals int64
getRecordsTime []float64
processRecordsTime []float64
sync.Mutex
}
func (cw *CloudWatchMonitoringService) Init() error {
if cw.ResolutionSec == 0 {
cw.ResolutionSec = 60
}
session, err := session.NewSessionWithOptions(
session.Options{
SharedConfigState: session.SharedConfigEnable,
},
)
if err != nil {
return err
}
cw.svc = cloudwatch.New(session)
cw.shardMetrics = make(map[string]*cloudWatchMetrics)
return nil
}
func (cw *CloudWatchMonitoringService) flushDaemon() {
previousFlushTime := time.Now()
resolutionDuration := time.Duration(cw.ResolutionSec) * time.Second
for {
time.Sleep(resolutionDuration - time.Now().Sub(previousFlushTime))
err := cw.flush()
if err != nil {
log.Errorln("Error sending metrics to CloudWatch", err)
}
previousFlushTime = time.Now()
}
}
func (cw *CloudWatchMonitoringService) flush() error {
for shard, metric := range cw.shardMetrics {
metric.Lock()
defaultDimensions := []*cloudwatch.Dimension{
&cloudwatch.Dimension{
Name: aws.String("shard"),
Value: &shard,
},
&cloudwatch.Dimension{
Name: aws.String("KinesisStreamName"),
Value: &cw.KinesisStream,
},
}
leaseDimensions := make([]*cloudwatch.Dimension, len(defaultDimensions))
copy(leaseDimensions, defaultDimensions) // copy(dst, src): clone the defaults before appending
leaseDimensions = append(leaseDimensions, &cloudwatch.Dimension{
Name: aws.String("WorkerID"),
Value: &cw.WorkerID,
})
metricTimestamp := time.Now()
_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.Namespace),
MetricData: []*cloudwatch.MetricDatum{
&cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("RecordsProcessed"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.processedRecords)),
},
&cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("DataBytesProcessed"),
Unit: aws.String("Byte"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.processedBytes)),
},
&cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("MillisBehindLatest"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))),
Sum: sumFloat64(metric.behindLatestMillis),
Maximum: maxFloat64(metric.behindLatestMillis),
Minimum: minFloat64(metric.behindLatestMillis),
},
},
&cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("KinesisDataFetcher.getRecords.Time"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.getRecordsTime))),
Sum: sumFloat64(metric.getRecordsTime),
Maximum: maxFloat64(metric.getRecordsTime),
Minimum: minFloat64(metric.getRecordsTime),
},
},
&cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("RecordProcessor.processRecords.Time"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.processRecordsTime))),
Sum: sumFloat64(metric.processRecordsTime),
Maximum: maxFloat64(metric.processRecordsTime),
Minimum: minFloat64(metric.processRecordsTime),
},
},
&cloudwatch.MetricDatum{
Dimensions: leaseDimensions,
MetricName: aws.String("RenewLease.Success"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.leaseRenewals)),
},
&cloudwatch.MetricDatum{
Dimensions: leaseDimensions,
MetricName: aws.String("CurrentLeases"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.leasesHeld)),
},
},
})
if err == nil {
metric.processedRecords = 0
metric.processedBytes = 0
metric.behindLatestMillis = []float64{}
metric.leaseRenewals = 0
metric.getRecordsTime = []float64{}
metric.processRecordsTime = []float64{}
}
metric.Unlock()
// Propagate a failure, but keep flushing the remaining shards on success.
if err != nil {
return err
}
}
return nil
}
func (cw *CloudWatchMonitoringService) IncrRecordsProcessed(shard string, count int) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].processedRecords += int64(count)
}
func (cw *CloudWatchMonitoringService) IncrBytesProcessed(shard string, count int64) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].processedBytes += count
}
func (cw *CloudWatchMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].behindLatestMillis = append(cw.shardMetrics[shard].behindLatestMillis, millSeconds)
}
func (cw *CloudWatchMonitoringService) LeaseGained(shard string) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].leasesHeld++
}
func (cw *CloudWatchMonitoringService) LeaseLost(shard string) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].leasesHeld--
}
func (cw *CloudWatchMonitoringService) LeaseRenewed(shard string) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].leaseRenewals++
}
func (cw *CloudWatchMonitoringService) RecordGetRecordsTime(shard string, time float64) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].getRecordsTime = append(cw.shardMetrics[shard].getRecordsTime, time)
}
func (cw *CloudWatchMonitoringService) RecordProcessRecordsTime(shard string, time float64) {
if _, ok := cw.shardMetrics[shard]; !ok {
cw.shardMetrics[shard] = &cloudWatchMetrics{}
}
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].processRecordsTime = append(cw.shardMetrics[shard].processRecordsTime, time)
}
func sumFloat64(slice []float64) *float64 {
sum := float64(0)
for _, num := range slice {
sum += num
}
return &sum
}
func maxFloat64(slice []float64) *float64 {
if len(slice) < 1 {
return aws.Float64(0)
}
max := slice[0]
for _, num := range slice {
if num > max {
max = num
}
}
return &max
}
func minFloat64(slice []float64) *float64 {
if len(slice) < 1 {
return aws.Float64(0)
}
min := slice[0]
for _, num := range slice {
if num < min {
min = num
}
}
return &min
}

View file

@ -0,0 +1,66 @@
package metrics
import (
"fmt"
)
// MonitoringConfiguration allows you to configure how record processing metrics are exposed
type MonitoringConfiguration struct {
MonitoringService string // Type of monitoring to expose. Supported types are "prometheus" and "cloudwatch"
Prometheus PrometheusMonitoringService
CloudWatch CloudWatchMonitoringService
service MonitoringService
}
type MonitoringService interface {
Init() error
IncrRecordsProcessed(string, int)
IncrBytesProcessed(string, int64)
MillisBehindLatest(string, float64)
LeaseGained(string)
LeaseLost(string)
LeaseRenewed(string)
RecordGetRecordsTime(string, float64)
RecordProcessRecordsTime(string, float64)
}
func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID string) error {
if m.MonitoringService == "" {
m.service = &noopMonitoringService{}
return nil
}
switch m.MonitoringService {
case "prometheus":
m.Prometheus.Namespace = nameSpace
m.Prometheus.KinesisStream = streamName
m.Prometheus.WorkerID = workerID
m.service = &m.Prometheus
case "cloudwatch":
m.CloudWatch.KinesisStream = streamName
m.CloudWatch.WorkerID = workerID
m.service = &m.CloudWatch
default:
return fmt.Errorf("Invalid monitoring service type %s", m.MonitoringService)
}
return m.service.Init()
}
func (m *MonitoringConfiguration) GetMonitoringService() MonitoringService {
return m.service
}
type noopMonitoringService struct{}
func (n *noopMonitoringService) Init() error {
return nil
}
func (n *noopMonitoringService) IncrRecordsProcessed(shard string, count int) {}
func (n *noopMonitoringService) IncrBytesProcessed(shard string, count int64) {}
func (n *noopMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {}
func (n *noopMonitoringService) LeaseGained(shard string) {}
func (n *noopMonitoringService) LeaseLost(shard string) {}
func (n *noopMonitoringService) LeaseRenewed(shard string) {}
func (n *noopMonitoringService) RecordGetRecordsTime(shard string, time float64) {}
func (n *noopMonitoringService) RecordProcessRecordsTime(shard string, time float64) {}
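
Wiring the monitoring up from application code might look like this (a sketch;
the listen address and label values are example inputs, not taken from this
change):

package main

import (
    "log"

    "clientlibrary/metrics"
)

func main() {
    mc := &metrics.MonitoringConfiguration{
        MonitoringService: "prometheus",
        Prometheus: metrics.PrometheusMonitoringService{
            ListenAddress: ":8080", // example address for the /metrics endpoint
        },
    }
    // Namespace, stream name, and worker ID are filled in by Init.
    if err := mc.Init("kcl", "kcl-test", "worker-1"); err != nil {
        log.Fatal(err)
    }
    svc := mc.GetMonitoringService()
    svc.IncrRecordsProcessed("shardId-000000000000", 1)
}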

View file

@ -0,0 +1,113 @@
package metrics
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)
type PrometheusMonitoringService struct {
ListenAddress string
Namespace string
KinesisStream string
WorkerID string
processedRecords *prometheus.CounterVec
processedBytes *prometheus.CounterVec
behindLatestMillis *prometheus.GaugeVec
leasesHeld *prometheus.GaugeVec
leaseRenewals *prometheus.CounterVec
getRecordsTime *prometheus.HistogramVec
processRecordsTime *prometheus.HistogramVec
}
func (p *PrometheusMonitoringService) Init() error {
p.processedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: p.Namespace + `_processed_bytes`,
Help: "Number of bytes processed",
}, []string{"kinesisStream", "shard"})
p.processedRecords = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: p.Namespace + `_processed_records`,
Help: "Number of records processed",
}, []string{"kinesisStream", "shard"})
p.behindLatestMillis = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: p.Namespace + `_behind_latest_millis`,
Help: "The amount of milliseconds processing is behind",
}, []string{"kinesisStream", "shard"})
p.leasesHeld = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: p.Namespace + `_leases_held`,
Help: "The number of leases held by the worker",
}, []string{"kinesisStream", "shard", "workerID"})
p.leaseRenewals = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: p.Namespace + `_lease_renewals`,
Help: "The number of successful lease renewals",
}, []string{"kinesisStream", "shard", "workerID"})
p.getRecordsTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: p.Namespace + `_get_records_duration_milliseconds`,
Help: "The time taken to fetch records and process them",
}, []string{"kinesisStream", "shard"})
p.processRecordsTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: p.Namespace + `_process_records_duration_milliseconds`,
Help: "The time taken to process records",
}, []string{"kinesisStream", "shard"})
metrics := []prometheus.Collector{
p.processedBytes,
p.processedRecords,
p.behindLatestMillis,
p.leasesHeld,
p.leaseRenewals,
p.getRecordsTime,
p.processRecordsTime,
}
for _, metric := range metrics {
err := prometheus.Register(metric)
if err != nil {
return err
}
}
http.Handle("/metrics", promhttp.Handler())
go func() {
log.Debugf("Starting Prometheus listener on %s", p.ListenAddress)
err := http.ListenAndServe(p.ListenAddress, nil)
if err != nil {
log.Errorln("Error starting Prometheus metrics endpoint", err)
}
}()
return nil
}
func (p *PrometheusMonitoringService) IncrRecordsProcessed(shard string, count int) {
p.processedRecords.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count))
}
func (p *PrometheusMonitoringService) IncrBytesProcessed(shard string, count int64) {
p.processedBytes.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count))
}
func (p *PrometheusMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {
p.behindLatestMillis.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Set(millSeconds)
}
func (p *PrometheusMonitoringService) LeaseGained(shard string) {
p.leasesHeld.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream, "workerID": p.WorkerID}).Inc()
}
func (p *PrometheusMonitoringService) LeaseLost(shard string) {
p.leasesHeld.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream, "workerID": p.WorkerID}).Dec()
}
func (p *PrometheusMonitoringService) LeaseRenewed(shard string) {
p.leaseRenewals.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream, "workerID": p.WorkerID}).Inc()
}
func (p *PrometheusMonitoringService) RecordGetRecordsTime(shard string, time float64) {
p.getRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time)
}
func (p *PrometheusMonitoringService) RecordProcessRecordsTime(shard string, time float64) {
p.processRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time)
}

View file

@ -0,0 +1,30 @@
package utils
import (
"math/rand"
)
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
func RandStringBytesMaskImpr(n int) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
for i, cache, remain := n-1, rand.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = rand.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return string(b)
}
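
Usage sketch (assuming the package is imported as clientlibrary/utils, matching
the other packages in this change); the function draws from math/rand's default
source, so seed it if you need output that varies between runs:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "clientlibrary/utils"
)

func main() {
    rand.Seed(time.Now().UnixNano()) // otherwise the sequence repeats across runs
    fmt.Println(utils.RandStringBytesMaskImpr(10)) // e.g. a 10-letter random string
}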

View file

@ -0,0 +1,276 @@
package worker
import (
"errors"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/matryer/try"
log "github.com/sirupsen/logrus"
"clientlibrary/config"
)
const (
// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
ErrLeaseNotAquired = "Lease is already held by another node"
// ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table
ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created"
)
// Checkpointer handles checkpointing when a record has been processed
type Checkpointer interface {
Init() error
GetLease(*shardStatus, string) error
CheckpointSequence(*shardStatus) error
FetchCheckpoint(*shardStatus) error
}
// ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard")
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
type DynamoCheckpoint struct {
TableName string
leaseTableReadCapacity int64
leaseTableWriteCapacity int64
LeaseDuration int
svc dynamodbiface.DynamoDBAPI
kclConfig *config.KinesisClientLibConfiguration
Retries int
}
func NewDynamoCheckpoint(dynamo dynamodbiface.DynamoDBAPI, kclConfig *config.KinesisClientLibConfiguration) Checkpointer {
checkpointer := &DynamoCheckpoint{
TableName: kclConfig.TableName,
leaseTableReadCapacity: int64(kclConfig.InitialLeaseTableReadCapacity),
leaseTableWriteCapacity: int64(kclConfig.InitialLeaseTableWriteCapacity),
LeaseDuration: kclConfig.FailoverTimeMillis,
svc: dynamo,
kclConfig: kclConfig,
Retries: 5,
}
return checkpointer
}
// Init initialises the DynamoDB Checkpoint
func (checkpointer *DynamoCheckpoint) Init() error {
if !checkpointer.doesTableExist() {
return checkpointer.createTable()
}
return nil
}
// GetLease attempts to gain a lock on the given shard
func (checkpointer *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo string) error {
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
currentCheckpoint, err := checkpointer.getItem(shard.ID)
if err != nil {
return err
}
assignedVar, assignedToOk := currentCheckpoint["AssignedTo"]
leaseVar, leaseTimeoutOk := currentCheckpoint["LeaseTimeout"]
var conditionalExpression string
var expressionAttributeValues map[string]*dynamodb.AttributeValue
if !leaseTimeoutOk || !assignedToOk {
conditionalExpression = "attribute_not_exists(AssignedTo)"
} else {
assignedTo := *assignedVar.S
leaseTimeout := *leaseVar.S
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
if err != nil {
return err
}
if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo {
return errors.New(ErrLeaseNotAquired)
}
log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
expressionAttributeValues = map[string]*dynamodb.AttributeValue{
":id": {
S: &shard.ID,
},
":assigned_to": {
S: &assignedTo,
},
":lease_timeout": {
S: &leaseTimeout,
},
}
}
marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
"ShardID": {
S: &shard.ID,
},
"AssignedTo": {
S: &newAssignTo,
},
"LeaseTimeout": {
S: &newLeaseTimeoutString,
},
}
if shard.Checkpoint != "" {
marshalledCheckpoint["Checkpoint"] = &dynamodb.AttributeValue{
S: &shard.Checkpoint,
}
}
err = checkpointer.conditionalUpdate(conditionalExpression, expressionAttributeValues, marshalledCheckpoint)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
return errors.New(ErrLeaseNotAquired)
}
}
return err
}
shard.mux.Lock()
shard.AssignedTo = newAssignTo
shard.LeaseTimeout = newLeaseTimeout
shard.mux.Unlock()
return nil
}
// CheckpointSequence writes a checkpoint at the designated sequence ID
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error {
leaseTimeout := shard.LeaseTimeout.UTC().Format(time.RFC3339)
marshalledCheckpoint := map[string]*dynamodb.AttributeValue{
"ShardID": {
S: &shard.ID,
},
"SequenceID": {
S: &shard.Checkpoint,
},
"AssignedTo": {
S: &shard.AssignedTo,
},
"LeaseTimeout": {
S: &leaseTimeout,
},
}
return checkpointer.saveItem(marshalledCheckpoint)
}
// FetchCheckpoint retrieves the checkpoint for the given shard
func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error {
checkpoint, err := checkpointer.getItem(shard.ID)
if err != nil {
return err
}
sequenceID, ok := checkpoint["SequenceID"]
if !ok {
return ErrSequenceIDNotFound
}
log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
shard.mux.Lock()
defer shard.mux.Unlock()
shard.Checkpoint = *sequenceID.S
if assignedTo, ok := checkpoint["Assignedto"]; ok {
shard.AssignedTo = *assignedTo.S
}
return nil
}
func (checkpointer *DynamoCheckpoint) createTable() error {
input := &dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("ShardID"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("ShardID"),
KeyType: aws.String("HASH"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(checkpointer.leaseTableReadCapacity),
WriteCapacityUnits: aws.Int64(checkpointer.leaseTableWriteCapacity),
},
TableName: aws.String(checkpointer.TableName),
}
_, err := checkpointer.svc.CreateTable(input)
return err
}
func (checkpointer *DynamoCheckpoint) doesTableExist() bool {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(checkpointer.TableName),
}
_, err := checkpointer.svc.DescribeTable(input)
return (err == nil)
}
func (checkpointer *DynamoCheckpoint) saveItem(item map[string]*dynamodb.AttributeValue) error {
return checkpointer.putItem(&dynamodb.PutItemInput{
TableName: aws.String(checkpointer.TableName),
Item: item,
})
}
func (checkpointer *DynamoCheckpoint) conditionalUpdate(conditionExpression string, expressionAttributeValues map[string]*dynamodb.AttributeValue, item map[string]*dynamodb.AttributeValue) error {
return checkpointer.putItem(&dynamodb.PutItemInput{
ConditionExpression: aws.String(conditionExpression),
TableName: aws.String(checkpointer.TableName),
Item: item,
ExpressionAttributeValues: expressionAttributeValues,
})
}
func (checkpointer *DynamoCheckpoint) putItem(input *dynamodb.PutItemInput) error {
return try.Do(func(attempt int) (bool, error) {
_, err := checkpointer.svc.PutItem(input)
if awsErr, ok := err.(awserr.Error); ok {
if (awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
awsErr.Code() == dynamodb.ErrCodeInternalServerError) &&
attempt < checkpointer.Retries {
// Exponential backoff as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
// (Go's ^ is XOR, so powers of two are produced with a shift)
time.Sleep(time.Duration(100*(1<<uint(attempt))) * time.Millisecond)
return true, err
}
}
}
return false, err
})
}
func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynamodb.AttributeValue, error) {
var item *dynamodb.GetItemOutput
err := try.Do(func(attempt int) (bool, error) {
var err error
item, err = checkpointer.svc.GetItem(&dynamodb.GetItemInput{
TableName: aws.String(checkpointer.TableName),
Key: map[string]*dynamodb.AttributeValue{
"ShardID": {
S: aws.String(shardID),
},
},
})
if awsErr, ok := err.(awserr.Error); ok {
if (awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException ||
awsErr.Code() == dynamodb.ErrCodeInternalServerError) &&
attempt < checkpointer.Retries {
// Exponential backoff as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
// (Go's ^ is XOR, so powers of two are produced with a shift)
time.Sleep(time.Duration(100*(1<<uint(attempt))) * time.Millisecond)
return true, err
}
}
}
return false, err
})
// Guard against a nil item when all retries failed.
if err != nil {
return nil, err
}
return item.Item, nil
}
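
A note on the backoff arithmetic used in the retries above: Go's ^ operator is
XOR rather than exponentiation, so powers of two must be produced with a shift.
The resulting schedule then doubles per attempt:

package main

import (
    "fmt"
    "time"
)

func main() {
    // attempt 1 -> 200ms, attempt 2 -> 400ms, attempt 3 -> 800ms, ...
    for attempt := 1; attempt <= 3; attempt++ {
        fmt.Println(time.Duration(100*(1<<uint(attempt))) * time.Millisecond)
    }
}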

View file

@ -0,0 +1,56 @@
package worker
import (
"github.com/aws/aws-sdk-go/aws"
kcl "clientlibrary/interfaces"
)
type (
/* Objects of this class are prepared to checkpoint at a specific sequence number. They use an
* IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go
* backwards' validation as a normal checkpoint.
*/
PreparedCheckpointer struct {
pendingCheckpointSequenceNumber *kcl.ExtendedSequenceNumber
checkpointer kcl.IRecordProcessorCheckpointer
}
/**
* This class is used to enable RecordProcessors to checkpoint their progress.
* The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application
* RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment.
*/
RecordProcessorCheckpointer struct {
shard *shardStatus
checkpoint Checkpointer
}
)
func NewRecordProcessorCheckpoint(shard *shardStatus, checkpoint Checkpointer) kcl.IRecordProcessorCheckpointer {
return &RecordProcessorCheckpointer{
shard: shard,
checkpoint: checkpoint,
}
}
func (pc *PreparedCheckpointer) GetPendingCheckpoint() *kcl.ExtendedSequenceNumber {
return pc.pendingCheckpointSequenceNumber
}
func (pc *PreparedCheckpointer) Checkpoint() error {
return pc.checkpointer.Checkpoint(pc.pendingCheckpointSequenceNumber.SequenceNumber)
}
func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
rc.shard.mux.Lock()
rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
rc.shard.mux.Unlock()
return rc.checkpoint.CheckpointSequence(rc.shard)
}
// PrepareCheckpoint is not implemented yet; it currently returns an empty prepared checkpointer.
func (rc *RecordProcessorCheckpointer) PrepareCheckpoint(sequenceNumber *string) (kcl.IPreparedCheckpointer, error) {
return &PreparedCheckpointer{}, nil
}
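
From the record processor's side, this checkpointer arrives through
ProcessRecordsInput; a typical call site looks like this (a method sketch;
myProcessor is illustrative, and log is the logrus alias used elsewhere in this
change):

func (p *myProcessor) ProcessRecords(input *kcl.ProcessRecordsInput) {
    // ... handle input.Records ...
    if n := len(input.Records); n > 0 {
        // Persists the last sequence number to the lease table via CheckpointSequence.
        if err := input.Checkpointer.Checkpoint(input.Records[n-1].SequenceNumber); err != nil {
            log.Errorf("checkpoint failed: %v", err)
        }
    }
}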

View file

@ -0,0 +1,195 @@
package worker
import (
log "github.com/sirupsen/logrus"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"clientlibrary/config"
kcl "clientlibrary/interfaces"
"clientlibrary/metrics"
)
const (
// This is the initial state of a shard consumer. This causes the consumer to remain blocked until all
// parent shards have been completed.
WAITING_ON_PARENT_SHARDS ShardConsumerState = iota + 1
// This state is responsible for initializing the record processor with the shard information.
INITIALIZING
// This state is responsible for fetching records from the shard and delivering them to the record processor.
PROCESSING
SHUTDOWN_REQUESTED
SHUTTING_DOWN
SHUTDOWN_COMPLETE
// ErrCodeKMSThrottlingException is defined in the API Reference https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.GetRecords
// But it's not a constant?
ErrCodeKMSThrottlingException = "KMSThrottlingException"
)
type ShardConsumerState int
// ShardConsumer is responsible for consuming data records of a (specified) shard.
// Note: a ShardConsumer only deals with one shard.
type ShardConsumer struct {
streamName string
shard *shardStatus
kc kinesisiface.KinesisAPI
checkpointer Checkpointer
recordProcessor kcl.IRecordProcessor
kclConfig *config.KinesisClientLibConfiguration
stop *chan struct{}
waitGroup *sync.WaitGroup
consumerID string
mService metrics.MonitoringService
state ShardConsumerState
}
func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
err := sc.checkpointer.FetchCheckpoint(shard)
if err != nil && err != ErrSequenceIDNotFound {
return nil, err
}
// If there isn't any checkpoint for the shard, use the configuration value.
if shard.Checkpoint == "" {
initPos := sc.kclConfig.InitialPositionInStream
shardIterArgs := &kinesis.GetShardIteratorInput{
ShardId: &shard.ID,
ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos),
StreamName: &sc.streamName,
}
iterResp, err := sc.kc.GetShardIterator(shardIterArgs)
if err != nil {
return nil, err
}
return iterResp.ShardIterator, nil
}
shardIterArgs := &kinesis.GetShardIteratorInput{
ShardId: &shard.ID,
ShardIteratorType: aws.String("AFTER_SEQUENCE_NUMBER"),
StartingSequenceNumber: &shard.Checkpoint,
StreamName: &sc.streamName,
}
iterResp, err := sc.kc.GetShardIterator(shardIterArgs)
if err != nil {
return nil, err
}
return iterResp.ShardIterator, nil
}
func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
defer sc.waitGroup.Done()
shardIterator, err := sc.getShardIterator(shard)
if err != nil {
log.Errorf("Unable to get shard iterator for %s: %v", shard.ID, err)
return err
}
recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer)
var retriedErrors int
for {
getRecordsStartTime := time.Now()
if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
err = sc.checkpointer.GetLease(shard, sc.consumerID)
if err != nil {
if err.Error() == ErrLeaseNotAquired {
shard.setLeaseOwner("")
sc.mService.LeaseLost(shard.ID)
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
return nil
}
log.Fatal(err)
}
}
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.StringValue(shardIterator))
getRecordsArgs := &kinesis.GetRecordsInput{
Limit: aws.Int64(int64(sc.kclConfig.MaxRecords)),
ShardIterator: shardIterator,
}
// Get records from stream and retry as needed
getResp, err := sc.kc.GetRecords(getRecordsArgs)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException || awsErr.Code() == ErrCodeKMSThrottlingException {
log.Errorf("Error getting records from shard %v: %v", shard.ID, err)
retriedErrors++
// exponential backoff: 2^retriedErrors * 100ms (note: ^ is XOR in Go, so use a shift)
time.Sleep(time.Duration(100*(1<<uint(retriedErrors))) * time.Millisecond)
continue
}
}
log.Fatalf("Error getting records from Kinesis that cannot be retried: %s\nRequest: %s", err, getRecordsArgs)
}
retriedErrors = 0
// IRecordProcessorCheckpointer
input := &kcl.ProcessRecordsInput{
Records: getResp.Records,
MillisBehindLatest: aws.Int64Value(getResp.MillisBehindLatest),
Checkpointer: recordCheckpointer,
}
recordLength := len(input.Records)
recordBytes := int64(0)
log.Debugf("Received %d records", recordLength)
for _, r := range getResp.Records {
recordBytes += int64(len(r.Data))
}
if recordLength > 0 || sc.kclConfig.CallProcessRecordsEvenForEmptyRecordList {
processRecordsStartTime := time.Now()
// Deliver the records to the record processor
sc.recordProcessor.ProcessRecords(input)
// Convert from nanoseconds to milliseconds
processedRecordsTiming := time.Since(processRecordsStartTime) / 1000000
sc.mService.RecordProcessRecordsTime(shard.ID, float64(processedRecordsTiming))
}
// Idle between each read; the user is responsible for checkpointing the progress
time.Sleep(time.Duration(sc.kclConfig.IdleTimeBetweenReadsInMillis) * time.Millisecond)
sc.mService.IncrRecordsProcessed(shard.ID, recordLength)
sc.mService.IncrBytesProcessed(shard.ID, recordBytes)
sc.mService.MillisBehindLatest(shard.ID, float64(input.MillisBehindLatest))
// Convert from nanoseconds to milliseconds
getRecordsTime := time.Since(getRecordsStartTime) / 1000000
sc.mService.RecordGetRecordsTime(shard.ID, float64(getRecordsTime))
// The shard has been closed, so no new records can be read from it
if getResp.NextShardIterator == nil {
log.Infof("Shard %s closed", shard.ID)
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.TERMINATE, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput)
return nil
}
shardIterator = getResp.NextShardIterator
select {
case <-*sc.stop:
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput)
return nil
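// Fall through after 1ns: effectively a non-blocking poll of the stop channel.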
case <-time.After(1 * time.Nanosecond):
}
}
}
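// Sketch (assumption, not part of this commit): the retry path in getRecords
// computes its backoff inline; an equivalent capped helper makes the growth
// explicit. The 100ms base matches the code above; the cap is illustrative.
func backoffDuration(retries int, maxBackoff time.Duration) time.Duration {
	d := time.Duration(100*(1<<uint(retries))) * time.Millisecond
	if d > maxBackoff {
		return maxBackoff
	}
	return d
}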

View file

@ -0,0 +1,289 @@
package worker
import (
"errors"
log "github.com/sirupsen/logrus"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"clientlibrary/config"
kcl "clientlibrary/interfaces"
"clientlibrary/metrics"
)
type shardStatus struct {
ID string
Checkpoint string
AssignedTo string
mux *sync.Mutex
LeaseTimeout time.Time
}
func (ss *shardStatus) getLeaseOwner() string {
ss.mux.Lock()
defer ss.mux.Unlock()
return ss.AssignedTo
}
func (ss *shardStatus) setLeaseOwner(owner string) {
ss.mux.Lock()
defer ss.mux.Unlock()
ss.AssignedTo = owner
}
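// Sketch (hypothetical accessor, not part of this commit): reads of Checkpoint
// can follow the same locking discipline as AssignedTo, for symmetry with
// getLeaseOwner above.
func (ss *shardStatus) getCheckpoint() string {
	ss.mux.Lock()
	defer ss.mux.Unlock()
	return ss.Checkpoint
}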
/**
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
* different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
* the shards).
*/
type Worker struct {
streamName string
regionName string
workerID string
processorFactory kcl.IRecordProcessorFactory
kclConfig *config.KinesisClientLibConfiguration
kc kinesisiface.KinesisAPI
dynamo dynamodbiface.DynamoDBAPI
checkpointer Checkpointer
stop *chan struct{}
waitGroup *sync.WaitGroup
sigs *chan os.Signal
shardStatus map[string]*shardStatus
metricsConfig *metrics.MonitoringConfiguration
mService metrics.MonitoringService
}
// NewWorker constructs a Worker instance for processing Kinesis stream data.
func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisClientLibConfiguration, metricsConfig *metrics.MonitoringConfiguration) *Worker {
w := &Worker{
streamName: kclConfig.StreamName,
regionName: kclConfig.RegionName,
workerID: kclConfig.WorkerID,
processorFactory: factory,
kclConfig: kclConfig,
metricsConfig: metricsConfig,
}
// create session for Kinesis
log.Info("Creating Kinesis session")
s := session.New(&aws.Config{Region: aws.String(w.regionName)})
w.kc = kinesis.New(s)
log.Info("Creating DynamoDB session")
s = session.New(&aws.Config{Region: aws.String(w.regionName)})
w.dynamo = dynamodb.New(s)
w.checkpointer = NewDynamoCheckpoint(w.dynamo, kclConfig)
if w.metricsConfig == nil {
w.metricsConfig = &metrics.MonitoringConfiguration{MonitoringService: ""}
}
return w
}
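// Usage sketch (mirrors worker_test.go; the stream, region, and worker names
// are placeholders, and myFactory stands for any IRecordProcessorFactory):
//
//	kclConfig := config.NewKinesisClientLibConfig("appName", "kcl-test", "us-west-2", "worker-1")
//	worker := NewWorker(myFactory, kclConfig, nil)
//	if err := worker.Start(); err != nil {
//		log.Fatalf("failed to start worker: %v", err)
//	}
//	defer worker.Shutdown()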
// Start starts consuming data from the stream and passes it to the application record processors.
func (w *Worker) Start() error {
if err := w.initialize(); err != nil {
log.Errorf("Failed to start Worker: %+v", err)
return err
}
log.Info("Initialization complete. Starting worker event loop.")
// entering event loop
go w.eventLoop()
return nil
}
// Shutdown signals worker to shutdown. Worker will try initiating shutdown of all record processors.
func (w *Worker) Shutdown() {
log.Info("Worker shutdown in requested.")
close(*w.stop)
w.waitGroup.Wait()
log.Info("Worker loop is complete. Exiting from worker.")
}
// Publish writes some data into the stream. This function is mainly used for testing purposes.
func (w *Worker) Publish(streamName, partitionKey string, data []byte) error {
_, err := w.kc.PutRecord(&kinesis.PutRecordInput{
Data: data,
StreamName: aws.String(streamName),
PartitionKey: aws.String(partitionKey),
})
if err != nil {
log.Errorf("Error in publishing data to %s/%s. Error: %+v", streamName, partitionKey, err)
}
return err
}
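// Usage sketch (as in worker_test.go): publish with a random partition key so
// records spread evenly across shards.
//
//	err := worker.Publish("kcl-test", utils.RandStringBytesMaskImpr(10), []byte(`{"hello":"world"}`))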
// initialize sets up metrics, the checkpointer, signal handling, and the initial shard sync
func (w *Worker) initialize() error {
log.Info("Worker initialization in progress...")
err := w.metricsConfig.Init(w.kclConfig.ApplicationName, w.streamName, w.workerID)
if err != nil {
log.Errorf("Failed to start monitoring service: %s", err)
}
w.mService = w.metricsConfig.GetMonitoringService()
log.Info("Initializing Checkpointer")
if err := w.checkpointer.Init(); err != nil {
log.Errorf("Failed to start Checkpointer: %+v", err)
return err
}
w.shardStatus = make(map[string]*shardStatus)
sigs := make(chan os.Signal, 1)
w.sigs = &sigs
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
stopChan := make(chan struct{})
w.stop = &stopChan
wg := sync.WaitGroup{}
w.waitGroup = &wg
err = w.getShardIDs("")
if err != nil {
log.Errorf("Error getting Kinesis shards: %s", err)
return err
}
return nil
}
// newShardConsumer creates a shard consumer instance
func (w *Worker) newShardConsumer(shard *shardStatus) *ShardConsumer {
return &ShardConsumer{
streamName: w.streamName,
shard: shard,
kc: w.kc,
checkpointer: w.checkpointer,
recordProcessor: w.processorFactory.CreateProcessor(),
kclConfig: w.kclConfig,
consumerID: w.workerID,
stop: w.stop,
waitGroup: w.waitGroup,
mService: w.mService,
state: WAITING_ON_PARENT_SHARDS,
}
}
// eventLoop periodically syncs shards, acquires leases, and starts shard consumers until shutdown
func (w *Worker) eventLoop() {
for {
err := w.getShardIDs("")
if err != nil {
log.Errorf("Error getting Kinesis shards: %v", err)
// Back off briefly before retrying the shard sync
time.Sleep(500 * time.Millisecond)
}
log.Infof("Found %d shards", len(w.shardStatus))
// Count the number of leases held by this worker
counter := 0
for _, shard := range w.shardStatus {
if shard.getLeaseOwner() == w.workerID {
counter++
}
}
// The max number of leases has not been reached
if counter < w.kclConfig.MaxLeasesForWorker {
for _, shard := range w.shardStatus {
// We already own this shard so carry on
if shard.getLeaseOwner() == w.workerID {
continue
}
err := w.checkpointer.FetchCheckpoint(shard)
if err != nil {
if err != ErrSequenceIDNotFound {
log.Fatal(err)
}
}
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
if err.Error() == ErrLeaseNotAquired {
continue
}
log.Fatal(err)
}
w.mService.LeaseGained(shard.ID)
log.Infof("Start Shard Consumer for shard: %v", shard.ID)
sc := w.newShardConsumer(shard)
// Add to the wait group before starting the goroutine, since getRecords defers Done
w.waitGroup.Add(1)
go sc.getRecords(shard)
}
}
select {
case sig := <-*w.sigs:
log.Infof("Received signal %s. Exiting", sig)
w.Shutdown()
return
case <-*w.stop:
log.Info("Shutting down")
return
case <-time.After(time.Duration(w.kclConfig.ShardSyncIntervalMillis) * time.Millisecond):
}
}
}
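// Sketch (not part of this commit): the lease-count loop in eventLoop could be
// factored into a helper; shown here to make the MaxLeasesForWorker check explicit.
func (w *Worker) leasesHeld() int {
	n := 0
	for _, shard := range w.shardStatus {
		if shard.getLeaseOwner() == w.workerID {
			n++
		}
	}
	return n
}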
// getShardIDs lists all shards of the ACTIVE stream and stores them in the shardStatus table
func (w *Worker) getShardIDs(startShardID string) error {
args := &kinesis.DescribeStreamInput{
StreamName: aws.String(w.streamName),
}
if startShardID != "" {
args.ExclusiveStartShardId = aws.String(startShardID)
}
streamDesc, err := w.kc.DescribeStream(args)
if err != nil {
return err
}
if *streamDesc.StreamDescription.StreamStatus != "ACTIVE" {
return errors.New("Stream not active")
}
var lastShardID string
for _, s := range streamDesc.StreamDescription.Shards {
if _, ok := w.shardStatus[*s.ShardId]; !ok {
log.Debugf("Found shard with id %s", *s.ShardId)
w.shardStatus[*s.ShardId] = &shardStatus{
ID: *s.ShardId,
mux: &sync.Mutex{},
}
}
lastShardID = *s.ShardId
}
if *streamDesc.StreamDescription.HasMoreShards {
err := w.getShardIDs(lastShardID)
if err != nil {
return err
}
}
return nil
}
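// Sketch (assumption, not part of this commit): getShardIDs implements
// DescribeStream pagination via recursion on ExclusiveStartShardId; an
// equivalent iterative form keeps stack depth constant on streams with
// many shards.
func (w *Worker) getShardIDsIterative() error {
	startShardID := ""
	for {
		args := &kinesis.DescribeStreamInput{StreamName: aws.String(w.streamName)}
		if startShardID != "" {
			args.ExclusiveStartShardId = aws.String(startShardID)
		}
		streamDesc, err := w.kc.DescribeStream(args)
		if err != nil {
			return err
		}
		if *streamDesc.StreamDescription.StreamStatus != "ACTIVE" {
			return errors.New("Stream not active")
		}
		for _, s := range streamDesc.StreamDescription.Shards {
			if _, ok := w.shardStatus[*s.ShardId]; !ok {
				w.shardStatus[*s.ShardId] = &shardStatus{ID: *s.ShardId, mux: &sync.Mutex{}}
			}
			startShardID = *s.ShardId
		}
		if !*streamDesc.StreamDescription.HasMoreShards {
			return nil
		}
	}
}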

View file

@ -0,0 +1,108 @@
package worker
import (
"os"
"testing"
"time"
log "github.com/sirupsen/logrus"
cfg "clientlibrary/config"
kc "clientlibrary/interfaces"
"clientlibrary/utils"
"github.com/stretchr/testify/assert"
)
const (
streamName = "kcl-test"
regionName = "us-west-2"
workerID = "test-worker"
)
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
func TestWorker(t *testing.T) {
os.Setenv("AWS_ACCESS_KEY_ID", "your aws access key id")
os.Setenv("AWS_SECRET_ACCESS_KEY", "your aws secret access key")
defer os.Unsetenv("AWS_ACCESS_KEY_ID")
defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
WithInitialPositionInStream(cfg.LATEST).
WithMaxRecords(40).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000)
log.SetOutput(os.Stdout)
log.SetLevel(log.DebugLevel)
assert.Equal(t, regionName, kclConfig.RegionName)
assert.Equal(t, streamName, kclConfig.StreamName)
worker := NewWorker(recordProcessorFactory(t), kclConfig, nil)
assert.Equal(t, regionName, worker.regionName)
assert.Equal(t, streamName, worker.streamName)
err := worker.Start()
assert.Nil(t, err)
// Put some data into stream.
for i := 0; i < 100; i++ {
// Use random string as partition key to ensure even distribution across shards
err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte(specstr))
if err != nil {
t.Errorf("Errorin Publish. %+v", err)
}
}
time.Sleep(10 * time.Second)
worker.Shutdown()
}
// Record processor factory is used to create RecordProcessor
func recordProcessorFactory(t *testing.T) kc.IRecordProcessorFactory {
return &dumpRecordProcessorFactory{t: t}
}
// dumpRecordProcessorFactory is a simple factory whose processors dump everything they receive
type dumpRecordProcessorFactory struct {
t *testing.T
}
func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
return &dumpRecordProcessor{
t: d.t,
}
}
// dumpRecordProcessor is a record processor that prints out all data from the records.
type dumpRecordProcessor struct {
t *testing.T
}
func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
dd.t.Logf("sharId=%v", input.ShardId)
}
func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
dd.t.Log("Processing Records...")
// don't process an empty record batch
if len(input.Records) == 0 {
return
}
for _, v := range input.Records {
dd.t.Logf("Record = %s", v.Data)
assert.Equal(dd.t, specstr, string(v.Data))
}
dd.t.Logf("Checkpoint it and MillisBehindLatest = %v", input.MillisBehindLatest)
// checkpoint it after processing this batch
lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
}
func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
dd.t.Logf("Shutdown Reason = %v", input.ShutdownReason)
}

View file

@ -16,8 +16,11 @@ const (
// processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will
// take and hold the lease.
type Lease struct {
// shard-id
leaseKey string
// worker identifier
leaseOwner string
// counter incremented periodically
leaseCounter int64
// This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not

View file

@ -14,6 +14,7 @@ local_go_pkgs() {
grep -Fv '/tmp/' | \
grep -Fv '/run/' | \
grep -Fv '/tests/' | \
grep -Fv '/gokini/' | \
sed -r 's|(.+)/[^/]+\.go$|\1|g' | \
sort -u
}