diff --git a/src/common/errors.go b/src/clientlibrary/common/errors.go similarity index 84% rename from src/common/errors.go rename to src/clientlibrary/common/errors.go index f9ab2af..adddc26 100644 --- a/src/common/errors.go +++ b/src/clientlibrary/common/errors.go @@ -10,35 +10,36 @@ type ErrorCode int32 // pre-defined error codes const ( - // System Wide 20000 - 20199 - KinesisClientLibError ErrorCode = 20000 + // System Wide 41000 - 42000 + KinesisClientLibError ErrorCode = 41000 - // KinesisClientLibrary Retryable Errors 20001 - 20099 - KinesisClientLibRetryableError ErrorCode = 20001 + // KinesisClientLibrary Retryable Errors 41001 - 41100 + KinesisClientLibRetryableError ErrorCode = 41001 - KinesisClientLibIOError ErrorCode = 20002 - BlockedOnParentShardError ErrorCode = 20003 - KinesisClientLibDependencyError ErrorCode = 20004 - ThrottlingError ErrorCode = 20005 + KinesisClientLibIOError ErrorCode = 41002 + BlockedOnParentShardError ErrorCode = 41003 + KinesisClientLibDependencyError ErrorCode = 41004 + ThrottlingError ErrorCode = 41005 - // KinesisClientLibrary NonRetryable Errors 20100 - 20149 - KinesisClientLibNonRetryableException ErrorCode = 20000 + // KinesisClientLibrary NonRetryable Errors 41100 - 41200 + KinesisClientLibNonRetryableException ErrorCode = 41100 - InvalidStateError ErrorCode = 20101 - ShutdownError ErrorCode = 20102 + InvalidStateError ErrorCode = 41101 + ShutdownError ErrorCode = 41102 - // Kinesis Lease Errors 20150 - 20199 - LeasingError ErrorCode = 20150 + // Kinesis Lease Errors 41200 - 41300 + LeasingError ErrorCode = 41200 - LeasingInvalidStateError ErrorCode = 20151 - LeasingDependencyError ErrorCode = 20152 - LeasingProvisionedThroughputError ErrorCode = 20153 + LeasingInvalidStateError ErrorCode = 41201 + LeasingDependencyError ErrorCode = 41202 + LeasingProvisionedThroughputError ErrorCode = 41203 - // Error indicates passing illegal or inappropriate argument - IllegalArgumentError ErrorCode = 20198 + // Misc Errors 41300 - 41400 + // NotImplemented + KinesisClientLibNotImplemented ErrorCode = 41301 - // NotImplemented - KinesisClientLibNotImplemented ErrorCode = 20199 + // Error indicates passing illegal or inappropriate argument + IllegalArgumentError ErrorCode = 41302 ) var errorMap = map[ErrorCode]ClientLibraryError{ diff --git a/src/clientlibrary/config/config.go b/src/clientlibrary/config/config.go index add0c4b..e2c82c5 100644 --- a/src/clientlibrary/config/config.go +++ b/src/clientlibrary/config/config.go @@ -14,7 +14,7 @@ const ( LATEST = InitialPositionInStream(1) // TRIM_HORIZON start from the oldest available data record TRIM_HORIZON = LATEST + 1 - // AT_TIMESTAMP start from the record at or after the specified server-side timestamp. + // AT_TIMESTAMP start from the record at or after the specified server-side Timestamp. AT_TIMESTAMP = TRIM_HORIZON + 1 // The location in the shard from which the KinesisClientLibrary will start fetching records from @@ -99,14 +99,14 @@ const ( ) type ( - // InitialPositionInStream Used to specify the position in the stream where a new application should start from + // InitialPositionInStream Used to specify the Position in the stream where a new application should start from // This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents) InitialPositionInStream int - // Class that houses the entities needed to specify the position in the stream from where a new application should + // Class that houses the entities needed to specify the Position in the stream from where a new application should // start. InitialPositionInStreamExtended struct { - position InitialPositionInStream + Position InitialPositionInStream // The time stamp of the data record from which to start reading. Used with // shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with @@ -115,100 +115,103 @@ type ( // iterator returned is for the next (later) record. If the time stamp is older // than the current trim horizon, the iterator returned is for the oldest untrimmed // data record (TRIM_HORIZON). - timestamp *time.Time `type:"timestamp" timestampFormat:"unix"` + Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"` } // Configuration for the Kinesis Client Library. KinesisClientLibConfiguration struct { - // applicationName is name of application. Kinesis allows multiple applications to consume the same stream. - applicationName string + // ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream. + ApplicationName string - // tableName is name of the dynamo db table for managing kinesis stream default to applicationName - tableName string + // TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName + TableName string - // streamName is the name of Kinesis stream - streamName string + // StreamName is the name of Kinesis stream + StreamName string - // workerID used to distinguish different workers/processes of a Kinesis application - workerID string + // WorkerID used to distinguish different workers/processes of a Kinesis application + WorkerID string - // kinesisEndpoint endpoint - kinesisEndpoint string + // KinesisEndpoint endpoint + KinesisEndpoint string - // dynamoDB endpoint - dynamoDBEndpoint string + // DynamoDB endpoint + DynamoDBEndpoint string - // initialPositionInStream specifies the position in the stream where a new application should start from - initialPositionInStream InitialPositionInStream + // InitialPositionInStream specifies the Position in the stream where a new application should start from + InitialPositionInStream InitialPositionInStream - // initialPositionInStreamExtended provides actual AT_TMESTAMP value - initialPositionInStreamExtended InitialPositionInStreamExtended + // InitialPositionInStreamExtended provides actual AT_TMESTAMP value + InitialPositionInStreamExtended InitialPositionInStreamExtended // credentials to access Kinesis/Dynamo/CloudWatch: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/ // Note: No need to configure here. Use NewEnvCredentials for testing and EC2RoleProvider for production - // failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) - failoverTimeMillis int + // FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + FailoverTimeMillis int - /// maxRecords Max records to read per Kinesis getRecords() call - maxRecords int + /// MaxRecords Max records to read per Kinesis getRecords() call + MaxRecords int - // idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis - idleTimeBetweenReadsInMillis int + // IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + IdleTimeBetweenReadsInMillis int - // callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + // CallProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if // GetRecords returned an empty record list. - callProcessRecordsEvenForEmptyRecordList bool + CallProcessRecordsEvenForEmptyRecordList bool - // parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done - parentShardPollIntervalMillis int + // ParentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + ParentShardPollIntervalMillis int - // shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards - shardSyncIntervalMillis int + // ShardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + ShardSyncIntervalMillis int - // cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration) - cleanupTerminatedShardsBeforeExpiry bool + // CleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration) + CleanupTerminatedShardsBeforeExpiry bool // kinesisClientConfig Client Configuration used by Kinesis client // dynamoDBClientConfig Client Configuration used by DynamoDB client // cloudWatchClientConfig Client Configuration used by CloudWatch client // Note: we will use default client provided by AWS SDK - // taskBackoffTimeMillis Backoff period when tasks encounter an exception - taskBackoffTimeMillis int + // TaskBackoffTimeMillis Backoff period when tasks encounter an exception + TaskBackoffTimeMillis int - // metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch - metricsBufferTimeMillis int + // MetricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + MetricsBufferTimeMillis int - // metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch - metricsMaxQueueSize int + // MetricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + MetricsMaxQueueSize int - // validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers - validateSequenceNumberBeforeCheckpointing bool + // ValidateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + ValidateSequenceNumberBeforeCheckpointing bool - // regionName The region name for the service - regionName string + // RegionName The region name for the service + RegionName string - // shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully - shutdownGraceMillis int + // ShutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully + ShutdownGraceMillis int // Operation parameters // Max leases this Worker can handle at a time - maxLeasesForWorker int + MaxLeasesForWorker int // Max leases to steal at one time (for load balancing) - maxLeasesToStealAtOneTime int + MaxLeasesToStealAtOneTime int // Read capacity to provision when creating the lease table (dynamoDB). - initialLeaseTableReadCapacity int + InitialLeaseTableReadCapacity int // Write capacity to provision when creating the lease table. - initialLeaseTableWriteCapacity int + InitialLeaseTableWriteCapacity int // Worker should skip syncing shards and leases at startup if leases are present // This is useful for optimizing deployments to large fleets working on a stable stream. - skipShardSyncAtWorkerInitializationIfLeasesExist bool + SkipShardSyncAtWorkerInitializationIfLeasesExist bool + + // The max number of threads in the worker thread pool to getRecords. + WorkerThreadPoolSize int } ) diff --git a/src/clientlibrary/config/config_test.go b/src/clientlibrary/config/config_test.go index 30318e4..19f1481 100644 --- a/src/clientlibrary/config/config_test.go +++ b/src/clientlibrary/config/config_test.go @@ -7,7 +7,7 @@ import ( ) func TestConfig(t *testing.T) { - kclConfig := NewKinesisClientLibConfig("appName", "streamName", "workerId"). + kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "workerId"). WithFailoverTimeMillis(500). WithMaxRecords(100). WithInitialPositionInStream(TRIM_HORIZON). @@ -18,6 +18,6 @@ func TestConfig(t *testing.T) { WithMetricsMaxQueueSize(200). WithRegionName("us-west-2") - assert.Equal(t, "appName", kclConfig.applicationName) - assert.Equal(t, "500", kclConfig.failoverTimeMillis) + assert.Equal(t, "appName", kclConfig.ApplicationName) + assert.Equal(t, "500", kclConfig.FailoverTimeMillis) } diff --git a/src/clientlibrary/config/initial-stream-pos.go b/src/clientlibrary/config/initial-stream-pos.go index 54e9d39..20ecbfc 100644 --- a/src/clientlibrary/config/initial-stream-pos.go +++ b/src/clientlibrary/config/initial-stream-pos.go @@ -5,9 +5,9 @@ import ( ) func newInitialPositionAtTimestamp(timestamp *time.Time) *InitialPositionInStreamExtended { - return &InitialPositionInStreamExtended{position: AT_TIMESTAMP, timestamp: timestamp} + return &InitialPositionInStreamExtended{Position: AT_TIMESTAMP, Timestamp: timestamp} } func newInitialPosition(position InitialPositionInStream) *InitialPositionInStreamExtended { - return &InitialPositionInStreamExtended{position: position, timestamp: nil} + return &InitialPositionInStreamExtended{Position: position, Timestamp: nil} } diff --git a/src/clientlibrary/config/kcl-config.go b/src/clientlibrary/config/kcl-config.go index bbe8e6a..2f8bc0b 100644 --- a/src/clientlibrary/config/kcl-config.go +++ b/src/clientlibrary/config/kcl-config.go @@ -7,9 +7,9 @@ import ( // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration { - checkIsValueNotEmpty("applicationName", applicationName) - checkIsValueNotEmpty("streamName", streamName) - checkIsValueNotEmpty("applicationName", applicationName) + checkIsValueNotEmpty("ApplicationName", applicationName) + checkIsValueNotEmpty("StreamName", streamName) + checkIsValueNotEmpty("ApplicationName", applicationName) if empty(workerID) { workerID = utils.MustNewUUID() @@ -17,72 +17,73 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki // populate the KCL configuration with default values return &KinesisClientLibConfiguration{ - applicationName: applicationName, - tableName: applicationName, - streamName: streamName, - workerID: workerID, - kinesisEndpoint: "", - initialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, - initialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), - failoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, - maxRecords: DEFAULT_MAX_RECORDS, - idleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, - callProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, - parentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, - shardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, - cleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, - taskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS, - metricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS, - metricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE, - validateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, - regionName: "", - shutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS, - maxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER, - maxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, - initialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, - initialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, - skipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + ApplicationName: applicationName, + TableName: applicationName, + StreamName: streamName, + WorkerID: workerID, + KinesisEndpoint: "", + InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, + InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), + FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, + MaxRecords: DEFAULT_MAX_RECORDS, + IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, + CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, + ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, + ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, + CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, + TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS, + MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS, + MetricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE, + ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, + RegionName: "", + ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS, + MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER, + MaxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, + InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, + SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + WorkerThreadPoolSize: 1, } } // WithTableName to provide alternative lease table in DynamoDB func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration { - c.tableName = tableName + c.TableName = tableName return c } func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration { - c.kinesisEndpoint = kinesisEndpoint + c.KinesisEndpoint = kinesisEndpoint return c } func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration { - c.initialPositionInStream = initialPositionInStream - c.initialPositionInStreamExtended = *newInitialPosition(initialPositionInStream) + c.InitialPositionInStream = initialPositionInStream + c.InitialPositionInStreamExtended = *newInitialPosition(initialPositionInStream) return c } func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration { - c.initialPositionInStream = AT_TIMESTAMP - c.initialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp) + c.InitialPositionInStream = AT_TIMESTAMP + c.InitialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp) return c } func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis) - c.failoverTimeMillis = failoverTimeMillis + c.FailoverTimeMillis = failoverTimeMillis return c } func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) - c.shardSyncIntervalMillis = shardSyncIntervalMillis + c.ShardSyncIntervalMillis = shardSyncIntervalMillis return c } func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration { checkIsValuePositive("MaxRecords", maxRecords) - c.maxRecords = maxRecords + c.MaxRecords = maxRecords return c } @@ -102,46 +103,52 @@ func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisC * Metric: GetRecords.MillisBehindLatest *

* - * @param idleTimeBetweenReadsInMillis + * @param IdleTimeBetweenReadsInMillis * how long to sleep between GetRecords calls when no records are returned. * @return KinesisClientLibConfiguration */ func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis) - c.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis + c.IdleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis return c } func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration { - c.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList + c.CallProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList return c } func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration { - checkIsValuePositive("taskBackoffTimeMillis", taskBackoffTimeMillis) - c.taskBackoffTimeMillis = taskBackoffTimeMillis + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis) + c.TaskBackoffTimeMillis = taskBackoffTimeMillis return c } // WithMetricsBufferTimeMillis configures Metrics are buffered for at most this long before publishing to CloudWatch func (c *KinesisClientLibConfiguration) WithMetricsBufferTimeMillis(metricsBufferTimeMillis int) *KinesisClientLibConfiguration { - checkIsValuePositive("metricsBufferTimeMillis", metricsBufferTimeMillis) - c.metricsBufferTimeMillis = metricsBufferTimeMillis + checkIsValuePositive("MetricsBufferTimeMillis", metricsBufferTimeMillis) + c.MetricsBufferTimeMillis = metricsBufferTimeMillis return c } // WithMetricsMaxQueueSize configures Max number of metrics to buffer before publishing to CloudWatch func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueSize int) *KinesisClientLibConfiguration { - checkIsValuePositive("metricsMaxQueueSize", metricsMaxQueueSize) - c.metricsMaxQueueSize = metricsMaxQueueSize + checkIsValuePositive("MetricsMaxQueueSize", metricsMaxQueueSize) + c.MetricsMaxQueueSize = metricsMaxQueueSize return c } // WithRegionName configures region for the stream func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration { - checkIsValueNotEmpty("regionName", regionName) - c.regionName = regionName + checkIsValueNotEmpty("RegionName", regionName) + c.RegionName = regionName return c } -// Getters +// WithWorkerThreadPoolSize configures worker thread pool size +func (c *KinesisClientLibConfiguration) WithWorkerThreadPoolSize(n int) *KinesisClientLibConfiguration { + checkIsValuePositive("WorkerThreadPoolSize", n) + c.WorkerThreadPoolSize = n + return c +} + diff --git a/src/leases/dynamoutils/dynamoutils.go b/src/leases/dynamoutils/dynamoutils.go new file mode 100644 index 0000000..8d286be --- /dev/null +++ b/src/leases/dynamoutils/dynamoutils.go @@ -0,0 +1,78 @@ +package util + +import ( + "strconv" + + "clientlibrary/common" + "github.com/aws/aws-sdk-go/service/dynamodb" +) + +/** + * Some static utility functions used by our LeaseSerializers. + */ + +func CreateAttributeValueFromSS(collectionValue []*string) (*dynamodb.AttributeValue, error) { + if len(collectionValue) == 0 { + return nil, common.IllegalArgumentError.MakeErr().WithDetail("Collection attributeValues cannot be null or empty.") + } + + attrib := &dynamodb.AttributeValue{} + attrib.SetSS(collectionValue) + + return attrib, nil +} + +func CreateAttributeValueFromString(stringValue string) (*dynamodb.AttributeValue, error) { + if len(stringValue) == 0 { + return nil, common.IllegalArgumentError.MakeErr().WithDetail("String attributeValues cannot be null or empty.") + } + + attrib := &dynamodb.AttributeValue{} + attrib.SetS(stringValue) + + return attrib, nil +} + +func CreateAttributeValueFromLong(longValue int64) (*dynamodb.AttributeValue, error) { + attrib := &dynamodb.AttributeValue{} + attrib.SetN(strconv.FormatInt(longValue, 10)) + + return attrib, nil +} + +func SafeGetLong(dynamoRecord map[string]*dynamodb.AttributeValue, key string) int64 { + av := dynamoRecord[key] + + if av == nil || av.N == nil { + return 0 + } + + var val int64 + val, err := strconv.ParseInt(*av.N, 10, 64) + + if err != nil { + return 0 + } + + return val +} + +func SafeGetString(dynamoRecord map[string]*dynamodb.AttributeValue, key string) *string { + av := dynamoRecord[key] + if av == nil { + return nil + } + + return av.S +} + +func SafeGetSS(dynamoRecord map[string]*dynamodb.AttributeValue, key string) []*string { + av := dynamoRecord[key] + + if av == nil { + var emptyslice []*string + return emptyslice + } + + return av.SS +} diff --git a/src/leases/impl/kinesis-client-lease.go b/src/leases/impl/kinesis-client-lease.go new file mode 100644 index 0000000..6132a40 --- /dev/null +++ b/src/leases/impl/kinesis-client-lease.go @@ -0,0 +1,116 @@ +package impl + +import ( + . "clientlibrary/types" +) + +// KinesisClientLease is a Lease subclass containing KinesisClientLibrary related fields for checkpoints. +type KinesisClientLease struct { + checkpoint *ExtendedSequenceNumber + pendingCheckpoint *ExtendedSequenceNumber + ownerSwitchesSinceCheckpoint int64 + parentShardIds *[]string + + // coreLease to hold lease information + // Note: golang doesn't support inheritance, use composition instead. + coreLease Lease +} + +// GetCheckpoint returns most recently application-supplied checkpoint value. During fail over, the new worker +// will pick up after the old worker's last checkpoint. +func (l *KinesisClientLease) GetCheckpoint() *ExtendedSequenceNumber { + return l.checkpoint +} + +// GetPendingCheckpoint returns pending checkpoint, possibly null. +func (l *KinesisClientLease) GetPendingCheckpoint() *ExtendedSequenceNumber { + return l.pendingCheckpoint +} + +// GetOwnerSwitchesSinceCheckpoint counts of distinct lease holders between checkpoints. +func (l *KinesisClientLease) GetOwnerSwitchesSinceCheckpoint() int64 { + return l.ownerSwitchesSinceCheckpoint +} + +// GetParentShardIds returns shardIds that parent this lease. Used for resharding. +func (l *KinesisClientLease) GetParentShardIds() *[]string { + return l.parentShardIds +} + +// SetCheckpoint +func (l *KinesisClientLease) SetCheckpoint(checkpoint *ExtendedSequenceNumber) { + l.checkpoint = checkpoint +} + +// SetPendingCheckpoint +func (l *KinesisClientLease) SetPendingCheckpoint(pendingCheckpoint *ExtendedSequenceNumber) { + l.pendingCheckpoint = pendingCheckpoint +} + +// SetOwnerSwitchesSinceCheckpoint +func (l *KinesisClientLease) SetOwnerSwitchesSinceCheckpoint(ownerSwitchesSinceCheckpoint int64) { + l.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint +} + +// SetParentShardIds +func (l *KinesisClientLease) SetParentShardIds(parentShardIds *[]string) { + l.parentShardIds = parentShardIds +} + +// GetLeaseKey retrieves leaseKey - identifies the unit of work associated with this lease. +func (l *KinesisClientLease) GetLeaseKey() string { + return l.coreLease.GetLeaseKey() +} + +// GetLeaseOwner gets current owner of the lease, may be "". +func (l *KinesisClientLease) GetLeaseOwner() string { + return l.coreLease.GetLeaseOwner() +} + +// GetLeaseCounter retrieves leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. +func (l *KinesisClientLease) GetLeaseCounter() int64 { + return l.coreLease.GetLeaseCounter() +} + +// GetConcurrencyToken returns concurrency token +func (l *KinesisClientLease) GetConcurrencyToken() string { + return l.coreLease.GetConcurrencyToken() +} + +// GetLastCounterIncrementNanos returns concurrency token +func (l *KinesisClientLease) GetLastCounterIncrementNanos() int64 { + return l.coreLease.GetLastCounterIncrementNanos() +} + +// SetLeaseKey sets leaseKey - LeaseKey is immutable once set. +func (l *KinesisClientLease) SetLeaseKey(leaseKey string) error { + return l.coreLease.SetLeaseKey(leaseKey) +} + +// SetLeaseOwner set current owner of the lease, may be "". +func (l *KinesisClientLease) SetLeaseOwner(leaseOwner string) { + l.coreLease.SetLeaseOwner(leaseOwner) +} + +// SetLeaseCounter sets leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. +func (l *KinesisClientLease) SetLeaseCounter(leaseCounter int64) { + l.coreLease.SetLeaseCounter(leaseCounter) +} + +// SetConcurrencyToken +func (l *KinesisClientLease) SetConcurrencyToken(concurrencyToken string) { + l.coreLease.SetConcurrencyToken(concurrencyToken) +} + +// SetLastCounterIncrementNanos returns concurrency token +func (l *KinesisClientLease) SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) { + l.coreLease.SetLastCounterIncrementNanos(lastCounterIncrementNanos) +} + +// IsExpired to check whether lease expired using +// @param leaseDurationNanos duration of lease in nanoseconds +// @param asOfNanos time in nanoseconds to check expiration as-of +// @return true if lease is expired as-of given time, false otherwise +func (l *KinesisClientLease) IsExpired(leaseDurationNanos, asOfNanos int64) bool { + return l.coreLease.IsExpired(leaseDurationNanos, asOfNanos) +} diff --git a/src/leases/impl/lease-manager.go b/src/leases/impl/lease-manager.go new file mode 100644 index 0000000..a447e11 --- /dev/null +++ b/src/leases/impl/lease-manager.go @@ -0,0 +1,440 @@ +package impl + +import ( + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + + . "leases/interfaces" +) + +const ( + // CREATING - The table is being created. + TABLE_CREATING = "CREATING" + + // UPDATING - The table is being updated. + TABLE_UPDATING = "UPDATING" + + // DELETING - The table is being deleted. + TABLE_DELETING = "DELETING" + + // ACTIVE - The table is ready for use. + TABLE_ACTIVE = "ACTIVE" +) + +// LeaseManager is an implementation of ILeaseManager that uses DynamoDB. +type LeaseManager struct { + tableName string + dynamoDBClient dynamodbiface.DynamoDBAPI + serializer ILeaseSerializer + consistentReads bool +} + +func NewLeaseManager(tableName string, dynamoDBClient dynamodbiface.DynamoDBAPI, serializer ILeaseSerializer) *LeaseManager { + return &LeaseManager{ + tableName: tableName, + dynamoDBClient: dynamoDBClient, + serializer: serializer, + consistentReads: false, + } +} + +/** + * Creates the table that will store leases. Succeeds if table already exists. + * + * @param readCapacity + * @param writeCapacity + * + * @return true if we created a new table (table didn't exist before) + * + * @error ProvisionedThroughputError if we cannot create the lease table due to per-AWS-account capacity + * restrictions. + * @error LeasingDependencyError if DynamoDB createTable fails in an unexpected way + */ +func (l *LeaseManager) CreateLeaseTableIfNotExists(readCapacity, writeCapacity int64) (bool, error) { + status, _ := l.tableStatus() + + if status != nil { + return false, nil + } + + input := &dynamodb.CreateTableInput{ + AttributeDefinitions: l.serializer.GetAttributeDefinitions(), + KeySchema: l.serializer.GetKeySchema(), + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacity), + WriteCapacityUnits: aws.Int64(writeCapacity), + }, + TableName: aws.String(l.tableName), + } + _, err := l.dynamoDBClient.CreateTable(input) + + if err != nil { + return false, err + } + return true, nil +} + +/** + * @return true if the lease table already exists. + * + * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way + */ +func (l *LeaseManager) LeaseTableExists() (bool, error) { + status, _ := l.tableStatus() + + if status != nil || aws.StringValue(status) == TABLE_ACTIVE { + return true, nil + } + return false, nil +} + +/** + * Blocks until the lease table exists by polling leaseTableExists. + * + * @param secondsBetweenPolls time to wait between polls in seconds + * @param timeoutSeconds total time to wait in seconds + * + * @return true if table exists, false if timeout was reached + * + * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way + */ +func (l *LeaseManager) WaitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds int64) (bool, error) { + delay := time.Duration(secondsBetweenPolls) * time.Second + deadline := time.Now().Add(time.Duration(timeoutSeconds) * time.Second) + + var err error + for time.Now().Before(deadline) { + flag := false + flag, err = l.LeaseTableExists() + + if flag { + return true, nil + } + + time.Sleep(delay) + } + + return false, err +} + +/** + * List all objects in table synchronously. + * + * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity + * + * @return list of leases + */ +func (l *LeaseManager) ListLeases() ([]ILease, error) { + return l.list(0) +} + +/** + * Create a new lease. Conditional on a lease not already existing with this shardId. + * + * @param lease the lease to create + * + * @return true if lease was created, false if lease already exists + * + * @error LeasingDependencyError if DynamoDB put fails in an unexpected way + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB put fails due to lack of capacity + */ +func (l *LeaseManager) CreateLeaseIfNotExists(lease ILease) (bool, error) { + input := &dynamodb.PutItemInput{ + TableName: aws.String(l.tableName), + Item: l.serializer.ToDynamoRecord(lease), + Expected: l.serializer.GetDynamoNonexistantExpectation(), + } + _, err := l.dynamoDBClient.PutItem(input) + return err != nil, err +} + +/** + * @param shardId Get the lease for this shardId and it is the leaseKey + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB get fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB get fails in an unexpected way + * + * @return lease for the specified shardId, or null if one doesn't exist + */ +func (l *LeaseManager) GetLease(shardId string) (ILease, error) { + input := &dynamodb.GetItemInput{ + TableName: aws.String(l.tableName), + Key: l.serializer.GetDynamoHashKey(shardId), + ConsistentRead: aws.Bool(l.consistentReads), + } + result, err := l.dynamoDBClient.GetItem(input) + if err != nil { + return nil, err + } + dynamoRecord := result.Item + if dynamoRecord == nil { + return nil, nil + } + lease := l.serializer.FromDynamoRecord(dynamoRecord) + return lease, nil +} + +/** + * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter + * of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB. + * + * @param lease the lease to renew + * + * @return true if renewal succeeded, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ +func (l *LeaseManager) RenewLease(lease ILease) (bool, error) { + input := &dynamodb.UpdateItemInput{ + TableName: aws.String(l.tableName), + Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), + Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), + } + _, err := l.dynamoDBClient.UpdateItem(input) + + if err != nil { + // If we had a spurious retry during the Dynamo update, then this conditional PUT failure + // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter + // are what we expected. + expectedOwner := lease.GetLeaseOwner() + expectedCounter := lease.GetLeaseCounter() + 1 + updatedLease, _ := l.GetLease(lease.GetLeaseKey()) + if updatedLease == nil || expectedOwner != updatedLease.GetLeaseOwner() || + expectedCounter != updatedLease.GetLeaseCounter() { + return false, nil + } + + log.Println("Detected spurious renewal failure for lease with key " + lease.GetLeaseKey() + ", but recovered") + } + + lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) + return err != nil, err + +} + +/** + * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on + * the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the + * passed-in lease object after updating DynamoDB. + * + * @param lease the lease to take + * @param owner the new owner + * + * @return true if lease was successfully taken, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ +func (l *LeaseManager) TakeLease(lease ILease, owner string) (bool, error) { + input := &dynamodb.UpdateItemInput{ + TableName: aws.String(l.tableName), + Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), + Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), + } + + updates := l.serializer.GetDynamoLeaseCounterUpdate(lease) + + // putAll to updates + for k, v := range l.serializer.GetDynamoTakeLeaseUpdate(lease, owner) { + updates[k] = v + } + input.SetAttributeUpdates(updates) + _, err := l.dynamoDBClient.UpdateItem(input) + + if err != nil { + return false, err + } + + lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) + lease.SetLeaseOwner(owner) + return true, nil +} + +/** + * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of + * the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB. + * + * @param lease the lease to void + * + * @return true if eviction succeeded, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ +func (l *LeaseManager) EvictLease(lease ILease) (bool, error) { + input := &dynamodb.UpdateItemInput{ + TableName: aws.String(l.tableName), + Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), + Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), + } + + updates := l.serializer.GetDynamoLeaseCounterUpdate(lease) + + // putAll to updates + for k, v := range l.serializer.GetDynamoEvictLeaseUpdate(lease) { + updates[k] = v + } + input.SetAttributeUpdates(updates) + _, err := l.dynamoDBClient.UpdateItem(input) + + if err != nil { + return false, err + } + + lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) + lease.SetLeaseOwner("") + return true, nil +} + +/** + * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB. + * + * @param lease the lease to delete + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB delete fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB delete fails in an unexpected way + */ +func (l *LeaseManager) DeleteLease(lease ILease) error { + input := &dynamodb.DeleteItemInput{ + TableName: aws.String(l.tableName), + Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), + } + _, err := l.dynamoDBClient.DeleteItem(input) + return err +} + +/** + * Delete all leases from DynamoDB. Useful for tools/utils and testing. + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB scan or delete fail due to lack of capacity + * @error LeasingDependencyError if DynamoDB scan or delete fail in an unexpected way + */ +func (l *LeaseManager) DeleteAll() error { + allLeases, err := l.ListLeases() + if err != nil { + return err + } + + for _, v := range allLeases { + err := l.DeleteLease(v) + if err != nil { + return err + } + } + return nil +} + +/** + * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing + * library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the + * leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other + * updates. Mutates the lease counter of the passed-in lease object. + * + * @return true if update succeeded, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ +func (l *LeaseManager) UpdateLease(lease ILease) (bool, error) { + input := &dynamodb.UpdateItemInput{ + TableName: aws.String(l.tableName), + Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), + Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), + } + + updates := l.serializer.GetDynamoLeaseCounterUpdate(lease) + + // putAll to updates + for k, v := range l.serializer.GetDynamoUpdateLeaseUpdate(lease) { + updates[k] = v + } + input.SetAttributeUpdates(updates) + _, err := l.dynamoDBClient.UpdateItem(input) + + if err != nil { + return false, err + } + + lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) + return true, nil +} + +/** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity + */ +func (l *LeaseManager) IsLeaseTableEmpty() (bool, error) { + result, err := l.list(1) + if err != nil { + return true, err + } + return len(result) > 0, nil +} + +// tableStatus check the current lease table status +func (l *LeaseManager) tableStatus() (*string, error) { + input := &dynamodb.DescribeTableInput{ + TableName: aws.String(l.tableName), + } + + result, err := l.dynamoDBClient.DescribeTable(input) + if err != nil { + return nil, err + } + + return result.Table.TableStatus, nil +} + +// List with the given page size (number of items to consider at a time). Package access for integration testing. +func (l *LeaseManager) list(limit int64) ([]ILease, error) { + input := &dynamodb.ScanInput{ + TableName: aws.String(l.tableName), + } + + if limit > 0 { + input.SetLimit(limit) + } + + result := []ILease{} + + for { + scanResult, err := l.dynamoDBClient.Scan(input) + if err != nil || scanResult == nil { + break + } + + for _, v := range scanResult.Items { + result = append(result, l.serializer.FromDynamoRecord(v)) + } + + lastEvaluatedKey := scanResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + scanResult = nil + break + } else { + input.SetExclusiveStartKey(lastEvaluatedKey) + } + } + + return result, nil +} diff --git a/src/leases/impl/lease-serializer.go b/src/leases/impl/lease-serializer.go new file mode 100644 index 0000000..3e3f3b3 --- /dev/null +++ b/src/leases/impl/lease-serializer.go @@ -0,0 +1,184 @@ +package impl + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" + + dynamoutils "leases/dynamoutils" + . "leases/interfaces" +) + +const ( + LEASE_KEY_KEY = "leaseKey" + LEASE_OWNER_KEY = "leaseOwner" + LEASE_COUNTER_KEY = "leaseCounter" +) + +/** + * An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that + * LeaseSerializer can be decorated by other classes if you need to add fields to leases. + */ +type LeaseSerializer struct { +} + +/** + * Construct a DynamoDB record out of a Lease object + * + * @param lease lease object to serialize + * @return an attribute value map representing the lease object + */ +func (lc *LeaseSerializer) ToDynamoRecord(lease ILease) map[string]*dynamodb.AttributeValue { + result := map[string]*dynamodb.AttributeValue{} + + result[LEASE_KEY_KEY], _ = dynamoutils.CreateAttributeValueFromString(lease.GetLeaseKey()) + result[LEASE_COUNTER_KEY], _ = dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter()) + + if len(lease.GetLeaseOwner()) > 0 { + result[LEASE_OWNER_KEY], _ = dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner()) + } + + return result +} + +/** + * Construct a Lease object out of a DynamoDB record. + * + * @param dynamoRecord attribute value map from DynamoDB + * @return a deserialized lease object representing the attribute value map + */ +func (lc *LeaseSerializer) FromDynamoRecord(dynamoRecord map[string]*dynamodb.AttributeValue) ILease { + result := &Lease{} + + result.SetLeaseKey(aws.StringValue(dynamoutils.SafeGetString(dynamoRecord, LEASE_KEY_KEY))) + result.SetLeaseOwner(aws.StringValue(dynamoutils.SafeGetString(dynamoRecord, LEASE_OWNER_KEY))) + result.SetLeaseCounter(dynamoutils.SafeGetLong(dynamoRecord, LEASE_COUNTER_KEY)) + return result +} + +/** + * Special getDynamoHashKey implementation used by ILeaseManager.getLease(). + * + * @param leaseKey + * @return the attribute value map representing a Lease's hash key given a string. + */ +func (lc *LeaseSerializer) GetDynamoHashKey(leaseKey string) map[string]*dynamodb.AttributeValue { + result := map[string]*dynamodb.AttributeValue{} + result[LEASE_KEY_KEY], _ = dynamoutils.CreateAttributeValueFromString(leaseKey) + return result +} + +/** + * @param lease + * @return the attribute value map asserting that a lease counter is what we expect. + */ +func (lc *LeaseSerializer) GetDynamoLeaseCounterExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue { + result := map[string]*dynamodb.ExpectedAttributeValue{} + expectedAV := &dynamodb.ExpectedAttributeValue{} + val, _ := dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter()) + expectedAV.SetValue(val) + result[LEASE_COUNTER_KEY] = expectedAV + return result +} + +/** + * @param lease + * @return the attribute value map asserting that the lease owner is what we expect. + */ +func (lc *LeaseSerializer) GetDynamoLeaseOwnerExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue { + result := map[string]*dynamodb.ExpectedAttributeValue{} + expectedAV := &dynamodb.ExpectedAttributeValue{} + val, _ := dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner()) + expectedAV.SetValue(val) + result[LEASE_OWNER_KEY] = expectedAV + return result + +} + +/** + * @return the attribute value map asserting that a lease does not exist. + */ +func (lc *LeaseSerializer) GetDynamoNonexistantExpectation() map[string]*dynamodb.ExpectedAttributeValue { + result := map[string]*dynamodb.ExpectedAttributeValue{} + expectedAV := &dynamodb.ExpectedAttributeValue{} + expectedAV.SetExists(false) + result[LEASE_KEY_KEY] = expectedAV + + return result +} + +/** + * @param lease + * @return the attribute value map that increments a lease counter + */ +func (lc *LeaseSerializer) GetDynamoLeaseCounterUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate { + result := map[string]*dynamodb.AttributeValueUpdate{} + updatedAV := &dynamodb.AttributeValueUpdate{} + // Increase the lease counter by 1 + val, _ := dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter() + 1) + updatedAV.SetValue(val) + updatedAV.SetAction(dynamodb.AttributeActionPut) + result[LEASE_COUNTER_KEY] = updatedAV + return result +} + +/** + * @param lease + * @param newOwner + * @return the attribute value map that takes a lease for a new owner + */ +func (lc *LeaseSerializer) GetDynamoTakeLeaseUpdate(lease ILease, newOwner string) map[string]*dynamodb.AttributeValueUpdate { + result := map[string]*dynamodb.AttributeValueUpdate{} + updatedAV := &dynamodb.AttributeValueUpdate{} + val, _ := dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner()) + updatedAV.SetValue(val) + updatedAV.SetAction(dynamodb.AttributeActionPut) + result[LEASE_OWNER_KEY] = updatedAV + return result +} + +/** + * @param lease + * @return the attribute value map that voids a lease + */ +func (lc *LeaseSerializer) GetDynamoEvictLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate { + result := map[string]*dynamodb.AttributeValueUpdate{} + updatedAV := &dynamodb.AttributeValueUpdate{} + updatedAV.SetValue(nil) + updatedAV.SetAction(dynamodb.AttributeActionDelete) + result[LEASE_OWNER_KEY] = updatedAV + return result +} + +/** + * @param lease + * @return the attribute value map that updates application-specific data for a lease and increments the lease + * counter + */ +func (lc *LeaseSerializer) GetDynamoUpdateLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate { + result := map[string]*dynamodb.AttributeValueUpdate{} + return result +} + +/** + * @return the key schema for creating a DynamoDB table to store leases + */ +func (lc *LeaseSerializer) GetKeySchema() []*dynamodb.KeySchemaElement { + keySchema := []*dynamodb.KeySchemaElement{} + schemaElement := &dynamodb.KeySchemaElement{} + schemaElement.SetAttributeName(LEASE_KEY_KEY) + schemaElement.SetKeyType(dynamodb.KeyTypeHash) + keySchema = append(keySchema, schemaElement) + return keySchema +} + +/** + * @return attribute definitions for creating a DynamoDB table to store leases + */ +func (lc *LeaseSerializer) GetAttributeDefinitions() []*dynamodb.AttributeDefinition { + definitions := []*dynamodb.AttributeDefinition{} + definition := &dynamodb.AttributeDefinition{} + definition.SetAttributeName(LEASE_KEY_KEY) + definition.SetAttributeType(dynamodb.ScalarAttributeTypeS) + definitions = append(definitions, definition) + return definitions +} diff --git a/src/leases/impl/lease.go b/src/leases/impl/lease.go new file mode 100644 index 0000000..b87ecf3 --- /dev/null +++ b/src/leases/impl/lease.go @@ -0,0 +1,113 @@ +package impl + +import ( + cc "clientlibrary/common" + "time" +) + +const ( + // We will consider leases to be expired if they are more than 90 days. + MAX_ABS_AGE_NANOS = int64(90 * 24 * time.Hour) +) + +// Lease structure contains data pertaining to a Lease. Distributed systems may use leases to partition work across a +// fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend +// for all leases - only one worker will successfully take each one. The worker should hold the lease until it is ready to stop +// processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will +// take and hold the lease. +type Lease struct { + leaseKey string + leaseOwner string + leaseCounter int64 + + // This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not + // persisted in DynamoDB and excluded from hashCode and equals. + concurrencyToken string + + // This field is used by LeaseRenewer and LeaseTaker to track the last time a lease counter was incremented. It is + // deliberately not persisted in DynamoDB and excluded from hashCode and equals. + lastCounterIncrementNanos int64 +} + +// CloneLease to clone a lease object +func CopyLease(lease *Lease) *Lease { + return &Lease{ + leaseKey: lease.leaseKey, + leaseOwner: lease.leaseOwner, + leaseCounter: lease.leaseCounter, + concurrencyToken: lease.concurrencyToken, + lastCounterIncrementNanos: lease.lastCounterIncrementNanos, + } +} + +// GetLeaseKey retrieves leaseKey - identifies the unit of work associated with this lease. +func (l *Lease) GetLeaseKey() string { + return l.leaseKey +} + +// GetLeaseOwner gets current owner of the lease, may be "". +func (l *Lease) GetLeaseOwner() string { + return l.leaseOwner +} + +// GetLeaseCounter retrieves leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. +func (l *Lease) GetLeaseCounter() int64 { + return l.leaseCounter +} + +// GetConcurrencyToken returns concurrency token +func (l *Lease) GetConcurrencyToken() string { + return l.concurrencyToken +} + +// GetLastCounterIncrementNanos returns concurrency token +func (l *Lease) GetLastCounterIncrementNanos() int64 { + return l.lastCounterIncrementNanos +} + +// SetLeaseKey sets leaseKey - LeaseKey is immutable once set. +func (l *Lease) SetLeaseKey(leaseKey string) error { + if len(l.leaseKey) > 0 { + return cc.IllegalArgumentError.MakeErr().WithDetail("LeaseKey is immutable once set") + } + + l.leaseKey = leaseKey + return nil +} + +// SetLeaseOwner set current owner of the lease, may be "". +func (l *Lease) SetLeaseOwner(leaseOwner string) { + l.leaseOwner = leaseOwner +} + +// SetLeaseCounter sets leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. +func (l *Lease) SetLeaseCounter(leaseCounter int64) { + l.leaseCounter = leaseCounter +} + +// SetConcurrencyToken +func (l *Lease) SetConcurrencyToken(concurrencyToken string) { + l.concurrencyToken = concurrencyToken +} + +// SetLastCounterIncrementNanos returns concurrency token +func (l *Lease) SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) { + l.lastCounterIncrementNanos = lastCounterIncrementNanos +} + +// IsExpired to check whether lease expired using +// @param leaseDurationNanos duration of lease in nanoseconds +// @param asOfNanos time in nanoseconds to check expiration as-of +// @return true if lease is expired as-of given time, false otherwise +func (l *Lease) IsExpired(leaseDurationNanos, asOfNanos int64) bool { + if l.lastCounterIncrementNanos == 0 { + return true + } + + age := asOfNanos - l.lastCounterIncrementNanos + if age > MAX_ABS_AGE_NANOS { + return true + } else { + return age > leaseDurationNanos + } +} diff --git a/src/leases/interfaces/lease-manager.go b/src/leases/interfaces/lease-manager.go new file mode 100644 index 0000000..8f27aa2 --- /dev/null +++ b/src/leases/interfaces/lease-manager.go @@ -0,0 +1,162 @@ +package interfaces + +// ILeaseManager supports basic CRUD operations for Leases. +type ILeaseManager interface { + + /** + * Creates the table that will store leases. Succeeds if table already exists. + * + * @param readCapacity + * @param writeCapacity + * + * @return true if we created a new table (table didn't exist before) + * + * @error ProvisionedThroughputError if we cannot create the lease table due to per-AWS-account capacity + * restrictions. + * @error LeasingDependencyError if DynamoDB createTable fails in an unexpected way + */ + CreateLeaseTableIfNotExists(readCapacity, writeCapacity int64) (bool, error) + + /** + * @return true if the lease table already exists. + * + * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way + */ + LeaseTableExists() (bool, error) + + /** + * Blocks until the lease table exists by polling leaseTableExists. + * + * @param secondsBetweenPolls time to wait between polls in seconds + * @param timeoutSeconds total time to wait in seconds + * + * @return true if table exists, false if timeout was reached + * + * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way + */ + WaitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds int64) (bool, error) + + /** + * List all objects in table synchronously. + * + * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity + * + * @return list of leases + */ + ListLeases() ([]ILease, error) + + /** + * Create a new lease. Conditional on a lease not already existing with this shardId. + * + * @param lease the lease to create + * + * @return true if lease was created, false if lease already exists + * + * @error LeasingDependencyError if DynamoDB put fails in an unexpected way + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB put fails due to lack of capacity + */ + CreateLeaseIfNotExists(lease ILease) (bool, error) + + /** + * @param shardId Get the lease for this shardId + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB get fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB get fails in an unexpected way + * + * @return lease for the specified shardId, or null if one doesn't exist + */ + GetLease(shardId string) (ILease, error) + + /** + * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter + * of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB. + * + * @param lease the lease to renew + * + * @return true if renewal succeeded, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ + RenewLease(lease ILease) (bool, error) + + /** + * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on + * the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the + * passed-in lease object after updating DynamoDB. + * + * @param lease the lease to take + * @param owner the new owner + * + * @return true if lease was successfully taken, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ + TakeLease(lease ILease, owner string) (bool, error) + + /** + * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of + * the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB. + * + * @param lease the lease to void + * + * @return true if eviction succeeded, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ + EvictLease(lease ILease) (bool, error) + + /** + * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB. + * + * @param lease the lease to delete + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB delete fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB delete fails in an unexpected way + */ + DeleteLease(lease ILease) error + + /** + * Delete all leases from DynamoDB. Useful for tools/utils and testing. + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB scan or delete fail due to lack of capacity + * @error LeasingDependencyError if DynamoDB scan or delete fail in an unexpected way + */ + DeleteAll() error + + /** + * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing + * library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the + * leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other + * updates. Mutates the lease counter of the passed-in lease object. + * + * @return true if update succeeded, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ + UpdateLease(lease ILease) (bool, error) + + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity + */ + IsLeaseTableEmpty() (bool, error) +} diff --git a/src/leases/interfaces/lease-renewer.go b/src/leases/interfaces/lease-renewer.go new file mode 100644 index 0000000..6e52049 --- /dev/null +++ b/src/leases/interfaces/lease-renewer.go @@ -0,0 +1,78 @@ +package interfaces + +// LeaseTable hold current lease mapping shardId --> Lease +type LeaseTable map[string]*ILease + +/** + * ILeaseRenewer objects are used by LeaseCoordinator to renew leases held by the LeaseCoordinator. Each + * LeaseCoordinator instance corresponds to one worker, and uses exactly one ILeaseRenewer to manage lease renewal for + * that worker. + */ +type ILeaseRenewer interface { + + /** + * Bootstrap initial set of leases from the LeaseManager (e.g. upon process restart, pick up leases we own) + * @error LeasingDependencyError on unexpected DynamoDB failures + * @error LeasingInvalidStateError if lease table doesn't exist + * @error ProvisionedThroughputError if DynamoDB reads fail due to insufficient capacity + */ + Initialize() error + + /** + * Attempt to renew all currently held leases. + * + * @error LeasingDependencyError on unexpected DynamoDB failures + * @error LeasingInvalidStateError if lease table does not exist + */ + RenewLeases() error + + /** + * @return currently held leases. Key is shardId, value is corresponding Lease object. A lease is currently held if + * we successfully renewed it on the last run of renewLeases(). Lease objects returned are deep copies - + * their lease counters will not tick. + */ + GetCurrentlyHeldLeases() *LeaseTable + + /** + * @param leaseKey key of the lease to retrieve + * + * @return a deep copy of a currently held lease, or null if we don't hold the lease + */ + GetCurrentlyHeldLease(leaseKey string) *ILease + + /** + * Adds leases to this LeaseRenewer's set of currently held leases. Leases must have lastRenewalNanos set to the + * last time the lease counter was incremented before being passed to this method. + * + * @param newLeases new leases. + */ + AddLeasesToRenew(newLeases []ILease) + + /** + * Clears this LeaseRenewer's set of currently held leases. + */ + ClearCurrentlyHeldLeases() + + /** + * Stops the lease renewer from continunig to maintain the given lease. + * + * @param lease the lease to drop. + */ + DropLease(lease ILease) + + /** + * Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as + * leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match + * the concurrency token on the internal authoritative copy of the lease (ie, if we lost and re-acquired the lease). + * + * @param lease lease object containing updated data + * @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease + * + * @return true if update succeeds, false otherwise + * + * @error LeasingInvalidStateError if lease table does not exist + * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity + * @error LeasingDependencyError if DynamoDB update fails in an unexpected way + */ + UpdateLease(lease ILease, concurrencyToken string) (bool, error) +} diff --git a/src/leases/interfaces/lease-serializer.go b/src/leases/interfaces/lease-serializer.go new file mode 100644 index 0000000..a8601d0 --- /dev/null +++ b/src/leases/interfaces/lease-serializer.go @@ -0,0 +1,86 @@ +package interfaces + +import ( + "github.com/aws/aws-sdk-go/service/dynamodb" +) + +// ILeaseSerializer an utility class that manages the mapping of Lease objects/operations to records in DynamoDB. +type ILeaseSerializer interface { + + /** + * Construct a DynamoDB record out of a Lease object + * + * @param lease lease object to serialize + * @return an attribute value map representing the lease object + */ + ToDynamoRecord(lease ILease) map[string]*dynamodb.AttributeValue + + /** + * Construct a Lease object out of a DynamoDB record. + * + * @param dynamoRecord attribute value map from DynamoDB + * @return a deserialized lease object representing the attribute value map + */ + FromDynamoRecord(dynamoRecord map[string]*dynamodb.AttributeValue) ILease + + /** + * Special getDynamoHashKey implementation used by ILeaseManager.getLease(). + * + * @param leaseKey + * @return the attribute value map representing a Lease's hash key given a string. + */ + GetDynamoHashKey(leaseKey string) map[string]*dynamodb.AttributeValue + + /** + * @param lease + * @return the attribute value map asserting that a lease counter is what we expect. + */ + GetDynamoLeaseCounterExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue + + /** + * @param lease + * @return the attribute value map asserting that the lease owner is what we expect. + */ + GetDynamoLeaseOwnerExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue + + /** + * @return the attribute value map asserting that a lease does not exist. + */ + GetDynamoNonexistantExpectation() map[string]*dynamodb.ExpectedAttributeValue + + /** + * @param lease + * @return the attribute value map that increments a lease counter + */ + GetDynamoLeaseCounterUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate + + /** + * @param lease + * @param newOwner + * @return the attribute value map that takes a lease for a new owner + */ + GetDynamoTakeLeaseUpdate(lease ILease, newOwner string) map[string]*dynamodb.AttributeValueUpdate + + /** + * @param lease + * @return the attribute value map that voids a lease + */ + GetDynamoEvictLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate + + /** + * @param lease + * @return the attribute value map that updates application-specific data for a lease and increments the lease + * counter + */ + GetDynamoUpdateLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate + + /** + * @return the key schema for creating a DynamoDB table to store leases + */ + GetKeySchema() []*dynamodb.KeySchemaElement + + /** + * @return attribute definitions for creating a DynamoDB table to store leases + */ + GetAttributeDefinitions() []*dynamodb.AttributeDefinition +} diff --git a/src/leases/interfaces/lease-taker.go b/src/leases/interfaces/lease-taker.go new file mode 100644 index 0000000..0dbaf1b --- /dev/null +++ b/src/leases/interfaces/lease-taker.go @@ -0,0 +1,28 @@ +package interfaces + +/** + * ILeaseTaker is used by LeaseCoordinator to take new leases, or leases that other workers fail to renew. Each + * LeaseCoordinator instance corresponds to one worker and uses exactly one ILeaseTaker to take leases for that worker. + */ +type ILeaseTaker interface { + + /** + * Compute the set of leases available to be taken and attempt to take them. Lease taking rules are: + * + * 1) If a lease's counter hasn't changed in long enough, try to take it. + * 2) If we see a lease we've never seen before, take it only if owner == null. If it's owned, odds are the owner is + * holding it. We can't tell until we see it more than once. + * 3) For load balancing purposes, you may violate rules 1 and 2 for EXACTLY ONE lease per call of takeLeases(). + * + * @return map of shardId to Lease object for leases we just successfully took. + * + * @error LeasingDependencyError on unexpected DynamoDB failures + * @error LeasingInvalidStateError if lease table does not exist + */ + TakeLeases() map[string]ILease + + /** + * @return workerIdentifier for this LeaseTaker + */ + GetWorkerIdentifier() string +} diff --git a/src/leases/interfaces/lease.go b/src/leases/interfaces/lease.go new file mode 100644 index 0000000..f3da35a --- /dev/null +++ b/src/leases/interfaces/lease.go @@ -0,0 +1,21 @@ +package interfaces + +// ILease is the interface for all Leases +type ILease interface { + GetLeaseKey() string + SetLeaseKey(leaseKey string) error + + GetLeaseOwner() string + SetLeaseOwner(leaseOwner string) + + GetLeaseCounter() int64 + SetLeaseCounter(leaseCounter int64) + + GetConcurrencyToken() string + SetConcurrencyToken(concurrencyToken string) + + GetLastCounterIncrementNanos() int64 + SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) + + IsExpired(leaseDurationNanos, asOfNanos int64) bool +}