From 702335374c1a52d248babcfaeab4cd57f7a591a5 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Tue, 10 Apr 2018 20:50:18 -0700 Subject: [PATCH] [WIP] KCL: create configuration and interface for kinesis client library This is to create configuration and client interface in order to give user an overview on how the Kinesis client library works. In order not to reinvent wheel, the api is designed closely aligned with Amazon Kinesis Client Library in Java. add errors. remove @throws and use @error instead. https://jira.eng.vmware.com/browse/CNA-614 Change-Id: I78a269b328c14df37f878eccef192ff022a669cc --- src/clientlibrary/config/config.go | 233 ++++++++++++++++++ src/clientlibrary/config/config_test.go | 23 ++ .../config/initial-stream-pos.go | 13 + src/clientlibrary/config/kcl-config.go | 147 +++++++++++ .../record-processor-checkpointer.go | 227 +++++++++++++++++ .../interfaces/record-processor.go | 44 ++++ .../lib/checkpoint/checkpoint.go | 26 ++ src/clientlibrary/lib/worker/worker.go | 1 + src/clientlibrary/types/inputs.go | 39 +++ src/clientlibrary/types/sequence-number.go | 11 + src/clientlibrary/utils/uuid.go | 14 ++ src/common/errors.go | 146 +++++++++++ src/vendor/manifest | 2 +- 13 files changed, 925 insertions(+), 1 deletion(-) create mode 100644 src/clientlibrary/config/config.go create mode 100644 src/clientlibrary/config/config_test.go create mode 100644 src/clientlibrary/config/initial-stream-pos.go create mode 100644 src/clientlibrary/config/kcl-config.go create mode 100644 src/clientlibrary/interfaces/record-processor-checkpointer.go create mode 100644 src/clientlibrary/interfaces/record-processor.go create mode 100644 src/clientlibrary/lib/checkpoint/checkpoint.go create mode 100644 src/clientlibrary/lib/worker/worker.go create mode 100644 src/clientlibrary/types/inputs.go create mode 100644 src/clientlibrary/types/sequence-number.go create mode 100644 src/clientlibrary/utils/uuid.go create mode 100644 src/common/errors.go diff --git a/src/clientlibrary/config/config.go b/src/clientlibrary/config/config.go new file mode 100644 index 0000000..add0c4b --- /dev/null +++ b/src/clientlibrary/config/config.go @@ -0,0 +1,233 @@ +package config + +import ( + "log" + "math" + "strings" + "time" +) + +const ( + EPSILON_MS = 25 + + // LATEST start after the most recent data record (fetch new data). + LATEST = InitialPositionInStream(1) + // TRIM_HORIZON start from the oldest available data record + TRIM_HORIZON = LATEST + 1 + // AT_TIMESTAMP start from the record at or after the specified server-side timestamp. + AT_TIMESTAMP = TRIM_HORIZON + 1 + + // The location in the shard from which the KinesisClientLibrary will start fetching records from + // when the application starts for the first time and there is no checkpoint for the shard. + DEFAULT_INITIAL_POSITION_IN_STREAM = LATEST + + // Fail over time in milliseconds. A worker which does not renew it's lease within this time interval + // will be regarded as having problems and it's shards will be assigned to other workers. + // For applications that have a large number of shards, this may be set to a higher number to reduce + // the number of DynamoDB IOPS required for tracking leases. + DEFAULT_FAILOVER_TIME_MILLIS = 10000 + + // Max records to fetch from Kinesis in a single GetRecords call. + DEFAULT_MAX_RECORDS = 10000 + + // The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to + DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000 + + // Don't call processRecords() on the record processor for empty record lists. + DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST = false + + // Interval in milliseconds between polling to check for parent shard completion. + // Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on + // completion of parent shards). + DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS = 10000 + + // Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. + DEFAULT_SHARD_SYNC_INTERVAL_MILLIS = 60000 + + // Cleanup leases upon shards completion (don't wait until they expire in Kinesis). + // Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try + // to delete the ones we don't need any longer. + DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true + + // Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). + DEFAULT_TASK_BACKOFF_TIME_MILLIS = 500 + + // Buffer metrics for at most this long before publishing to CloudWatch. + DEFAULT_METRICS_BUFFER_TIME_MILLIS = 10000 + + // Buffer at most this many metrics before publishing to CloudWatch. + DEFAULT_METRICS_MAX_QUEUE_SIZE = 10000 + + // KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls + // to {@link RecordProcessorCheckpointer#checkpoint(String)} by default. + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true + + // The max number of leases (shards) this worker should process. + // This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints + // or during deployment. + // NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the + // stream due to the max limit. + DEFAULT_MAX_LEASES_FOR_WORKER = math.MaxInt16 + + // Max leases to steal from another worker at one time (for load balancing). + // Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), + // but can cause higher churn in the system. + DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1 + + // The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity. + DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10 + + // The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity. + DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10 + + // The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + // assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + // during incremental deployments of an application). + DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false + + // The amount of milliseconds to wait before graceful shutdown forcefully terminates. + DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000 + + // The size of the thread pool to create for the lease renewer to use. + DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20 + + // The sleep time between two listShards calls from the proxy when throttled. + DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500 + + // The number of times the Proxy will retry listShards call when throttled. + DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50 +) + +type ( + // InitialPositionInStream Used to specify the position in the stream where a new application should start from + // This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents) + InitialPositionInStream int + + // Class that houses the entities needed to specify the position in the stream from where a new application should + // start. + InitialPositionInStreamExtended struct { + position InitialPositionInStream + + // The time stamp of the data record from which to start reading. Used with + // shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with + // precision in milliseconds. For example, 2016-04-04T19:58:46.480-00:00 or + // 1459799926.480. If a record with this exact time stamp does not exist, the + // iterator returned is for the next (later) record. If the time stamp is older + // than the current trim horizon, the iterator returned is for the oldest untrimmed + // data record (TRIM_HORIZON). + timestamp *time.Time `type:"timestamp" timestampFormat:"unix"` + } + + // Configuration for the Kinesis Client Library. + KinesisClientLibConfiguration struct { + // applicationName is name of application. Kinesis allows multiple applications to consume the same stream. + applicationName string + + // tableName is name of the dynamo db table for managing kinesis stream default to applicationName + tableName string + + // streamName is the name of Kinesis stream + streamName string + + // workerID used to distinguish different workers/processes of a Kinesis application + workerID string + + // kinesisEndpoint endpoint + kinesisEndpoint string + + // dynamoDB endpoint + dynamoDBEndpoint string + + // initialPositionInStream specifies the position in the stream where a new application should start from + initialPositionInStream InitialPositionInStream + + // initialPositionInStreamExtended provides actual AT_TMESTAMP value + initialPositionInStreamExtended InitialPositionInStreamExtended + + // credentials to access Kinesis/Dynamo/CloudWatch: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/ + // Note: No need to configure here. Use NewEnvCredentials for testing and EC2RoleProvider for production + + // failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + failoverTimeMillis int + + /// maxRecords Max records to read per Kinesis getRecords() call + maxRecords int + + // idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + idleTimeBetweenReadsInMillis int + + // callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + // GetRecords returned an empty record list. + callProcessRecordsEvenForEmptyRecordList bool + + // parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + parentShardPollIntervalMillis int + + // shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + shardSyncIntervalMillis int + + // cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration) + cleanupTerminatedShardsBeforeExpiry bool + + // kinesisClientConfig Client Configuration used by Kinesis client + // dynamoDBClientConfig Client Configuration used by DynamoDB client + // cloudWatchClientConfig Client Configuration used by CloudWatch client + // Note: we will use default client provided by AWS SDK + + // taskBackoffTimeMillis Backoff period when tasks encounter an exception + taskBackoffTimeMillis int + + // metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + metricsBufferTimeMillis int + + // metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + metricsMaxQueueSize int + + // validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + validateSequenceNumberBeforeCheckpointing bool + + // regionName The region name for the service + regionName string + + // shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully + shutdownGraceMillis int + + // Operation parameters + + // Max leases this Worker can handle at a time + maxLeasesForWorker int + + // Max leases to steal at one time (for load balancing) + maxLeasesToStealAtOneTime int + + // Read capacity to provision when creating the lease table (dynamoDB). + initialLeaseTableReadCapacity int + + // Write capacity to provision when creating the lease table. + initialLeaseTableWriteCapacity int + + // Worker should skip syncing shards and leases at startup if leases are present + // This is useful for optimizing deployments to large fleets working on a stable stream. + skipShardSyncAtWorkerInitializationIfLeasesExist bool + } +) + +func empty(s string) bool { + return len(strings.TrimSpace(s)) == 0 +} + +// checkIsValuePositive make sure the value is possitive. +func checkIsValueNotEmpty(key string, value string) { + if empty(value) { + // There is no point to continue for incorrect configuration. Fail fast! + log.Panicf("Non-empty value exepected for %v, actual: %v", key, value) + } +} + +// checkIsValuePositive make sure the value is possitive. +func checkIsValuePositive(key string, value int) { + if value <= 0 { + // There is no point to continue for incorrect configuration. Fail fast! + log.Panicf("Positive value exepected for %v, actual: %v", key, value) + } +} diff --git a/src/clientlibrary/config/config_test.go b/src/clientlibrary/config/config_test.go new file mode 100644 index 0000000..30318e4 --- /dev/null +++ b/src/clientlibrary/config/config_test.go @@ -0,0 +1,23 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + kclConfig := NewKinesisClientLibConfig("appName", "streamName", "workerId"). + WithFailoverTimeMillis(500). + WithMaxRecords(100). + WithInitialPositionInStream(TRIM_HORIZON). + WithIdleTimeBetweenReadsInMillis(20). + WithCallProcessRecordsEvenForEmptyRecordList(true). + WithTaskBackoffTimeMillis(10). + WithMetricsBufferTimeMillis(500). + WithMetricsMaxQueueSize(200). + WithRegionName("us-west-2") + + assert.Equal(t, "appName", kclConfig.applicationName) + assert.Equal(t, "500", kclConfig.failoverTimeMillis) +} diff --git a/src/clientlibrary/config/initial-stream-pos.go b/src/clientlibrary/config/initial-stream-pos.go new file mode 100644 index 0000000..54e9d39 --- /dev/null +++ b/src/clientlibrary/config/initial-stream-pos.go @@ -0,0 +1,13 @@ +package config + +import ( + "time" +) + +func newInitialPositionAtTimestamp(timestamp *time.Time) *InitialPositionInStreamExtended { + return &InitialPositionInStreamExtended{position: AT_TIMESTAMP, timestamp: timestamp} +} + +func newInitialPosition(position InitialPositionInStream) *InitialPositionInStreamExtended { + return &InitialPositionInStreamExtended{position: position, timestamp: nil} +} diff --git a/src/clientlibrary/config/kcl-config.go b/src/clientlibrary/config/kcl-config.go new file mode 100644 index 0000000..bbe8e6a --- /dev/null +++ b/src/clientlibrary/config/kcl-config.go @@ -0,0 +1,147 @@ +package config + +import ( + "clientlibrary/utils" + "time" +) + +// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. +func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration { + checkIsValueNotEmpty("applicationName", applicationName) + checkIsValueNotEmpty("streamName", streamName) + checkIsValueNotEmpty("applicationName", applicationName) + + if empty(workerID) { + workerID = utils.MustNewUUID() + } + + // populate the KCL configuration with default values + return &KinesisClientLibConfiguration{ + applicationName: applicationName, + tableName: applicationName, + streamName: streamName, + workerID: workerID, + kinesisEndpoint: "", + initialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, + initialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), + failoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, + maxRecords: DEFAULT_MAX_RECORDS, + idleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, + callProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, + parentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, + shardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, + cleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, + taskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS, + metricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS, + metricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE, + validateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, + regionName: "", + shutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS, + maxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER, + maxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, + initialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + initialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, + skipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + } +} + +// WithTableName to provide alternative lease table in DynamoDB +func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration { + c.tableName = tableName + return c +} + +func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration { + c.kinesisEndpoint = kinesisEndpoint + return c +} + +func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration { + c.initialPositionInStream = initialPositionInStream + c.initialPositionInStreamExtended = *newInitialPosition(initialPositionInStream) + return c +} + +func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration { + c.initialPositionInStream = AT_TIMESTAMP + c.initialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp) + return c +} + +func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis) + c.failoverTimeMillis = failoverTimeMillis + return c +} + +func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) + c.shardSyncIntervalMillis = shardSyncIntervalMillis + return c +} + +func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration { + checkIsValuePositive("MaxRecords", maxRecords) + c.maxRecords = maxRecords + return c +} + +/** + * Controls how long the KCL will sleep if no records are returned from Kinesis + * + *

+ * This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will + * immediately retrieve the next set of records after the call to + * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)} + * has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this + * value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and + * monitor how far behind the records retrieved are by inspecting + * {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the + * CloudWatch + * Metric: GetRecords.MillisBehindLatest + *

+ * + * @param idleTimeBetweenReadsInMillis + * how long to sleep between GetRecords calls when no records are returned. + * @return KinesisClientLibConfiguration + */ +func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis) + c.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis + return c +} + +func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration { + c.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList + return c +} + +func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("taskBackoffTimeMillis", taskBackoffTimeMillis) + c.taskBackoffTimeMillis = taskBackoffTimeMillis + return c +} + +// WithMetricsBufferTimeMillis configures Metrics are buffered for at most this long before publishing to CloudWatch +func (c *KinesisClientLibConfiguration) WithMetricsBufferTimeMillis(metricsBufferTimeMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("metricsBufferTimeMillis", metricsBufferTimeMillis) + c.metricsBufferTimeMillis = metricsBufferTimeMillis + return c +} + +// WithMetricsMaxQueueSize configures Max number of metrics to buffer before publishing to CloudWatch +func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueSize int) *KinesisClientLibConfiguration { + checkIsValuePositive("metricsMaxQueueSize", metricsMaxQueueSize) + c.metricsMaxQueueSize = metricsMaxQueueSize + return c +} + +// WithRegionName configures region for the stream +func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration { + checkIsValueNotEmpty("regionName", regionName) + c.regionName = regionName + return c +} + +// Getters diff --git a/src/clientlibrary/interfaces/record-processor-checkpointer.go b/src/clientlibrary/interfaces/record-processor-checkpointer.go new file mode 100644 index 0000000..c752f04 --- /dev/null +++ b/src/clientlibrary/interfaces/record-processor-checkpointer.go @@ -0,0 +1,227 @@ +package interfaces + +import ( + ks "github.com/aws/aws-sdk-go/service/kinesis" + + . "clientlibrary/types" +) + +type ( + IPreparedCheckpointer interface { + getPendingCheckpoint() ExtendedSequenceNumber + + /** + * This method will record a pending checkpoint. + * + * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can + * backoff and retry. + * @error IllegalArgumentError The sequence number being checkpointed is invalid because it is out of range, + * i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest + * sequence number seen by the associated record processor. + */ + checkpoint() error + } + + /** + * Used by RecordProcessors when they want to checkpoint their progress. + * The Kinesis Client Library will pass an object implementing this interface to RecordProcessors, so they can + * checkpoint their progress. + */ + IRecordProcessorCheckpointer interface { + + /** + * This method will checkpoint the progress at the last data record that was delivered to the record processor. + * Upon fail over (after a successful checkpoint() call), the new/replacement RecordProcessor instance + * will receive data records whose sequenceNumber > checkpoint position (for each partition key). + * In steady state, applications should checkpoint periodically (e.g. once every 5 minutes). + * Calling this API too frequently can slow down the application (because it puts pressure on the underlying + * checkpoint storage layer). + * + * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can + * backoff and retry. + */ + checkpoint() error + + /** + * This method will checkpoint the progress at the provided record. This method is analogous to + * {@link #checkpoint()} but provides the ability to specify the record at which to + * checkpoint. + * + * @param record A record at which to checkpoint in this shard. Upon failover, + * the Kinesis Client Library will start fetching records after this record's sequence number. + * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can + * backoff and retry. + */ + checkpointByRecord(record *ks.Record) error + + /** + * This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to + * {@link #checkpoint()} but provides the ability to specify the sequence number at which to + * checkpoint. + * + * @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, + * the Kinesis Client Library will start fetching records after this sequence number. + * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can + * backoff and retry. + * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + checkpointBySequenceNumber(sequenceNumber string) error + + /** + * This method will checkpoint the progress at the provided sequenceNumber and subSequenceNumber, the latter for + * aggregated records produced with the Producer Library. This method is analogous to {@link #checkpoint()} + * but provides the ability to specify the sequence and subsequence numbers at which to checkpoint. + * + * @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, the Kinesis + * Client Library will start fetching records after the given sequence and subsequence numbers. + * @param subSequenceNumber A subsequence number at which to checkpoint within this shard. Upon failover, the + * Kinesis Client Library will start fetching records after the given sequence and subsequence numbers. + * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can + * backoff and retry. + * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + checkpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) error + + /** + * This method will record a pending checkpoint at the last data record that was delivered to the record processor. + * If the application fails over between calling prepareCheckpoint() and checkpoint(), the init() method of the next + * IRecordProcessor for this shard will be informed of the prepared sequence number + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number passed in to init() to behave idempotently. + * + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + */ + prepareCheckpoint() (*IPreparedCheckpointer, error) + + /** + * This method will record a pending checkpoint at the at the provided record. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the record at which to prepare the checkpoint. + * + * @param record A record at which to prepare checkpoint in this shard. + * + * Application should use this to assist with idempotency across failover by calling prepareCheckpoint before having + * side effects, then by calling checkpoint on the returned PreparedCheckpointer after side effects are complete. + * Use the sequence number and application state passed in to init() to behave idempotently. + * + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + prepareCheckpointByRecord(record *ks.Record) (*IPreparedCheckpointer, error) + + /** + * This method will record a pending checkpoint at the provided sequenceNumber. This method is analogous to + * {@link #prepareCheckpoint()} but provides the ability to specify the sequence number at which to checkpoint. + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + prepareCheckpointBySequenceNumber(sequenceNumber string) (*IPreparedCheckpointer, error) + + /** + * This method will record a pending checkpoint at the provided sequenceNumber and subSequenceNumber, the latter for + * aggregated records produced with the Producer Library. This method is analogous to {@link #prepareCheckpoint()} + * but provides the ability to specify the sequence number at which to checkpoint + * + * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. + * @param subSequenceNumber A subsequence number at which to prepare checkpoint within this shard. + * + * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. + * + * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently. + * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. + * @error ShutdownError The record processor instance has been shutdown. Another instance may have + * started processing some of these records already. + * The application should abort processing via this RecordProcessor instance. + * @error InvalidStateError Can't store pending checkpoint. + * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). + * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The + * application can backoff and retry. + * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: + * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the + * greatest sequence number seen by the associated record processor. + * 2.) It is not a valid sequence number for a record in this shard. + */ + prepareCheckpointBySequenceNumberEx(sequenceNumber string, subSequenceNumber int64) (*IPreparedCheckpointer, error) + } +) diff --git a/src/clientlibrary/interfaces/record-processor.go b/src/clientlibrary/interfaces/record-processor.go new file mode 100644 index 0000000..ab704a2 --- /dev/null +++ b/src/clientlibrary/interfaces/record-processor.go @@ -0,0 +1,44 @@ +package interfaces + +import ( + . "clientlibrary/types" +) + +// IRecordProcessor is the interface for some callback functions invoked by KCL will +// The main task of using KCL is to provide implementation on IRecordProcessor interface. +// Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2 +type IRecordProcessor interface { + /** + * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance + * (via processRecords). + * + * @param initializationInput Provides information related to initialization + */ + initialize(initializationInput InitializationInput) + + /** + * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the + * application. + * Upon fail over, the new instance will get records with sequence number > checkpoint position + * for each partition key. + * + * @param processRecordsInput Provides the records to be processed as well as information and capabilities related + * to them (eg checkpointing). + */ + processRecords(processRecordsInput ProcessRecordsInput) + + /** + * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this + * RecordProcessor instance. + * + *

Warning

+ * + * When the value of {@link ShutdownInput#getShutdownReason()} is + * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you + * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. + * + * @param shutdownInput + * Provides information and capabilities (eg checkpointing) related to shutdown of this record processor. + */ + shutdown(shutdownInput ShutdownInput) +} diff --git a/src/clientlibrary/lib/checkpoint/checkpoint.go b/src/clientlibrary/lib/checkpoint/checkpoint.go new file mode 100644 index 0000000..9f0facc --- /dev/null +++ b/src/clientlibrary/lib/checkpoint/checkpoint.go @@ -0,0 +1,26 @@ +package checkpoint + +import ( + . "clientlibrary/types" +) + +const ( + // TRIM_HORIZON starts from the first available record in the shard. + TRIM_HORIZON = SentinelCheckpoint(iota + 1) + // LATEST starts from the latest record in the shard. + LATEST + // SHARD_END We've completely processed all records in this shard. + SHARD_END + // AT_TIMESTAMP starts from the record at or after the specified server-side timestamp. + AT_TIMESTAMP +) + +type ( + SentinelCheckpoint int + + // Checkpoint: a class encapsulating the 2 pieces of state stored in a checkpoint. + Checkpoint struct { + checkpoint *ExtendedSequenceNumber + pendingCheckpoint *ExtendedSequenceNumber + } +) diff --git a/src/clientlibrary/lib/worker/worker.go b/src/clientlibrary/lib/worker/worker.go new file mode 100644 index 0000000..4df0094 --- /dev/null +++ b/src/clientlibrary/lib/worker/worker.go @@ -0,0 +1 @@ +package worker diff --git a/src/clientlibrary/types/inputs.go b/src/clientlibrary/types/inputs.go new file mode 100644 index 0000000..9668e64 --- /dev/null +++ b/src/clientlibrary/types/inputs.go @@ -0,0 +1,39 @@ +package types + +import ( + "time" + + ks "github.com/aws/aws-sdk-go/service/kinesis" + + . "clientlibrary/interfaces" +) + +const ( + REQUESTED = ShutdownReason(1) + TERMINATE = REQUESTED + 1 + ZOMBIE = TERMINATE + 1 +) + +// Containers for the parameters to the IRecordProcessor +type ( + ShutdownReason int + + InitializationInput struct { + shardId string + extendedSequenceNumber *ExtendedSequenceNumber + pendingCheckpointSequenceNumber *ExtendedSequenceNumber + } + + ProcessRecordsInput struct { + cacheEntryTime *time.Time + cacheExitTime *time.Time + records []*ks.Record + checkpointer *IRecordProcessorCheckpointer + millisBehindLatest int64 + } + + ShutdownInput struct { + shutdownReason ShutdownReason + checkpointer *IRecordProcessorCheckpointer + } +) diff --git a/src/clientlibrary/types/sequence-number.go b/src/clientlibrary/types/sequence-number.go new file mode 100644 index 0000000..0dddb57 --- /dev/null +++ b/src/clientlibrary/types/sequence-number.go @@ -0,0 +1,11 @@ +package types + +// ExtendedSequenceNumber represents a two-part sequence number for records aggregated by the Kinesis Producer Library. +// +// The KPL combines multiple user records into a single Kinesis record. Each user record therefore has an integer +// sub-sequence number, in addition to the regular sequence number of the Kinesis record. The sub-sequence number +// is used to checkpoint within an aggregated record. +type ExtendedSequenceNumber struct { + sequenceNumber string + subSequenceNumber int64 +} diff --git a/src/clientlibrary/utils/uuid.go b/src/clientlibrary/utils/uuid.go new file mode 100644 index 0000000..64883b8 --- /dev/null +++ b/src/clientlibrary/utils/uuid.go @@ -0,0 +1,14 @@ +package utils + +import ( + guuid "github.com/google/uuid" +) + +// MustNewUUID generates a new UUID and panics if failed +func MustNewUUID() string { + id, err := guuid.NewUUID() + if err != nil { + panic(err) + } + return id.String() +} diff --git a/src/common/errors.go b/src/common/errors.go new file mode 100644 index 0000000..f9ab2af --- /dev/null +++ b/src/common/errors.go @@ -0,0 +1,146 @@ +package common + +import ( + "fmt" + "net/http" +) + +// ErrorCode is unified definition of numerical error codes +type ErrorCode int32 + +// pre-defined error codes +const ( + // System Wide 20000 - 20199 + KinesisClientLibError ErrorCode = 20000 + + // KinesisClientLibrary Retryable Errors 20001 - 20099 + KinesisClientLibRetryableError ErrorCode = 20001 + + KinesisClientLibIOError ErrorCode = 20002 + BlockedOnParentShardError ErrorCode = 20003 + KinesisClientLibDependencyError ErrorCode = 20004 + ThrottlingError ErrorCode = 20005 + + // KinesisClientLibrary NonRetryable Errors 20100 - 20149 + KinesisClientLibNonRetryableException ErrorCode = 20000 + + InvalidStateError ErrorCode = 20101 + ShutdownError ErrorCode = 20102 + + // Kinesis Lease Errors 20150 - 20199 + LeasingError ErrorCode = 20150 + + LeasingInvalidStateError ErrorCode = 20151 + LeasingDependencyError ErrorCode = 20152 + LeasingProvisionedThroughputError ErrorCode = 20153 + + // Error indicates passing illegal or inappropriate argument + IllegalArgumentError ErrorCode = 20198 + + // NotImplemented + KinesisClientLibNotImplemented ErrorCode = 20199 +) + +var errorMap = map[ErrorCode]ClientLibraryError{ + KinesisClientLibError: {ErrorCode: KinesisClientLibError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top level error of Kinesis Client Library"}, + + // Retryable + KinesisClientLibRetryableError: {ErrorCode: KinesisClientLibRetryableError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Retryable exceptions (e.g. transient errors). The request/operation is expected to succeed upon (back off and) retry."}, + KinesisClientLibIOError: {ErrorCode: KinesisClientLibIOError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in reading/writing information (e.g. shard information from Kinesis may not be current/complete)."}, + BlockedOnParentShardError: {ErrorCode: BlockedOnParentShardError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot start processing data for a shard because the data from the parent shard has not been completely processed (yet)."}, + KinesisClientLibDependencyError: {ErrorCode: KinesisClientLibDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Cannot talk to its dependencies (e.g. fetching data from Kinesis, DynamoDB table reads/writes, emitting metrics to CloudWatch)."}, + ThrottlingError: {ErrorCode: ThrottlingError, Retryable: true, Status: http.StatusTooManyRequests, Msg: "Requests are throttled by a service (e.g. DynamoDB when storing a checkpoint)."}, + + // Non-Retryable + KinesisClientLibNonRetryableException: {ErrorCode: KinesisClientLibNonRetryableException, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Non-retryable exceptions. Simply retrying the same request/operation is not expected to succeed."}, + InvalidStateError: {ErrorCode: InvalidStateError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Kinesis Library has issues with internal state (e.g. DynamoDB table is not found)."}, + ShutdownError: {ErrorCode: ShutdownError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "The RecordProcessor instance has been shutdown (e.g. and attempts a checkpiont)."}, + + // Leasing + LeasingError: {ErrorCode: LeasingError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Top-level error type for the leasing code."}, + LeasingInvalidStateError: {ErrorCode: LeasingInvalidStateError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because DynamoDB is an invalid state"}, + LeasingDependencyError: {ErrorCode: LeasingDependencyError, Retryable: true, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed because a dependency of the leasing system has failed."}, + LeasingProvisionedThroughputError: {ErrorCode: LeasingProvisionedThroughputError, Retryable: false, Status: http.StatusServiceUnavailable, Msg: "Error in a lease operation has failed due to lack of provisioned throughput for a DynamoDB table."}, + + // IllegalArgumentError + IllegalArgumentError: {ErrorCode: IllegalArgumentError, Retryable: false, Status: http.StatusBadRequest, Msg: "Error indicates that a method has been passed an illegal or inappropriate argument."}, + + // Not Implemented + KinesisClientLibNotImplemented: {ErrorCode: KinesisClientLibNotImplemented, Retryable: false, Status: http.StatusNotImplemented, Msg: "Not Implemented"}, +} + +// Message returns the message of the error code +func (c ErrorCode) Message() string { + return errorMap[c].Msg +} + +// MakeErr makes an error with default message +func (c ErrorCode) MakeErr() *ClientLibraryError { + e := errorMap[c] + return &e +} + +// MakeError makes an error with message and data +func (c ErrorCode) MakeError(detail string) error { + e := errorMap[c] + return e.WithDetail(detail) +} + +// ClientLibraryError is unified error +type ClientLibraryError struct { + // ErrorCode is the numerical error code. + ErrorCode `json:"code"` + // Retryable is a bool flag to indicate the whether the error is retryable or not. + Retryable bool `json:"tryable"` + // Status is the HTTP status code. + Status int `json:"status"` + // Msg provides a terse description of the error. Its value is defined in errorMap. + Msg string `json:"msg"` + // Detail provides a detailed description of the error. Its value is set using WithDetail. + Detail string `json:"detail"` +} + +// Error implements error +func (e *ClientLibraryError) Error() string { + var prefix string + if e.Retryable { + prefix = "Retryable" + } else { + prefix = "NonRetryable" + } + msg := fmt.Sprintf("%v Error [%d]: %s", prefix, int32(e.ErrorCode), e.Msg) + if e.Detail != "" { + msg = fmt.Sprintf("%s, detail: %s", msg, e.Detail) + } + return msg +} + +// WithMsg overwrites the default error message +func (e *ClientLibraryError) WithMsg(format string, v ...interface{}) *ClientLibraryError { + e.Msg = fmt.Sprintf(format, v...) + return e +} + +// WithDetail adds a detailed message to error +func (e *ClientLibraryError) WithDetail(format string, v ...interface{}) *ClientLibraryError { + if len(e.Detail) == 0 { + e.Detail = fmt.Sprintf(format, v...) + } else { + e.Detail += ", " + fmt.Sprintf(format, v...) + } + return e +} + +// WithCause adds CauseBy to error +func (e *ClientLibraryError) WithCause(err error) *ClientLibraryError { + if err != nil { + // Store error message in Detail, so the info can be preserved + // when CascadeError is marshaled to json. + if len(e.Detail) == 0 { + e.Detail = err.Error() + } else { + e.Detail += ", cause: " + err.Error() + } + } + return e +} diff --git a/src/vendor/manifest b/src/vendor/manifest index 9d112c1..1b8ad4e 100644 --- a/src/vendor/manifest +++ b/src/vendor/manifest @@ -159,7 +159,7 @@ "importpath": "gopkg.in/yaml.v2", "repository": "https://gopkg.in/yaml.v2", "vcs": "git", - "revision": "cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b", + "revision": "5420a8b6744d3b0345ab293f6fcba19c978f1183", "branch": "v2", "notests": true }