[WIP] KCL: create configuration and interface for kinesis client library

This is to create configuration and client interface in order to give
user an overview on how the Kinesis client library works.

In order not to reinvent wheel, the api is designed closely aligned with
Amazon Kinesis Client Library in Java.

add errors.
remove @throws and use @error instead.

https://jira.eng.vmware.com/browse/CNA-614

Change-Id: I78a269b328c14df37f878eccef192ff022a669cc
This commit is contained in:
Tao Jiang 2018-04-10 20:50:18 -07:00
parent 7b35571d9e
commit 702335374c
13 changed files with 925 additions and 1 deletions

View file

@ -0,0 +1,233 @@
package config
import (
"log"
"math"
"strings"
"time"
)
const (
EPSILON_MS = 25
// LATEST start after the most recent data record (fetch new data).
LATEST = InitialPositionInStream(1)
// TRIM_HORIZON start from the oldest available data record
TRIM_HORIZON = LATEST + 1
// AT_TIMESTAMP start from the record at or after the specified server-side timestamp.
AT_TIMESTAMP = TRIM_HORIZON + 1
// The location in the shard from which the KinesisClientLibrary will start fetching records from
// when the application starts for the first time and there is no checkpoint for the shard.
DEFAULT_INITIAL_POSITION_IN_STREAM = LATEST
// Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
// will be regarded as having problems and it's shards will be assigned to other workers.
// For applications that have a large number of shards, this may be set to a higher number to reduce
// the number of DynamoDB IOPS required for tracking leases.
DEFAULT_FAILOVER_TIME_MILLIS = 10000
// Max records to fetch from Kinesis in a single GetRecords call.
DEFAULT_MAX_RECORDS = 10000
// The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to
DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000
// Don't call processRecords() on the record processor for empty record lists.
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST = false
// Interval in milliseconds between polling to check for parent shard completion.
// Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
// completion of parent shards).
DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS = 10000
// Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS = 60000
// Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
// Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
// to delete the ones we don't need any longer.
DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true
// Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
DEFAULT_TASK_BACKOFF_TIME_MILLIS = 500
// Buffer metrics for at most this long before publishing to CloudWatch.
DEFAULT_METRICS_BUFFER_TIME_MILLIS = 10000
// Buffer at most this many metrics before publishing to CloudWatch.
DEFAULT_METRICS_MAX_QUEUE_SIZE = 10000
// KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
// to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true
// The max number of leases (shards) this worker should process.
// This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
// or during deployment.
// NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
// stream due to the max limit.
DEFAULT_MAX_LEASES_FOR_WORKER = math.MaxInt16
// Max leases to steal from another worker at one time (for load balancing).
// Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
// but can cause higher churn in the system.
DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1
// The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10
// The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10
// The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
// assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
// during incremental deployments of an application).
DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false
// The amount of milliseconds to wait before graceful shutdown forcefully terminates.
DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000
// The size of the thread pool to create for the lease renewer to use.
DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20
// The sleep time between two listShards calls from the proxy when throttled.
DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500
// The number of times the Proxy will retry listShards call when throttled.
DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50
)
type (
// InitialPositionInStream Used to specify the position in the stream where a new application should start from
// This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents)
InitialPositionInStream int
// Class that houses the entities needed to specify the position in the stream from where a new application should
// start.
InitialPositionInStreamExtended struct {
position InitialPositionInStream
// The time stamp of the data record from which to start reading. Used with
// shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with
// precision in milliseconds. For example, 2016-04-04T19:58:46.480-00:00 or
// 1459799926.480. If a record with this exact time stamp does not exist, the
// iterator returned is for the next (later) record. If the time stamp is older
// than the current trim horizon, the iterator returned is for the oldest untrimmed
// data record (TRIM_HORIZON).
timestamp *time.Time `type:"timestamp" timestampFormat:"unix"`
}
// Configuration for the Kinesis Client Library.
KinesisClientLibConfiguration struct {
// applicationName is name of application. Kinesis allows multiple applications to consume the same stream.
applicationName string
// tableName is name of the dynamo db table for managing kinesis stream default to applicationName
tableName string
// streamName is the name of Kinesis stream
streamName string
// workerID used to distinguish different workers/processes of a Kinesis application
workerID string
// kinesisEndpoint endpoint
kinesisEndpoint string
// dynamoDB endpoint
dynamoDBEndpoint string
// initialPositionInStream specifies the position in the stream where a new application should start from
initialPositionInStream InitialPositionInStream
// initialPositionInStreamExtended provides actual AT_TMESTAMP value
initialPositionInStreamExtended InitialPositionInStreamExtended
// credentials to access Kinesis/Dynamo/CloudWatch: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/
// Note: No need to configure here. Use NewEnvCredentials for testing and EC2RoleProvider for production
// failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
failoverTimeMillis int
/// maxRecords Max records to read per Kinesis getRecords() call
maxRecords int
// idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
idleTimeBetweenReadsInMillis int
// callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
// GetRecords returned an empty record list.
callProcessRecordsEvenForEmptyRecordList bool
// parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
parentShardPollIntervalMillis int
// shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
shardSyncIntervalMillis int
// cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration)
cleanupTerminatedShardsBeforeExpiry bool
// kinesisClientConfig Client Configuration used by Kinesis client
// dynamoDBClientConfig Client Configuration used by DynamoDB client
// cloudWatchClientConfig Client Configuration used by CloudWatch client
// Note: we will use default client provided by AWS SDK
// taskBackoffTimeMillis Backoff period when tasks encounter an exception
taskBackoffTimeMillis int
// metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
metricsBufferTimeMillis int
// metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
metricsMaxQueueSize int
// validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
validateSequenceNumberBeforeCheckpointing bool
// regionName The region name for the service
regionName string
// shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
shutdownGraceMillis int
// Operation parameters
// Max leases this Worker can handle at a time
maxLeasesForWorker int
// Max leases to steal at one time (for load balancing)
maxLeasesToStealAtOneTime int
// Read capacity to provision when creating the lease table (dynamoDB).
initialLeaseTableReadCapacity int
// Write capacity to provision when creating the lease table.
initialLeaseTableWriteCapacity int
// Worker should skip syncing shards and leases at startup if leases are present
// This is useful for optimizing deployments to large fleets working on a stable stream.
skipShardSyncAtWorkerInitializationIfLeasesExist bool
}
)
func empty(s string) bool {
return len(strings.TrimSpace(s)) == 0
}
// checkIsValuePositive make sure the value is possitive.
func checkIsValueNotEmpty(key string, value string) {
if empty(value) {
// There is no point to continue for incorrect configuration. Fail fast!
log.Panicf("Non-empty value exepected for %v, actual: %v", key, value)
}
}
// checkIsValuePositive make sure the value is possitive.
func checkIsValuePositive(key string, value int) {
if value <= 0 {
// There is no point to continue for incorrect configuration. Fail fast!
log.Panicf("Positive value exepected for %v, actual: %v", key, value)
}
}

View file

@ -0,0 +1,23 @@
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestConfig(t *testing.T) {
kclConfig := NewKinesisClientLibConfig("appName", "streamName", "workerId").
WithFailoverTimeMillis(500).
WithMaxRecords(100).
WithInitialPositionInStream(TRIM_HORIZON).
WithIdleTimeBetweenReadsInMillis(20).
WithCallProcessRecordsEvenForEmptyRecordList(true).
WithTaskBackoffTimeMillis(10).
WithMetricsBufferTimeMillis(500).
WithMetricsMaxQueueSize(200).
WithRegionName("us-west-2")
assert.Equal(t, "appName", kclConfig.applicationName)
assert.Equal(t, "500", kclConfig.failoverTimeMillis)
}

View file

@ -0,0 +1,13 @@
package config
import (
"time"
)
func newInitialPositionAtTimestamp(timestamp *time.Time) *InitialPositionInStreamExtended {
return &InitialPositionInStreamExtended{position: AT_TIMESTAMP, timestamp: timestamp}
}
func newInitialPosition(position InitialPositionInStream) *InitialPositionInStreamExtended {
return &InitialPositionInStreamExtended{position: position, timestamp: nil}
}

View file

@ -0,0 +1,147 @@
package config
import (
"clientlibrary/utils"
"time"
)
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration {
checkIsValueNotEmpty("applicationName", applicationName)
checkIsValueNotEmpty("streamName", streamName)
checkIsValueNotEmpty("applicationName", applicationName)
if empty(workerID) {
workerID = utils.MustNewUUID()
}
// populate the KCL configuration with default values
return &KinesisClientLibConfiguration{
applicationName: applicationName,
tableName: applicationName,
streamName: streamName,
workerID: workerID,
kinesisEndpoint: "",
initialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
initialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
failoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
maxRecords: DEFAULT_MAX_RECORDS,
idleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
callProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
parentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
shardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
cleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
taskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
metricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS,
metricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE,
validateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
regionName: "",
shutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS,
maxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER,
maxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
initialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
initialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
skipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
}
}
// WithTableName to provide alternative lease table in DynamoDB
func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration {
c.tableName = tableName
return c
}
func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
c.kinesisEndpoint = kinesisEndpoint
return c
}
func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration {
c.initialPositionInStream = initialPositionInStream
c.initialPositionInStreamExtended = *newInitialPosition(initialPositionInStream)
return c
}
func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration {
c.initialPositionInStream = AT_TIMESTAMP
c.initialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp)
return c
}
func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis)
c.failoverTimeMillis = failoverTimeMillis
return c
}
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
c.shardSyncIntervalMillis = shardSyncIntervalMillis
return c
}
func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration {
checkIsValuePositive("MaxRecords", maxRecords)
c.maxRecords = maxRecords
return c
}
/**
* Controls how long the KCL will sleep if no records are returned from Kinesis
*
* <p>
* This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will
* immediately retrieve the next set of records after the call to
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)}
* has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this
* value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and
* monitor how far behind the records retrieved are by inspecting
* {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the
* <a href=
* "http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html#kinesis-metrics-stream">CloudWatch
* Metric: GetRecords.MillisBehindLatest</a>
* </p>
*
* @param idleTimeBetweenReadsInMillis
* how long to sleep between GetRecords calls when no records are returned.
* @return KinesisClientLibConfiguration
*/
func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis)
c.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis
return c
}
func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration {
c.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList
return c
}
func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("taskBackoffTimeMillis", taskBackoffTimeMillis)
c.taskBackoffTimeMillis = taskBackoffTimeMillis
return c
}
// WithMetricsBufferTimeMillis configures Metrics are buffered for at most this long before publishing to CloudWatch
func (c *KinesisClientLibConfiguration) WithMetricsBufferTimeMillis(metricsBufferTimeMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("metricsBufferTimeMillis", metricsBufferTimeMillis)
c.metricsBufferTimeMillis = metricsBufferTimeMillis
return c
}
// WithMetricsMaxQueueSize configures Max number of metrics to buffer before publishing to CloudWatch
func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueSize int) *KinesisClientLibConfiguration {
checkIsValuePositive("metricsMaxQueueSize", metricsMaxQueueSize)
c.metricsMaxQueueSize = metricsMaxQueueSize
return c
}
// WithRegionName configures region for the stream
func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration {
checkIsValueNotEmpty("regionName", regionName)
c.regionName = regionName
return c
}
// Getters

View file

@ -0,0 +1,227 @@
package interfaces
import (
ks "github.com/aws/aws-sdk-go/service/kinesis"
. "clientlibrary/types"
)
type (
IPreparedCheckpointer interface {
getPendingCheckpoint() ExtendedSequenceNumber
/**
* This method will record a pending checkpoint.
*
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
* backoff and retry.
* @error IllegalArgumentError The sequence number being checkpointed is invalid because it is out of range,
* i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest
* sequence number seen by the associated record processor.
*/
checkpoint() error
}
/**
* Used by RecordProcessors when they want to checkpoint their progress.
* The Kinesis Client Library will pass an object implementing this interface to RecordProcessors, so they can
* checkpoint their progress.
*/
IRecordProcessorCheckpointer interface {
/**
* This method will checkpoint the progress at the last data record that was delivered to the record processor.
* Upon fail over (after a successful checkpoint() call), the new/replacement RecordProcessor instance
* will receive data records whose sequenceNumber > checkpoint position (for each partition key).
* In steady state, applications should checkpoint periodically (e.g. once every 5 minutes).
* Calling this API too frequently can slow down the application (because it puts pressure on the underlying
* checkpoint storage layer).
*
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
* backoff and retry.
*/
checkpoint() error
/**
* This method will checkpoint the progress at the provided record. This method is analogous to
* {@link #checkpoint()} but provides the ability to specify the record at which to
* checkpoint.
*
* @param record A record at which to checkpoint in this shard. Upon failover,
* the Kinesis Client Library will start fetching records after this record's sequence number.
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
* backoff and retry.
*/
checkpointByRecord(record *ks.Record) error
/**
* This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
* {@link #checkpoint()} but provides the ability to specify the sequence number at which to
* checkpoint.
*
* @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover,
* the Kinesis Client Library will start fetching records after this sequence number.
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
* backoff and retry.
* @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
checkpointBySequenceNumber(sequenceNumber string) error
/**
* This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()}
* but provides the ability to specify the sequence and subsequence numbers at which to checkpoint.
*
* @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, the Kinesis
* Client Library will start fetching records after the given sequence and subsequence numbers.
* @param subSequenceNumber A subsequence number at which to checkpoint within this shard. Upon failover, the
* Kinesis Client Library will start fetching records after the given sequence and subsequence numbers.
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
* backoff and retry.
* @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
checkpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) error
/**
* This method will record a pending checkpoint at the last data record that was delivered to the record processor.
* If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next
* IRecordProcessor for this shard will be informed of the prepared sequence number
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number passed in to init() to behave idempotently.
*
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
*/
prepareCheckpoint() (*IPreparedCheckpointer, error)
/**
* This method will record a pending checkpoint at the at the provided record. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint.
*
* @param record A record at which to prepare checkpoint in this shard.
*
* Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having
* side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete.
* Use the sequence number and application state passed in to init() to behave idempotently.
*
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
prepareCheckpointByRecord(record *ks.Record) (*IPreparedCheckpointer, error)
/**
* This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to
* {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint.
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
prepareCheckpointBySequenceNumber(sequenceNumber string) (*IPreparedCheckpointer, error)
/**
* This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for
* aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()}
* but provides the ability to specify the sequence number at which to checkpoint
*
* @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
* @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard.
*
* @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
*
* @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @error ShutdownError The record processor instance has been shutdown. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @error InvalidStateError Can't store pending checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
* application can backoff and retry.
* @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
prepareCheckpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) (*IPreparedCheckpointer, error)
}
)

View file

@ -0,0 +1,44 @@
package interfaces
import (
. "clientlibrary/types"
)
// IRecordProcessor is the interface for some callback functions invoked by KCL will
// The main task of using KCL is to provide implementation on IRecordProcessor interface.
// Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
type IRecordProcessor interface {
/**
* Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
* (via processRecords).
*
* @param initializationInput Provides information related to initialization
*/
initialize(initializationInput InitializationInput)
/**
* Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
* application.
* Upon fail over, the new instance will get records with sequence number > checkpoint position
* for each partition key.
*
* @param processRecordsInput Provides the records to be processed as well as information and capabilities related
* to them (eg checkpointing).
*/
processRecords(processRecordsInput ProcessRecordsInput)
/**
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
* RecordProcessor instance.
*
* <h2><b>Warning</b></h2>
*
* When the value of {@link ShutdownInput#getShutdownReason()} is
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
* checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
*
* @param shutdownInput
* Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
*/
shutdown(shutdownInput ShutdownInput)
}

View file

@ -0,0 +1,26 @@
package checkpoint
import (
. "clientlibrary/types"
)
const (
// TRIM_HORIZON starts from the first available record in the shard.
TRIM_HORIZON = SentinelCheckpoint(iota + 1)
// LATEST starts from the latest record in the shard.
LATEST
// SHARD_END We've completely processed all records in this shard.
SHARD_END
// AT_TIMESTAMP starts from the record at or after the specified server-side timestamp.
AT_TIMESTAMP
)
type (
SentinelCheckpoint int
// Checkpoint: a class encapsulating the 2 pieces of state stored in a checkpoint.
Checkpoint struct {
checkpoint *ExtendedSequenceNumber
pendingCheckpoint *ExtendedSequenceNumber
}
)

View file

@ -0,0 +1 @@
package worker

View file

@ -0,0 +1,39 @@
package types
import (
"time"
ks "github.com/aws/aws-sdk-go/service/kinesis"
. "clientlibrary/interfaces"
)
const (
REQUESTED = ShutdownReason(1)
TERMINATE = REQUESTED + 1
ZOMBIE = TERMINATE + 1
)
// Containers for the parameters to the IRecordProcessor
type (
ShutdownReason int
InitializationInput struct {
shardId string
extendedSequenceNumber *ExtendedSequenceNumber
pendingCheckpointSequenceNumber *ExtendedSequenceNumber
}
ProcessRecordsInput struct {
cacheEntryTime *time.Time
cacheExitTime *time.Time
records []*ks.Record
checkpointer *IRecordProcessorCheckpointer
millisBehindLatest int64
}
ShutdownInput struct {
shutdownReason ShutdownReason
checkpointer *IRecordProcessorCheckpointer
}
)

View file

@ -0,0 +1,11 @@
package types
// ExtendedSequenceNumber represents a two-part sequence number for records aggregated by the Kinesis Producer Library.
//
// The KPL combines multiple user records into a single Kinesis record. Each user record therefore has an integer
// sub-sequence number, in addition to the regular sequence number of the Kinesis record. The sub-sequence number
// is used to checkpoint within an aggregated record.
type ExtendedSequenceNumber struct {
sequenceNumber string
subSequenceNumber int64
}

View file

@ -0,0 +1,14 @@
package utils
import (
guuid "github.com/google/uuid"
)
// MustNewUUID generates a new UUID and panics if failed
func MustNewUUID() string {
id, err := guuid.NewUUID()
if err != nil {
panic(err)
}
return id.String()
}

146
src/common/errors.go Normal file
View file

@ -0,0 +1,146 @@
package common
import (
"fmt"
"net/http"
)
// ErrorCode is unified definition of numerical error codes
type ErrorCode int32
// pre-defined error codes
const (
// System Wide 20000 - 20199
KinesisClientLibError ErrorCode = 20000
// KinesisClientLibrary Retryable Errors 20001 - 20099
KinesisClientLibRetryableError ErrorCode = 20001
KinesisClientLibIOError ErrorCode = 20002
BlockedOnParentShardError ErrorCode = 20003
KinesisClientLibDependencyError ErrorCode = 20004
ThrottlingError ErrorCode = 20005
// KinesisClientLibrary NonRetryable Errors 20100 - 20149
KinesisClientLibNonRetryableException ErrorCode = 20000
InvalidStateError ErrorCode = 20101
ShutdownError ErrorCode = 20102
// Kinesis Lease Errors 20150 - 20199
LeasingError ErrorCode = 20150
LeasingInvalidStateError ErrorCode = 20151
LeasingDependencyError ErrorCode = 20152
LeasingProvisionedThroughputError ErrorCode = 20153
// Error indicates passing illegal or inappropriate argument
IllegalArgumentError ErrorCode = 20198
// NotImplemented
KinesisClientLibNotImplemented ErrorCode = 20199
)
var errorMap = map[ErrorCode]ClientLibraryError{
KinesisClientLibError: {ErrorCode: KinesisClientLibError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top level error of Kinesis Client Library"},
// Retryable
KinesisClientLibRetryableError: {ErrorCode: KinesisClientLibRetryableError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Retryable exceptions (e.g. transient errors). The request/operation is expected to succeed upon (back off and) retry."},
KinesisClientLibIOError: {ErrorCode: KinesisClientLibIOError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in reading/writing information (e.g. shard information from Kinesis may not be current/complete)."},
BlockedOnParentShardError: {ErrorCode: BlockedOnParentShardError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot start processing data for a shard because the data from the parent shard has not been completely processed (yet)."},
KinesisClientLibDependencyError: {ErrorCode: KinesisClientLibDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot talk to its dependencies (e.g. fetching data from Kinesis, DynamoDB table reads/writes, emitting metrics to CloudWatch)."},
ThrottlingError: {ErrorCode: ThrottlingError, Retryable: true, Status: http.StatusTooManyRequests, Msg: "Requests are throttled by a service (e.g. DynamoDB when storing a checkpoint)."},
// Non-Retryable
KinesisClientLibNonRetryableException: {ErrorCode: KinesisClientLibNonRetryableException, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Non-retryable exceptions. Simply retrying the same request/operation is not expected to succeed."},
InvalidStateError: {ErrorCode: InvalidStateError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Kinesis Library has issues with internal state (e.g. DynamoDB table is not found)."},
ShutdownError: {ErrorCode: ShutdownError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "The RecordProcessor instance has been shutdown (e.g. and attempts a checkpiont)."},
// Leasing
LeasingError: {ErrorCode: LeasingError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top-level error type for the leasing code."},
LeasingInvalidStateError: {ErrorCode: LeasingInvalidStateError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because DynamoDB is an invalid state"},
LeasingDependencyError: {ErrorCode: LeasingDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because a dependency of the leasing system has failed."},
LeasingProvisionedThroughputError: {ErrorCode: LeasingProvisionedThroughputError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed due to lack of provisioned throughput for a DynamoDB table."},
// IllegalArgumentError
IllegalArgumentError: {ErrorCode: IllegalArgumentError, Retryable: false, Status: http.StatusBadRequest, Msg: "Error indicates that a method has been passed an illegal or inappropriate argument."},
// Not Implemented
KinesisClientLibNotImplemented: {ErrorCode: KinesisClientLibNotImplemented, Retryable: false, Status: http.StatusNotImplemented, Msg: "Not Implemented"},
}
// Message returns the message of the error code
func (c ErrorCode) Message() string {
return errorMap[c].Msg
}
// MakeErr makes an error with default message
func (c ErrorCode) MakeErr() *ClientLibraryError {
e := errorMap[c]
return &e
}
// MakeError makes an error with message and data
func (c ErrorCode) MakeError(detail string) error {
e := errorMap[c]
return e.WithDetail(detail)
}
// ClientLibraryError is unified error
type ClientLibraryError struct {
// ErrorCode is the numerical error code.
ErrorCode `json:"code"`
// Retryable is a bool flag to indicate the whether the error is retryable or not.
Retryable bool `json:"tryable"`
// Status is the HTTP status code.
Status int `json:"status"`
// Msg provides a terse description of the error. Its value is defined in errorMap.
Msg string `json:"msg"`
// Detail provides a detailed description of the error. Its value is set using WithDetail.
Detail string `json:"detail"`
}
// Error implements error
func (e *ClientLibraryError) Error() string {
var prefix string
if e.Retryable {
prefix = "Retryable"
} else {
prefix = "NonRetryable"
}
msg := fmt.Sprintf("%v Error [%d]: %s", prefix, int32(e.ErrorCode), e.Msg)
if e.Detail != "" {
msg = fmt.Sprintf("%s, detail: %s", msg, e.Detail)
}
return msg
}
// WithMsg overwrites the default error message
func (e *ClientLibraryError) WithMsg(format string, v ...interface{}) *ClientLibraryError {
e.Msg = fmt.Sprintf(format, v...)
return e
}
// WithDetail adds a detailed message to error
func (e *ClientLibraryError) WithDetail(format string, v ...interface{}) *ClientLibraryError {
if len(e.Detail) == 0 {
e.Detail = fmt.Sprintf(format, v...)
} else {
e.Detail += ", " + fmt.Sprintf(format, v...)
}
return e
}
// WithCause adds CauseBy to error
func (e *ClientLibraryError) WithCause(err error) *ClientLibraryError {
if err != nil {
// Store error message in Detail, so the info can be preserved
// when CascadeError is marshaled to json.
if len(e.Detail) == 0 {
e.Detail = err.Error()
} else {
e.Detail += ", cause: " + err.Error()
}
}
return e
}

2
src/vendor/manifest vendored
View file

@ -159,7 +159,7 @@
"importpath": "gopkg.in/yaml.v2", "importpath": "gopkg.in/yaml.v2",
"repository": "https://gopkg.in/yaml.v2", "repository": "https://gopkg.in/yaml.v2",
"vcs": "git", "vcs": "git",
"revision": "cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b", "revision": "5420a8b6744d3b0345ab293f6fcba19c978f1183",
"branch": "v2", "branch": "v2",
"notests": true "notests": true
} }