KCL: Implement Shard Lease (part 1/2)
This is the first part of implementing shard lease for Kinesis Client library. It creates dynamoDB table for managing Kinesis stream shard lease. https://jira.eng.vmware.com/browse/CNA-636 Adjust error code value range. Change-Id: I16565fa15332843101235fb14545ee69c2599f2f
This commit is contained in:
parent
702335374c
commit
425daf70ce
15 changed files with 1444 additions and 127 deletions
|
|
@ -10,35 +10,36 @@ type ErrorCode int32
|
||||||
|
|
||||||
// pre-defined error codes
|
// pre-defined error codes
|
||||||
const (
|
const (
|
||||||
// System Wide 20000 - 20199
|
// System Wide 41000 - 42000
|
||||||
KinesisClientLibError ErrorCode = 20000
|
KinesisClientLibError ErrorCode = 41000
|
||||||
|
|
||||||
// KinesisClientLibrary Retryable Errors 20001 - 20099
|
// KinesisClientLibrary Retryable Errors 41001 - 41100
|
||||||
KinesisClientLibRetryableError ErrorCode = 20001
|
KinesisClientLibRetryableError ErrorCode = 41001
|
||||||
|
|
||||||
KinesisClientLibIOError ErrorCode = 20002
|
KinesisClientLibIOError ErrorCode = 41002
|
||||||
BlockedOnParentShardError ErrorCode = 20003
|
BlockedOnParentShardError ErrorCode = 41003
|
||||||
KinesisClientLibDependencyError ErrorCode = 20004
|
KinesisClientLibDependencyError ErrorCode = 41004
|
||||||
ThrottlingError ErrorCode = 20005
|
ThrottlingError ErrorCode = 41005
|
||||||
|
|
||||||
// KinesisClientLibrary NonRetryable Errors 20100 - 20149
|
// KinesisClientLibrary NonRetryable Errors 41100 - 41200
|
||||||
KinesisClientLibNonRetryableException ErrorCode = 20000
|
KinesisClientLibNonRetryableException ErrorCode = 41100
|
||||||
|
|
||||||
InvalidStateError ErrorCode = 20101
|
InvalidStateError ErrorCode = 41101
|
||||||
ShutdownError ErrorCode = 20102
|
ShutdownError ErrorCode = 41102
|
||||||
|
|
||||||
// Kinesis Lease Errors 20150 - 20199
|
// Kinesis Lease Errors 41200 - 41300
|
||||||
LeasingError ErrorCode = 20150
|
LeasingError ErrorCode = 41200
|
||||||
|
|
||||||
LeasingInvalidStateError ErrorCode = 20151
|
LeasingInvalidStateError ErrorCode = 41201
|
||||||
LeasingDependencyError ErrorCode = 20152
|
LeasingDependencyError ErrorCode = 41202
|
||||||
LeasingProvisionedThroughputError ErrorCode = 20153
|
LeasingProvisionedThroughputError ErrorCode = 41203
|
||||||
|
|
||||||
// Error indicates passing illegal or inappropriate argument
|
// Misc Errors 41300 - 41400
|
||||||
IllegalArgumentError ErrorCode = 20198
|
// NotImplemented
|
||||||
|
KinesisClientLibNotImplemented ErrorCode = 41301
|
||||||
|
|
||||||
// NotImplemented
|
// Error indicates passing illegal or inappropriate argument
|
||||||
KinesisClientLibNotImplemented ErrorCode = 20199
|
IllegalArgumentError ErrorCode = 41302
|
||||||
)
|
)
|
||||||
|
|
||||||
var errorMap = map[ErrorCode]ClientLibraryError{
|
var errorMap = map[ErrorCode]ClientLibraryError{
|
||||||
|
|
@ -14,7 +14,7 @@ const (
|
||||||
LATEST = InitialPositionInStream(1)
|
LATEST = InitialPositionInStream(1)
|
||||||
// TRIM_HORIZON start from the oldest available data record
|
// TRIM_HORIZON start from the oldest available data record
|
||||||
TRIM_HORIZON = LATEST + 1
|
TRIM_HORIZON = LATEST + 1
|
||||||
// AT_TIMESTAMP start from the record at or after the specified server-side timestamp.
|
// AT_TIMESTAMP start from the record at or after the specified server-side Timestamp.
|
||||||
AT_TIMESTAMP = TRIM_HORIZON + 1
|
AT_TIMESTAMP = TRIM_HORIZON + 1
|
||||||
|
|
||||||
// The location in the shard from which the KinesisClientLibrary will start fetching records from
|
// The location in the shard from which the KinesisClientLibrary will start fetching records from
|
||||||
|
|
@ -99,14 +99,14 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// InitialPositionInStream Used to specify the position in the stream where a new application should start from
|
// InitialPositionInStream Used to specify the Position in the stream where a new application should start from
|
||||||
// This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents)
|
// This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents)
|
||||||
InitialPositionInStream int
|
InitialPositionInStream int
|
||||||
|
|
||||||
// Class that houses the entities needed to specify the position in the stream from where a new application should
|
// Class that houses the entities needed to specify the Position in the stream from where a new application should
|
||||||
// start.
|
// start.
|
||||||
InitialPositionInStreamExtended struct {
|
InitialPositionInStreamExtended struct {
|
||||||
position InitialPositionInStream
|
Position InitialPositionInStream
|
||||||
|
|
||||||
// The time stamp of the data record from which to start reading. Used with
|
// The time stamp of the data record from which to start reading. Used with
|
||||||
// shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with
|
// shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with
|
||||||
|
|
@ -115,100 +115,103 @@ type (
|
||||||
// iterator returned is for the next (later) record. If the time stamp is older
|
// iterator returned is for the next (later) record. If the time stamp is older
|
||||||
// than the current trim horizon, the iterator returned is for the oldest untrimmed
|
// than the current trim horizon, the iterator returned is for the oldest untrimmed
|
||||||
// data record (TRIM_HORIZON).
|
// data record (TRIM_HORIZON).
|
||||||
timestamp *time.Time `type:"timestamp" timestampFormat:"unix"`
|
Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configuration for the Kinesis Client Library.
|
// Configuration for the Kinesis Client Library.
|
||||||
KinesisClientLibConfiguration struct {
|
KinesisClientLibConfiguration struct {
|
||||||
// applicationName is name of application. Kinesis allows multiple applications to consume the same stream.
|
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
|
||||||
applicationName string
|
ApplicationName string
|
||||||
|
|
||||||
// tableName is name of the dynamo db table for managing kinesis stream default to applicationName
|
// TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName
|
||||||
tableName string
|
TableName string
|
||||||
|
|
||||||
// streamName is the name of Kinesis stream
|
// StreamName is the name of Kinesis stream
|
||||||
streamName string
|
StreamName string
|
||||||
|
|
||||||
// workerID used to distinguish different workers/processes of a Kinesis application
|
// WorkerID used to distinguish different workers/processes of a Kinesis application
|
||||||
workerID string
|
WorkerID string
|
||||||
|
|
||||||
// kinesisEndpoint endpoint
|
// KinesisEndpoint endpoint
|
||||||
kinesisEndpoint string
|
KinesisEndpoint string
|
||||||
|
|
||||||
// dynamoDB endpoint
|
// DynamoDB endpoint
|
||||||
dynamoDBEndpoint string
|
DynamoDBEndpoint string
|
||||||
|
|
||||||
// initialPositionInStream specifies the position in the stream where a new application should start from
|
// InitialPositionInStream specifies the Position in the stream where a new application should start from
|
||||||
initialPositionInStream InitialPositionInStream
|
InitialPositionInStream InitialPositionInStream
|
||||||
|
|
||||||
// initialPositionInStreamExtended provides actual AT_TMESTAMP value
|
// InitialPositionInStreamExtended provides actual AT_TMESTAMP value
|
||||||
initialPositionInStreamExtended InitialPositionInStreamExtended
|
InitialPositionInStreamExtended InitialPositionInStreamExtended
|
||||||
|
|
||||||
// credentials to access Kinesis/Dynamo/CloudWatch: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/
|
// credentials to access Kinesis/Dynamo/CloudWatch: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/
|
||||||
// Note: No need to configure here. Use NewEnvCredentials for testing and EC2RoleProvider for production
|
// Note: No need to configure here. Use NewEnvCredentials for testing and EC2RoleProvider for production
|
||||||
|
|
||||||
// failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
|
// FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
|
||||||
failoverTimeMillis int
|
FailoverTimeMillis int
|
||||||
|
|
||||||
/// maxRecords Max records to read per Kinesis getRecords() call
|
/// MaxRecords Max records to read per Kinesis getRecords() call
|
||||||
maxRecords int
|
MaxRecords int
|
||||||
|
|
||||||
// idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
|
// IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
|
||||||
idleTimeBetweenReadsInMillis int
|
IdleTimeBetweenReadsInMillis int
|
||||||
|
|
||||||
// callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
|
// CallProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
|
||||||
// GetRecords returned an empty record list.
|
// GetRecords returned an empty record list.
|
||||||
callProcessRecordsEvenForEmptyRecordList bool
|
CallProcessRecordsEvenForEmptyRecordList bool
|
||||||
|
|
||||||
// parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
|
// ParentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
|
||||||
parentShardPollIntervalMillis int
|
ParentShardPollIntervalMillis int
|
||||||
|
|
||||||
// shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
// ShardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
||||||
shardSyncIntervalMillis int
|
ShardSyncIntervalMillis int
|
||||||
|
|
||||||
// cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration)
|
// CleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration)
|
||||||
cleanupTerminatedShardsBeforeExpiry bool
|
CleanupTerminatedShardsBeforeExpiry bool
|
||||||
|
|
||||||
// kinesisClientConfig Client Configuration used by Kinesis client
|
// kinesisClientConfig Client Configuration used by Kinesis client
|
||||||
// dynamoDBClientConfig Client Configuration used by DynamoDB client
|
// dynamoDBClientConfig Client Configuration used by DynamoDB client
|
||||||
// cloudWatchClientConfig Client Configuration used by CloudWatch client
|
// cloudWatchClientConfig Client Configuration used by CloudWatch client
|
||||||
// Note: we will use default client provided by AWS SDK
|
// Note: we will use default client provided by AWS SDK
|
||||||
|
|
||||||
// taskBackoffTimeMillis Backoff period when tasks encounter an exception
|
// TaskBackoffTimeMillis Backoff period when tasks encounter an exception
|
||||||
taskBackoffTimeMillis int
|
TaskBackoffTimeMillis int
|
||||||
|
|
||||||
// metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
|
// MetricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
|
||||||
metricsBufferTimeMillis int
|
MetricsBufferTimeMillis int
|
||||||
|
|
||||||
// metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
|
// MetricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
|
||||||
metricsMaxQueueSize int
|
MetricsMaxQueueSize int
|
||||||
|
|
||||||
// validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
|
// ValidateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
|
||||||
validateSequenceNumberBeforeCheckpointing bool
|
ValidateSequenceNumberBeforeCheckpointing bool
|
||||||
|
|
||||||
// regionName The region name for the service
|
// RegionName The region name for the service
|
||||||
regionName string
|
RegionName string
|
||||||
|
|
||||||
// shutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
|
// ShutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
|
||||||
shutdownGraceMillis int
|
ShutdownGraceMillis int
|
||||||
|
|
||||||
// Operation parameters
|
// Operation parameters
|
||||||
|
|
||||||
// Max leases this Worker can handle at a time
|
// Max leases this Worker can handle at a time
|
||||||
maxLeasesForWorker int
|
MaxLeasesForWorker int
|
||||||
|
|
||||||
// Max leases to steal at one time (for load balancing)
|
// Max leases to steal at one time (for load balancing)
|
||||||
maxLeasesToStealAtOneTime int
|
MaxLeasesToStealAtOneTime int
|
||||||
|
|
||||||
// Read capacity to provision when creating the lease table (dynamoDB).
|
// Read capacity to provision when creating the lease table (dynamoDB).
|
||||||
initialLeaseTableReadCapacity int
|
InitialLeaseTableReadCapacity int
|
||||||
|
|
||||||
// Write capacity to provision when creating the lease table.
|
// Write capacity to provision when creating the lease table.
|
||||||
initialLeaseTableWriteCapacity int
|
InitialLeaseTableWriteCapacity int
|
||||||
|
|
||||||
// Worker should skip syncing shards and leases at startup if leases are present
|
// Worker should skip syncing shards and leases at startup if leases are present
|
||||||
// This is useful for optimizing deployments to large fleets working on a stable stream.
|
// This is useful for optimizing deployments to large fleets working on a stable stream.
|
||||||
skipShardSyncAtWorkerInitializationIfLeasesExist bool
|
SkipShardSyncAtWorkerInitializationIfLeasesExist bool
|
||||||
|
|
||||||
|
// The max number of threads in the worker thread pool to getRecords.
|
||||||
|
WorkerThreadPoolSize int
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestConfig(t *testing.T) {
|
func TestConfig(t *testing.T) {
|
||||||
kclConfig := NewKinesisClientLibConfig("appName", "streamName", "workerId").
|
kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "workerId").
|
||||||
WithFailoverTimeMillis(500).
|
WithFailoverTimeMillis(500).
|
||||||
WithMaxRecords(100).
|
WithMaxRecords(100).
|
||||||
WithInitialPositionInStream(TRIM_HORIZON).
|
WithInitialPositionInStream(TRIM_HORIZON).
|
||||||
|
|
@ -18,6 +18,6 @@ func TestConfig(t *testing.T) {
|
||||||
WithMetricsMaxQueueSize(200).
|
WithMetricsMaxQueueSize(200).
|
||||||
WithRegionName("us-west-2")
|
WithRegionName("us-west-2")
|
||||||
|
|
||||||
assert.Equal(t, "appName", kclConfig.applicationName)
|
assert.Equal(t, "appName", kclConfig.ApplicationName)
|
||||||
assert.Equal(t, "500", kclConfig.failoverTimeMillis)
|
assert.Equal(t, "500", kclConfig.FailoverTimeMillis)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newInitialPositionAtTimestamp(timestamp *time.Time) *InitialPositionInStreamExtended {
|
func newInitialPositionAtTimestamp(timestamp *time.Time) *InitialPositionInStreamExtended {
|
||||||
return &InitialPositionInStreamExtended{position: AT_TIMESTAMP, timestamp: timestamp}
|
return &InitialPositionInStreamExtended{Position: AT_TIMESTAMP, Timestamp: timestamp}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newInitialPosition(position InitialPositionInStream) *InitialPositionInStreamExtended {
|
func newInitialPosition(position InitialPositionInStream) *InitialPositionInStreamExtended {
|
||||||
return &InitialPositionInStreamExtended{position: position, timestamp: nil}
|
return &InitialPositionInStreamExtended{Position: position, Timestamp: nil}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,9 +7,9 @@ import (
|
||||||
|
|
||||||
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
|
// NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields.
|
||||||
func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration {
|
func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *KinesisClientLibConfiguration {
|
||||||
checkIsValueNotEmpty("applicationName", applicationName)
|
checkIsValueNotEmpty("ApplicationName", applicationName)
|
||||||
checkIsValueNotEmpty("streamName", streamName)
|
checkIsValueNotEmpty("StreamName", streamName)
|
||||||
checkIsValueNotEmpty("applicationName", applicationName)
|
checkIsValueNotEmpty("ApplicationName", applicationName)
|
||||||
|
|
||||||
if empty(workerID) {
|
if empty(workerID) {
|
||||||
workerID = utils.MustNewUUID()
|
workerID = utils.MustNewUUID()
|
||||||
|
|
@ -17,72 +17,73 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki
|
||||||
|
|
||||||
// populate the KCL configuration with default values
|
// populate the KCL configuration with default values
|
||||||
return &KinesisClientLibConfiguration{
|
return &KinesisClientLibConfiguration{
|
||||||
applicationName: applicationName,
|
ApplicationName: applicationName,
|
||||||
tableName: applicationName,
|
TableName: applicationName,
|
||||||
streamName: streamName,
|
StreamName: streamName,
|
||||||
workerID: workerID,
|
WorkerID: workerID,
|
||||||
kinesisEndpoint: "",
|
KinesisEndpoint: "",
|
||||||
initialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
|
InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
|
||||||
initialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
|
InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
|
||||||
failoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
|
FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
|
||||||
maxRecords: DEFAULT_MAX_RECORDS,
|
MaxRecords: DEFAULT_MAX_RECORDS,
|
||||||
idleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
|
IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
|
||||||
callProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
|
CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
|
||||||
parentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
|
ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
|
||||||
shardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
|
ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
|
||||||
cleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
|
CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
|
||||||
taskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
|
TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
|
||||||
metricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS,
|
MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS,
|
||||||
metricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE,
|
MetricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE,
|
||||||
validateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
|
ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
|
||||||
regionName: "",
|
RegionName: "",
|
||||||
shutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS,
|
ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS,
|
||||||
maxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER,
|
MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER,
|
||||||
maxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
|
MaxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
|
||||||
initialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
|
InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
|
||||||
initialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
|
InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
|
||||||
skipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||||
|
WorkerThreadPoolSize: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithTableName to provide alternative lease table in DynamoDB
|
// WithTableName to provide alternative lease table in DynamoDB
|
||||||
func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration {
|
||||||
c.tableName = tableName
|
c.TableName = tableName
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
|
||||||
c.kinesisEndpoint = kinesisEndpoint
|
c.KinesisEndpoint = kinesisEndpoint
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration {
|
||||||
c.initialPositionInStream = initialPositionInStream
|
c.InitialPositionInStream = initialPositionInStream
|
||||||
c.initialPositionInStreamExtended = *newInitialPosition(initialPositionInStream)
|
c.InitialPositionInStreamExtended = *newInitialPosition(initialPositionInStream)
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration {
|
||||||
c.initialPositionInStream = AT_TIMESTAMP
|
c.InitialPositionInStream = AT_TIMESTAMP
|
||||||
c.initialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp)
|
c.InitialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp)
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis)
|
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis)
|
||||||
c.failoverTimeMillis = failoverTimeMillis
|
c.FailoverTimeMillis = failoverTimeMillis
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
|
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
|
||||||
c.shardSyncIntervalMillis = shardSyncIntervalMillis
|
c.ShardSyncIntervalMillis = shardSyncIntervalMillis
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("MaxRecords", maxRecords)
|
checkIsValuePositive("MaxRecords", maxRecords)
|
||||||
c.maxRecords = maxRecords
|
c.MaxRecords = maxRecords
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -102,46 +103,52 @@ func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisC
|
||||||
* Metric: GetRecords.MillisBehindLatest</a>
|
* Metric: GetRecords.MillisBehindLatest</a>
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* @param idleTimeBetweenReadsInMillis
|
* @param IdleTimeBetweenReadsInMillis
|
||||||
* how long to sleep between GetRecords calls when no records are returned.
|
* how long to sleep between GetRecords calls when no records are returned.
|
||||||
* @return KinesisClientLibConfiguration
|
* @return KinesisClientLibConfiguration
|
||||||
*/
|
*/
|
||||||
func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis)
|
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis)
|
||||||
c.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis
|
c.IdleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration {
|
||||||
c.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList
|
c.CallProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("taskBackoffTimeMillis", taskBackoffTimeMillis)
|
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis)
|
||||||
c.taskBackoffTimeMillis = taskBackoffTimeMillis
|
c.TaskBackoffTimeMillis = taskBackoffTimeMillis
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMetricsBufferTimeMillis configures Metrics are buffered for at most this long before publishing to CloudWatch
|
// WithMetricsBufferTimeMillis configures Metrics are buffered for at most this long before publishing to CloudWatch
|
||||||
func (c *KinesisClientLibConfiguration) WithMetricsBufferTimeMillis(metricsBufferTimeMillis int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithMetricsBufferTimeMillis(metricsBufferTimeMillis int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("metricsBufferTimeMillis", metricsBufferTimeMillis)
|
checkIsValuePositive("MetricsBufferTimeMillis", metricsBufferTimeMillis)
|
||||||
c.metricsBufferTimeMillis = metricsBufferTimeMillis
|
c.MetricsBufferTimeMillis = metricsBufferTimeMillis
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMetricsMaxQueueSize configures Max number of metrics to buffer before publishing to CloudWatch
|
// WithMetricsMaxQueueSize configures Max number of metrics to buffer before publishing to CloudWatch
|
||||||
func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueSize int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithMetricsMaxQueueSize(metricsMaxQueueSize int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("metricsMaxQueueSize", metricsMaxQueueSize)
|
checkIsValuePositive("MetricsMaxQueueSize", metricsMaxQueueSize)
|
||||||
c.metricsMaxQueueSize = metricsMaxQueueSize
|
c.MetricsMaxQueueSize = metricsMaxQueueSize
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRegionName configures region for the stream
|
// WithRegionName configures region for the stream
|
||||||
func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *KinesisClientLibConfiguration {
|
||||||
checkIsValueNotEmpty("regionName", regionName)
|
checkIsValueNotEmpty("RegionName", regionName)
|
||||||
c.regionName = regionName
|
c.RegionName = regionName
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Getters
|
// WithWorkerThreadPoolSize configures worker thread pool size
|
||||||
|
func (c *KinesisClientLibConfiguration) WithWorkerThreadPoolSize(n int) *KinesisClientLibConfiguration {
|
||||||
|
checkIsValuePositive("WorkerThreadPoolSize", n)
|
||||||
|
c.WorkerThreadPoolSize = n
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
||||||
78
src/leases/dynamoutils/dynamoutils.go
Normal file
78
src/leases/dynamoutils/dynamoutils.go
Normal file
|
|
@ -0,0 +1,78 @@
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"clientlibrary/common"
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Some static utility functions used by our LeaseSerializers.
|
||||||
|
*/
|
||||||
|
|
||||||
|
func CreateAttributeValueFromSS(collectionValue []*string) (*dynamodb.AttributeValue, error) {
|
||||||
|
if len(collectionValue) == 0 {
|
||||||
|
return nil, common.IllegalArgumentError.MakeErr().WithDetail("Collection attributeValues cannot be null or empty.")
|
||||||
|
}
|
||||||
|
|
||||||
|
attrib := &dynamodb.AttributeValue{}
|
||||||
|
attrib.SetSS(collectionValue)
|
||||||
|
|
||||||
|
return attrib, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateAttributeValueFromString(stringValue string) (*dynamodb.AttributeValue, error) {
|
||||||
|
if len(stringValue) == 0 {
|
||||||
|
return nil, common.IllegalArgumentError.MakeErr().WithDetail("String attributeValues cannot be null or empty.")
|
||||||
|
}
|
||||||
|
|
||||||
|
attrib := &dynamodb.AttributeValue{}
|
||||||
|
attrib.SetS(stringValue)
|
||||||
|
|
||||||
|
return attrib, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateAttributeValueFromLong(longValue int64) (*dynamodb.AttributeValue, error) {
|
||||||
|
attrib := &dynamodb.AttributeValue{}
|
||||||
|
attrib.SetN(strconv.FormatInt(longValue, 10))
|
||||||
|
|
||||||
|
return attrib, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SafeGetLong(dynamoRecord map[string]*dynamodb.AttributeValue, key string) int64 {
|
||||||
|
av := dynamoRecord[key]
|
||||||
|
|
||||||
|
if av == nil || av.N == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var val int64
|
||||||
|
val, err := strconv.ParseInt(*av.N, 10, 64)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
func SafeGetString(dynamoRecord map[string]*dynamodb.AttributeValue, key string) *string {
|
||||||
|
av := dynamoRecord[key]
|
||||||
|
if av == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return av.S
|
||||||
|
}
|
||||||
|
|
||||||
|
func SafeGetSS(dynamoRecord map[string]*dynamodb.AttributeValue, key string) []*string {
|
||||||
|
av := dynamoRecord[key]
|
||||||
|
|
||||||
|
if av == nil {
|
||||||
|
var emptyslice []*string
|
||||||
|
return emptyslice
|
||||||
|
}
|
||||||
|
|
||||||
|
return av.SS
|
||||||
|
}
|
||||||
116
src/leases/impl/kinesis-client-lease.go
Normal file
116
src/leases/impl/kinesis-client-lease.go
Normal file
|
|
@ -0,0 +1,116 @@
|
||||||
|
package impl
|
||||||
|
|
||||||
|
import (
|
||||||
|
. "clientlibrary/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KinesisClientLease is a Lease subclass containing KinesisClientLibrary related fields for checkpoints.
|
||||||
|
type KinesisClientLease struct {
|
||||||
|
checkpoint *ExtendedSequenceNumber
|
||||||
|
pendingCheckpoint *ExtendedSequenceNumber
|
||||||
|
ownerSwitchesSinceCheckpoint int64
|
||||||
|
parentShardIds *[]string
|
||||||
|
|
||||||
|
// coreLease to hold lease information
|
||||||
|
// Note: golang doesn't support inheritance, use composition instead.
|
||||||
|
coreLease Lease
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCheckpoint returns most recently application-supplied checkpoint value. During fail over, the new worker
|
||||||
|
// will pick up after the old worker's last checkpoint.
|
||||||
|
func (l *KinesisClientLease) GetCheckpoint() *ExtendedSequenceNumber {
|
||||||
|
return l.checkpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPendingCheckpoint returns pending checkpoint, possibly null.
|
||||||
|
func (l *KinesisClientLease) GetPendingCheckpoint() *ExtendedSequenceNumber {
|
||||||
|
return l.pendingCheckpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOwnerSwitchesSinceCheckpoint counts of distinct lease holders between checkpoints.
|
||||||
|
func (l *KinesisClientLease) GetOwnerSwitchesSinceCheckpoint() int64 {
|
||||||
|
return l.ownerSwitchesSinceCheckpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetParentShardIds returns shardIds that parent this lease. Used for resharding.
|
||||||
|
func (l *KinesisClientLease) GetParentShardIds() *[]string {
|
||||||
|
return l.parentShardIds
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetCheckpoint
|
||||||
|
func (l *KinesisClientLease) SetCheckpoint(checkpoint *ExtendedSequenceNumber) {
|
||||||
|
l.checkpoint = checkpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPendingCheckpoint
|
||||||
|
func (l *KinesisClientLease) SetPendingCheckpoint(pendingCheckpoint *ExtendedSequenceNumber) {
|
||||||
|
l.pendingCheckpoint = pendingCheckpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetOwnerSwitchesSinceCheckpoint
|
||||||
|
func (l *KinesisClientLease) SetOwnerSwitchesSinceCheckpoint(ownerSwitchesSinceCheckpoint int64) {
|
||||||
|
l.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetParentShardIds
|
||||||
|
func (l *KinesisClientLease) SetParentShardIds(parentShardIds *[]string) {
|
||||||
|
l.parentShardIds = parentShardIds
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeaseKey retrieves leaseKey - identifies the unit of work associated with this lease.
|
||||||
|
func (l *KinesisClientLease) GetLeaseKey() string {
|
||||||
|
return l.coreLease.GetLeaseKey()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeaseOwner gets current owner of the lease, may be "".
|
||||||
|
func (l *KinesisClientLease) GetLeaseOwner() string {
|
||||||
|
return l.coreLease.GetLeaseOwner()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeaseCounter retrieves leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking.
|
||||||
|
func (l *KinesisClientLease) GetLeaseCounter() int64 {
|
||||||
|
return l.coreLease.GetLeaseCounter()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConcurrencyToken returns concurrency token
|
||||||
|
func (l *KinesisClientLease) GetConcurrencyToken() string {
|
||||||
|
return l.coreLease.GetConcurrencyToken()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastCounterIncrementNanos returns concurrency token
|
||||||
|
func (l *KinesisClientLease) GetLastCounterIncrementNanos() int64 {
|
||||||
|
return l.coreLease.GetLastCounterIncrementNanos()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaseKey sets leaseKey - LeaseKey is immutable once set.
|
||||||
|
func (l *KinesisClientLease) SetLeaseKey(leaseKey string) error {
|
||||||
|
return l.coreLease.SetLeaseKey(leaseKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaseOwner set current owner of the lease, may be "".
|
||||||
|
func (l *KinesisClientLease) SetLeaseOwner(leaseOwner string) {
|
||||||
|
l.coreLease.SetLeaseOwner(leaseOwner)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaseCounter sets leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking.
|
||||||
|
func (l *KinesisClientLease) SetLeaseCounter(leaseCounter int64) {
|
||||||
|
l.coreLease.SetLeaseCounter(leaseCounter)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetConcurrencyToken
|
||||||
|
func (l *KinesisClientLease) SetConcurrencyToken(concurrencyToken string) {
|
||||||
|
l.coreLease.SetConcurrencyToken(concurrencyToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLastCounterIncrementNanos returns concurrency token
|
||||||
|
func (l *KinesisClientLease) SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) {
|
||||||
|
l.coreLease.SetLastCounterIncrementNanos(lastCounterIncrementNanos)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsExpired to check whether lease expired using
|
||||||
|
// @param leaseDurationNanos duration of lease in nanoseconds
|
||||||
|
// @param asOfNanos time in nanoseconds to check expiration as-of
|
||||||
|
// @return true if lease is expired as-of given time, false otherwise
|
||||||
|
func (l *KinesisClientLease) IsExpired(leaseDurationNanos, asOfNanos int64) bool {
|
||||||
|
return l.coreLease.IsExpired(leaseDurationNanos, asOfNanos)
|
||||||
|
}
|
||||||
440
src/leases/impl/lease-manager.go
Normal file
440
src/leases/impl/lease-manager.go
Normal file
|
|
@ -0,0 +1,440 @@
|
||||||
|
package impl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
|
||||||
|
|
||||||
|
. "leases/interfaces"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// CREATING - The table is being created.
|
||||||
|
TABLE_CREATING = "CREATING"
|
||||||
|
|
||||||
|
// UPDATING - The table is being updated.
|
||||||
|
TABLE_UPDATING = "UPDATING"
|
||||||
|
|
||||||
|
// DELETING - The table is being deleted.
|
||||||
|
TABLE_DELETING = "DELETING"
|
||||||
|
|
||||||
|
// ACTIVE - The table is ready for use.
|
||||||
|
TABLE_ACTIVE = "ACTIVE"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LeaseManager is an implementation of ILeaseManager that uses DynamoDB.
|
||||||
|
type LeaseManager struct {
|
||||||
|
tableName string
|
||||||
|
dynamoDBClient dynamodbiface.DynamoDBAPI
|
||||||
|
serializer ILeaseSerializer
|
||||||
|
consistentReads bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLeaseManager(tableName string, dynamoDBClient dynamodbiface.DynamoDBAPI, serializer ILeaseSerializer) *LeaseManager {
|
||||||
|
return &LeaseManager{
|
||||||
|
tableName: tableName,
|
||||||
|
dynamoDBClient: dynamoDBClient,
|
||||||
|
serializer: serializer,
|
||||||
|
consistentReads: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the table that will store leases. Succeeds if table already exists.
|
||||||
|
*
|
||||||
|
* @param readCapacity
|
||||||
|
* @param writeCapacity
|
||||||
|
*
|
||||||
|
* @return true if we created a new table (table didn't exist before)
|
||||||
|
*
|
||||||
|
* @error ProvisionedThroughputError if we cannot create the lease table due to per-AWS-account capacity
|
||||||
|
* restrictions.
|
||||||
|
* @error LeasingDependencyError if DynamoDB createTable fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) CreateLeaseTableIfNotExists(readCapacity, writeCapacity int64) (bool, error) {
|
||||||
|
status, _ := l.tableStatus()
|
||||||
|
|
||||||
|
if status != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
input := &dynamodb.CreateTableInput{
|
||||||
|
AttributeDefinitions: l.serializer.GetAttributeDefinitions(),
|
||||||
|
KeySchema: l.serializer.GetKeySchema(),
|
||||||
|
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
|
||||||
|
ReadCapacityUnits: aws.Int64(readCapacity),
|
||||||
|
WriteCapacityUnits: aws.Int64(writeCapacity),
|
||||||
|
},
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
}
|
||||||
|
_, err := l.dynamoDBClient.CreateTable(input)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the lease table already exists.
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) LeaseTableExists() (bool, error) {
|
||||||
|
status, _ := l.tableStatus()
|
||||||
|
|
||||||
|
if status != nil || aws.StringValue(status) == TABLE_ACTIVE {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocks until the lease table exists by polling leaseTableExists.
|
||||||
|
*
|
||||||
|
* @param secondsBetweenPolls time to wait between polls in seconds
|
||||||
|
* @param timeoutSeconds total time to wait in seconds
|
||||||
|
*
|
||||||
|
* @return true if table exists, false if timeout was reached
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) WaitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds int64) (bool, error) {
|
||||||
|
delay := time.Duration(secondsBetweenPolls) * time.Second
|
||||||
|
deadline := time.Now().Add(time.Duration(timeoutSeconds) * time.Second)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
flag := false
|
||||||
|
flag, err = l.LeaseTableExists()
|
||||||
|
|
||||||
|
if flag {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(delay)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all objects in table synchronously.
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB scan fails in an unexpected way
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity
|
||||||
|
*
|
||||||
|
* @return list of leases
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) ListLeases() ([]ILease, error) {
|
||||||
|
return l.list(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new lease. Conditional on a lease not already existing with this shardId.
|
||||||
|
*
|
||||||
|
* @param lease the lease to create
|
||||||
|
*
|
||||||
|
* @return true if lease was created, false if lease already exists
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB put fails in an unexpected way
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB put fails due to lack of capacity
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) CreateLeaseIfNotExists(lease ILease) (bool, error) {
|
||||||
|
input := &dynamodb.PutItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Item: l.serializer.ToDynamoRecord(lease),
|
||||||
|
Expected: l.serializer.GetDynamoNonexistantExpectation(),
|
||||||
|
}
|
||||||
|
_, err := l.dynamoDBClient.PutItem(input)
|
||||||
|
return err != nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param shardId Get the lease for this shardId and it is the leaseKey
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB get fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB get fails in an unexpected way
|
||||||
|
*
|
||||||
|
* @return lease for the specified shardId, or null if one doesn't exist
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) GetLease(shardId string) (ILease, error) {
|
||||||
|
input := &dynamodb.GetItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Key: l.serializer.GetDynamoHashKey(shardId),
|
||||||
|
ConsistentRead: aws.Bool(l.consistentReads),
|
||||||
|
}
|
||||||
|
result, err := l.dynamoDBClient.GetItem(input)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dynamoRecord := result.Item
|
||||||
|
if dynamoRecord == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
lease := l.serializer.FromDynamoRecord(dynamoRecord)
|
||||||
|
return lease, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter
|
||||||
|
* of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to renew
|
||||||
|
*
|
||||||
|
* @return true if renewal succeeded, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) RenewLease(lease ILease) (bool, error) {
|
||||||
|
input := &dynamodb.UpdateItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()),
|
||||||
|
Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease),
|
||||||
|
}
|
||||||
|
_, err := l.dynamoDBClient.UpdateItem(input)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// If we had a spurious retry during the Dynamo update, then this conditional PUT failure
|
||||||
|
// might be incorrect. So, we get the item straight away and check if the lease owner + lease counter
|
||||||
|
// are what we expected.
|
||||||
|
expectedOwner := lease.GetLeaseOwner()
|
||||||
|
expectedCounter := lease.GetLeaseCounter() + 1
|
||||||
|
updatedLease, _ := l.GetLease(lease.GetLeaseKey())
|
||||||
|
if updatedLease == nil || expectedOwner != updatedLease.GetLeaseOwner() ||
|
||||||
|
expectedCounter != updatedLease.GetLeaseCounter() {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Detected spurious renewal failure for lease with key " + lease.GetLeaseKey() + ", but recovered")
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.SetLeaseCounter(lease.GetLeaseCounter() + 1)
|
||||||
|
return err != nil, err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on
|
||||||
|
* the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the
|
||||||
|
* passed-in lease object after updating DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to take
|
||||||
|
* @param owner the new owner
|
||||||
|
*
|
||||||
|
* @return true if lease was successfully taken, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) TakeLease(lease ILease, owner string) (bool, error) {
|
||||||
|
input := &dynamodb.UpdateItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()),
|
||||||
|
Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease),
|
||||||
|
}
|
||||||
|
|
||||||
|
updates := l.serializer.GetDynamoLeaseCounterUpdate(lease)
|
||||||
|
|
||||||
|
// putAll to updates
|
||||||
|
for k, v := range l.serializer.GetDynamoTakeLeaseUpdate(lease, owner) {
|
||||||
|
updates[k] = v
|
||||||
|
}
|
||||||
|
input.SetAttributeUpdates(updates)
|
||||||
|
_, err := l.dynamoDBClient.UpdateItem(input)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.SetLeaseCounter(lease.GetLeaseCounter() + 1)
|
||||||
|
lease.SetLeaseOwner(owner)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of
|
||||||
|
* the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to void
|
||||||
|
*
|
||||||
|
* @return true if eviction succeeded, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) EvictLease(lease ILease) (bool, error) {
|
||||||
|
input := &dynamodb.UpdateItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()),
|
||||||
|
Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease),
|
||||||
|
}
|
||||||
|
|
||||||
|
updates := l.serializer.GetDynamoLeaseCounterUpdate(lease)
|
||||||
|
|
||||||
|
// putAll to updates
|
||||||
|
for k, v := range l.serializer.GetDynamoEvictLeaseUpdate(lease) {
|
||||||
|
updates[k] = v
|
||||||
|
}
|
||||||
|
input.SetAttributeUpdates(updates)
|
||||||
|
_, err := l.dynamoDBClient.UpdateItem(input)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.SetLeaseCounter(lease.GetLeaseCounter() + 1)
|
||||||
|
lease.SetLeaseOwner("")
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to delete
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB delete fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB delete fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) DeleteLease(lease ILease) error {
|
||||||
|
input := &dynamodb.DeleteItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()),
|
||||||
|
}
|
||||||
|
_, err := l.dynamoDBClient.DeleteItem(input)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete all leases from DynamoDB. Useful for tools/utils and testing.
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB scan or delete fail due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB scan or delete fail in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) DeleteAll() error {
|
||||||
|
allLeases, err := l.ListLeases()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range allLeases {
|
||||||
|
err := l.DeleteLease(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
|
||||||
|
* library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the
|
||||||
|
* leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other
|
||||||
|
* updates. Mutates the lease counter of the passed-in lease object.
|
||||||
|
*
|
||||||
|
* @return true if update succeeded, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) UpdateLease(lease ILease) (bool, error) {
|
||||||
|
input := &dynamodb.UpdateItemInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()),
|
||||||
|
Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease),
|
||||||
|
}
|
||||||
|
|
||||||
|
updates := l.serializer.GetDynamoLeaseCounterUpdate(lease)
|
||||||
|
|
||||||
|
// putAll to updates
|
||||||
|
for k, v := range l.serializer.GetDynamoUpdateLeaseUpdate(lease) {
|
||||||
|
updates[k] = v
|
||||||
|
}
|
||||||
|
input.SetAttributeUpdates(updates)
|
||||||
|
_, err := l.dynamoDBClient.UpdateItem(input)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.SetLeaseCounter(lease.GetLeaseCounter() + 1)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check (synchronously) if there are any leases in the lease table.
|
||||||
|
*
|
||||||
|
* @return true if there are no leases in the lease table
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB scan fails in an unexpected way
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity
|
||||||
|
*/
|
||||||
|
func (l *LeaseManager) IsLeaseTableEmpty() (bool, error) {
|
||||||
|
result, err := l.list(1)
|
||||||
|
if err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
return len(result) > 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// tableStatus check the current lease table status
|
||||||
|
func (l *LeaseManager) tableStatus() (*string, error) {
|
||||||
|
input := &dynamodb.DescribeTableInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := l.dynamoDBClient.DescribeTable(input)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.Table.TableStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List with the given page size (number of items to consider at a time). Package access for integration testing.
|
||||||
|
func (l *LeaseManager) list(limit int64) ([]ILease, error) {
|
||||||
|
input := &dynamodb.ScanInput{
|
||||||
|
TableName: aws.String(l.tableName),
|
||||||
|
}
|
||||||
|
|
||||||
|
if limit > 0 {
|
||||||
|
input.SetLimit(limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := []ILease{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
scanResult, err := l.dynamoDBClient.Scan(input)
|
||||||
|
if err != nil || scanResult == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range scanResult.Items {
|
||||||
|
result = append(result, l.serializer.FromDynamoRecord(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
lastEvaluatedKey := scanResult.LastEvaluatedKey
|
||||||
|
if lastEvaluatedKey == nil {
|
||||||
|
scanResult = nil
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
input.SetExclusiveStartKey(lastEvaluatedKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
184
src/leases/impl/lease-serializer.go
Normal file
184
src/leases/impl/lease-serializer.go
Normal file
|
|
@ -0,0 +1,184 @@
|
||||||
|
package impl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
|
|
||||||
|
dynamoutils "leases/dynamoutils"
|
||||||
|
. "leases/interfaces"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
LEASE_KEY_KEY = "leaseKey"
|
||||||
|
LEASE_OWNER_KEY = "leaseOwner"
|
||||||
|
LEASE_COUNTER_KEY = "leaseCounter"
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that
|
||||||
|
* LeaseSerializer can be decorated by other classes if you need to add fields to leases.
|
||||||
|
*/
|
||||||
|
type LeaseSerializer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a DynamoDB record out of a Lease object
|
||||||
|
*
|
||||||
|
* @param lease lease object to serialize
|
||||||
|
* @return an attribute value map representing the lease object
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) ToDynamoRecord(lease ILease) map[string]*dynamodb.AttributeValue {
|
||||||
|
result := map[string]*dynamodb.AttributeValue{}
|
||||||
|
|
||||||
|
result[LEASE_KEY_KEY], _ = dynamoutils.CreateAttributeValueFromString(lease.GetLeaseKey())
|
||||||
|
result[LEASE_COUNTER_KEY], _ = dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter())
|
||||||
|
|
||||||
|
if len(lease.GetLeaseOwner()) > 0 {
|
||||||
|
result[LEASE_OWNER_KEY], _ = dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner())
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a Lease object out of a DynamoDB record.
|
||||||
|
*
|
||||||
|
* @param dynamoRecord attribute value map from DynamoDB
|
||||||
|
* @return a deserialized lease object representing the attribute value map
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) FromDynamoRecord(dynamoRecord map[string]*dynamodb.AttributeValue) ILease {
|
||||||
|
result := &Lease{}
|
||||||
|
|
||||||
|
result.SetLeaseKey(aws.StringValue(dynamoutils.SafeGetString(dynamoRecord, LEASE_KEY_KEY)))
|
||||||
|
result.SetLeaseOwner(aws.StringValue(dynamoutils.SafeGetString(dynamoRecord, LEASE_OWNER_KEY)))
|
||||||
|
result.SetLeaseCounter(dynamoutils.SafeGetLong(dynamoRecord, LEASE_COUNTER_KEY))
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Special getDynamoHashKey implementation used by ILeaseManager.getLease().
|
||||||
|
*
|
||||||
|
* @param leaseKey
|
||||||
|
* @return the attribute value map representing a Lease's hash key given a string.
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoHashKey(leaseKey string) map[string]*dynamodb.AttributeValue {
|
||||||
|
result := map[string]*dynamodb.AttributeValue{}
|
||||||
|
result[LEASE_KEY_KEY], _ = dynamoutils.CreateAttributeValueFromString(leaseKey)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map asserting that a lease counter is what we expect.
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoLeaseCounterExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue {
|
||||||
|
result := map[string]*dynamodb.ExpectedAttributeValue{}
|
||||||
|
expectedAV := &dynamodb.ExpectedAttributeValue{}
|
||||||
|
val, _ := dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter())
|
||||||
|
expectedAV.SetValue(val)
|
||||||
|
result[LEASE_COUNTER_KEY] = expectedAV
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map asserting that the lease owner is what we expect.
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoLeaseOwnerExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue {
|
||||||
|
result := map[string]*dynamodb.ExpectedAttributeValue{}
|
||||||
|
expectedAV := &dynamodb.ExpectedAttributeValue{}
|
||||||
|
val, _ := dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner())
|
||||||
|
expectedAV.SetValue(val)
|
||||||
|
result[LEASE_OWNER_KEY] = expectedAV
|
||||||
|
return result
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the attribute value map asserting that a lease does not exist.
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoNonexistantExpectation() map[string]*dynamodb.ExpectedAttributeValue {
|
||||||
|
result := map[string]*dynamodb.ExpectedAttributeValue{}
|
||||||
|
expectedAV := &dynamodb.ExpectedAttributeValue{}
|
||||||
|
expectedAV.SetExists(false)
|
||||||
|
result[LEASE_KEY_KEY] = expectedAV
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map that increments a lease counter
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoLeaseCounterUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate {
|
||||||
|
result := map[string]*dynamodb.AttributeValueUpdate{}
|
||||||
|
updatedAV := &dynamodb.AttributeValueUpdate{}
|
||||||
|
// Increase the lease counter by 1
|
||||||
|
val, _ := dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter() + 1)
|
||||||
|
updatedAV.SetValue(val)
|
||||||
|
updatedAV.SetAction(dynamodb.AttributeActionPut)
|
||||||
|
result[LEASE_COUNTER_KEY] = updatedAV
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @param newOwner
|
||||||
|
* @return the attribute value map that takes a lease for a new owner
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoTakeLeaseUpdate(lease ILease, newOwner string) map[string]*dynamodb.AttributeValueUpdate {
|
||||||
|
result := map[string]*dynamodb.AttributeValueUpdate{}
|
||||||
|
updatedAV := &dynamodb.AttributeValueUpdate{}
|
||||||
|
val, _ := dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner())
|
||||||
|
updatedAV.SetValue(val)
|
||||||
|
updatedAV.SetAction(dynamodb.AttributeActionPut)
|
||||||
|
result[LEASE_OWNER_KEY] = updatedAV
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map that voids a lease
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoEvictLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate {
|
||||||
|
result := map[string]*dynamodb.AttributeValueUpdate{}
|
||||||
|
updatedAV := &dynamodb.AttributeValueUpdate{}
|
||||||
|
updatedAV.SetValue(nil)
|
||||||
|
updatedAV.SetAction(dynamodb.AttributeActionDelete)
|
||||||
|
result[LEASE_OWNER_KEY] = updatedAV
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map that updates application-specific data for a lease and increments the lease
|
||||||
|
* counter
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetDynamoUpdateLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate {
|
||||||
|
result := map[string]*dynamodb.AttributeValueUpdate{}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the key schema for creating a DynamoDB table to store leases
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetKeySchema() []*dynamodb.KeySchemaElement {
|
||||||
|
keySchema := []*dynamodb.KeySchemaElement{}
|
||||||
|
schemaElement := &dynamodb.KeySchemaElement{}
|
||||||
|
schemaElement.SetAttributeName(LEASE_KEY_KEY)
|
||||||
|
schemaElement.SetKeyType(dynamodb.KeyTypeHash)
|
||||||
|
keySchema = append(keySchema, schemaElement)
|
||||||
|
return keySchema
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return attribute definitions for creating a DynamoDB table to store leases
|
||||||
|
*/
|
||||||
|
func (lc *LeaseSerializer) GetAttributeDefinitions() []*dynamodb.AttributeDefinition {
|
||||||
|
definitions := []*dynamodb.AttributeDefinition{}
|
||||||
|
definition := &dynamodb.AttributeDefinition{}
|
||||||
|
definition.SetAttributeName(LEASE_KEY_KEY)
|
||||||
|
definition.SetAttributeType(dynamodb.ScalarAttributeTypeS)
|
||||||
|
definitions = append(definitions, definition)
|
||||||
|
return definitions
|
||||||
|
}
|
||||||
113
src/leases/impl/lease.go
Normal file
113
src/leases/impl/lease.go
Normal file
|
|
@ -0,0 +1,113 @@
|
||||||
|
package impl
|
||||||
|
|
||||||
|
import (
|
||||||
|
cc "clientlibrary/common"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// We will consider leases to be expired if they are more than 90 days.
|
||||||
|
MAX_ABS_AGE_NANOS = int64(90 * 24 * time.Hour)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Lease structure contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
|
||||||
|
// fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend
|
||||||
|
// for all leases - only one worker will successfully take each one. The worker should hold the lease until it is ready to stop
|
||||||
|
// processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will
|
||||||
|
// take and hold the lease.
|
||||||
|
type Lease struct {
|
||||||
|
leaseKey string
|
||||||
|
leaseOwner string
|
||||||
|
leaseCounter int64
|
||||||
|
|
||||||
|
// This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not
|
||||||
|
// persisted in DynamoDB and excluded from hashCode and equals.
|
||||||
|
concurrencyToken string
|
||||||
|
|
||||||
|
// This field is used by LeaseRenewer and LeaseTaker to track the last time a lease counter was incremented. It is
|
||||||
|
// deliberately not persisted in DynamoDB and excluded from hashCode and equals.
|
||||||
|
lastCounterIncrementNanos int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloneLease to clone a lease object
|
||||||
|
func CopyLease(lease *Lease) *Lease {
|
||||||
|
return &Lease{
|
||||||
|
leaseKey: lease.leaseKey,
|
||||||
|
leaseOwner: lease.leaseOwner,
|
||||||
|
leaseCounter: lease.leaseCounter,
|
||||||
|
concurrencyToken: lease.concurrencyToken,
|
||||||
|
lastCounterIncrementNanos: lease.lastCounterIncrementNanos,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeaseKey retrieves leaseKey - identifies the unit of work associated with this lease.
|
||||||
|
func (l *Lease) GetLeaseKey() string {
|
||||||
|
return l.leaseKey
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeaseOwner gets current owner of the lease, may be "".
|
||||||
|
func (l *Lease) GetLeaseOwner() string {
|
||||||
|
return l.leaseOwner
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeaseCounter retrieves leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking.
|
||||||
|
func (l *Lease) GetLeaseCounter() int64 {
|
||||||
|
return l.leaseCounter
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConcurrencyToken returns concurrency token
|
||||||
|
func (l *Lease) GetConcurrencyToken() string {
|
||||||
|
return l.concurrencyToken
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastCounterIncrementNanos returns concurrency token
|
||||||
|
func (l *Lease) GetLastCounterIncrementNanos() int64 {
|
||||||
|
return l.lastCounterIncrementNanos
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaseKey sets leaseKey - LeaseKey is immutable once set.
|
||||||
|
func (l *Lease) SetLeaseKey(leaseKey string) error {
|
||||||
|
if len(l.leaseKey) > 0 {
|
||||||
|
return cc.IllegalArgumentError.MakeErr().WithDetail("LeaseKey is immutable once set")
|
||||||
|
}
|
||||||
|
|
||||||
|
l.leaseKey = leaseKey
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaseOwner set current owner of the lease, may be "".
|
||||||
|
func (l *Lease) SetLeaseOwner(leaseOwner string) {
|
||||||
|
l.leaseOwner = leaseOwner
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaseCounter sets leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking.
|
||||||
|
func (l *Lease) SetLeaseCounter(leaseCounter int64) {
|
||||||
|
l.leaseCounter = leaseCounter
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetConcurrencyToken
|
||||||
|
func (l *Lease) SetConcurrencyToken(concurrencyToken string) {
|
||||||
|
l.concurrencyToken = concurrencyToken
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLastCounterIncrementNanos returns concurrency token
|
||||||
|
func (l *Lease) SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) {
|
||||||
|
l.lastCounterIncrementNanos = lastCounterIncrementNanos
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsExpired to check whether lease expired using
|
||||||
|
// @param leaseDurationNanos duration of lease in nanoseconds
|
||||||
|
// @param asOfNanos time in nanoseconds to check expiration as-of
|
||||||
|
// @return true if lease is expired as-of given time, false otherwise
|
||||||
|
func (l *Lease) IsExpired(leaseDurationNanos, asOfNanos int64) bool {
|
||||||
|
if l.lastCounterIncrementNanos == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
age := asOfNanos - l.lastCounterIncrementNanos
|
||||||
|
if age > MAX_ABS_AGE_NANOS {
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return age > leaseDurationNanos
|
||||||
|
}
|
||||||
|
}
|
||||||
162
src/leases/interfaces/lease-manager.go
Normal file
162
src/leases/interfaces/lease-manager.go
Normal file
|
|
@ -0,0 +1,162 @@
|
||||||
|
package interfaces
|
||||||
|
|
||||||
|
// ILeaseManager supports basic CRUD operations for Leases.
|
||||||
|
type ILeaseManager interface {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the table that will store leases. Succeeds if table already exists.
|
||||||
|
*
|
||||||
|
* @param readCapacity
|
||||||
|
* @param writeCapacity
|
||||||
|
*
|
||||||
|
* @return true if we created a new table (table didn't exist before)
|
||||||
|
*
|
||||||
|
* @error ProvisionedThroughputError if we cannot create the lease table due to per-AWS-account capacity
|
||||||
|
* restrictions.
|
||||||
|
* @error LeasingDependencyError if DynamoDB createTable fails in an unexpected way
|
||||||
|
*/
|
||||||
|
CreateLeaseTableIfNotExists(readCapacity, writeCapacity int64) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the lease table already exists.
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way
|
||||||
|
*/
|
||||||
|
LeaseTableExists() (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocks until the lease table exists by polling leaseTableExists.
|
||||||
|
*
|
||||||
|
* @param secondsBetweenPolls time to wait between polls in seconds
|
||||||
|
* @param timeoutSeconds total time to wait in seconds
|
||||||
|
*
|
||||||
|
* @return true if table exists, false if timeout was reached
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way
|
||||||
|
*/
|
||||||
|
WaitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds int64) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all objects in table synchronously.
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB scan fails in an unexpected way
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity
|
||||||
|
*
|
||||||
|
* @return list of leases
|
||||||
|
*/
|
||||||
|
ListLeases() ([]ILease, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new lease. Conditional on a lease not already existing with this shardId.
|
||||||
|
*
|
||||||
|
* @param lease the lease to create
|
||||||
|
*
|
||||||
|
* @return true if lease was created, false if lease already exists
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB put fails in an unexpected way
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB put fails due to lack of capacity
|
||||||
|
*/
|
||||||
|
CreateLeaseIfNotExists(lease ILease) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param shardId Get the lease for this shardId
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB get fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB get fails in an unexpected way
|
||||||
|
*
|
||||||
|
* @return lease for the specified shardId, or null if one doesn't exist
|
||||||
|
*/
|
||||||
|
GetLease(shardId string) (ILease, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter
|
||||||
|
* of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to renew
|
||||||
|
*
|
||||||
|
* @return true if renewal succeeded, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
RenewLease(lease ILease) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on
|
||||||
|
* the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the
|
||||||
|
* passed-in lease object after updating DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to take
|
||||||
|
* @param owner the new owner
|
||||||
|
*
|
||||||
|
* @return true if lease was successfully taken, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
TakeLease(lease ILease, owner string) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of
|
||||||
|
* the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to void
|
||||||
|
*
|
||||||
|
* @return true if eviction succeeded, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
EvictLease(lease ILease) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB.
|
||||||
|
*
|
||||||
|
* @param lease the lease to delete
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB delete fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB delete fails in an unexpected way
|
||||||
|
*/
|
||||||
|
DeleteLease(lease ILease) error
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete all leases from DynamoDB. Useful for tools/utils and testing.
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB scan or delete fail due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB scan or delete fail in an unexpected way
|
||||||
|
*/
|
||||||
|
DeleteAll() error
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
|
||||||
|
* library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the
|
||||||
|
* leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other
|
||||||
|
* updates. Mutates the lease counter of the passed-in lease object.
|
||||||
|
*
|
||||||
|
* @return true if update succeeded, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
UpdateLease(lease ILease) (bool, error)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check (synchronously) if there are any leases in the lease table.
|
||||||
|
*
|
||||||
|
* @return true if there are no leases in the lease table
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError if DynamoDB scan fails in an unexpected way
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity
|
||||||
|
*/
|
||||||
|
IsLeaseTableEmpty() (bool, error)
|
||||||
|
}
|
||||||
78
src/leases/interfaces/lease-renewer.go
Normal file
78
src/leases/interfaces/lease-renewer.go
Normal file
|
|
@ -0,0 +1,78 @@
|
||||||
|
package interfaces
|
||||||
|
|
||||||
|
// LeaseTable hold current lease mapping shardId --> Lease
|
||||||
|
type LeaseTable map[string]*ILease
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ILeaseRenewer objects are used by LeaseCoordinator to renew leases held by the LeaseCoordinator. Each
|
||||||
|
* LeaseCoordinator instance corresponds to one worker, and uses exactly one ILeaseRenewer to manage lease renewal for
|
||||||
|
* that worker.
|
||||||
|
*/
|
||||||
|
type ILeaseRenewer interface {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bootstrap initial set of leases from the LeaseManager (e.g. upon process restart, pick up leases we own)
|
||||||
|
* @error LeasingDependencyError on unexpected DynamoDB failures
|
||||||
|
* @error LeasingInvalidStateError if lease table doesn't exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB reads fail due to insufficient capacity
|
||||||
|
*/
|
||||||
|
Initialize() error
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to renew all currently held leases.
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError on unexpected DynamoDB failures
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
*/
|
||||||
|
RenewLeases() error
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return currently held leases. Key is shardId, value is corresponding Lease object. A lease is currently held if
|
||||||
|
* we successfully renewed it on the last run of renewLeases(). Lease objects returned are deep copies -
|
||||||
|
* their lease counters will not tick.
|
||||||
|
*/
|
||||||
|
GetCurrentlyHeldLeases() *LeaseTable
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param leaseKey key of the lease to retrieve
|
||||||
|
*
|
||||||
|
* @return a deep copy of a currently held lease, or null if we don't hold the lease
|
||||||
|
*/
|
||||||
|
GetCurrentlyHeldLease(leaseKey string) *ILease
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds leases to this LeaseRenewer's set of currently held leases. Leases must have lastRenewalNanos set to the
|
||||||
|
* last time the lease counter was incremented before being passed to this method.
|
||||||
|
*
|
||||||
|
* @param newLeases new leases.
|
||||||
|
*/
|
||||||
|
AddLeasesToRenew(newLeases []ILease)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears this LeaseRenewer's set of currently held leases.
|
||||||
|
*/
|
||||||
|
ClearCurrentlyHeldLeases()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops the lease renewer from continunig to maintain the given lease.
|
||||||
|
*
|
||||||
|
* @param lease the lease to drop.
|
||||||
|
*/
|
||||||
|
DropLease(lease ILease)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as
|
||||||
|
* leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match
|
||||||
|
* the concurrency token on the internal authoritative copy of the lease (ie, if we lost and re-acquired the lease).
|
||||||
|
*
|
||||||
|
* @param lease lease object containing updated data
|
||||||
|
* @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease
|
||||||
|
*
|
||||||
|
* @return true if update succeeds, false otherwise
|
||||||
|
*
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
* @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity
|
||||||
|
* @error LeasingDependencyError if DynamoDB update fails in an unexpected way
|
||||||
|
*/
|
||||||
|
UpdateLease(lease ILease, concurrencyToken string) (bool, error)
|
||||||
|
}
|
||||||
86
src/leases/interfaces/lease-serializer.go
Normal file
86
src/leases/interfaces/lease-serializer.go
Normal file
|
|
@ -0,0 +1,86 @@
|
||||||
|
package interfaces
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ILeaseSerializer an utility class that manages the mapping of Lease objects/operations to records in DynamoDB.
|
||||||
|
type ILeaseSerializer interface {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a DynamoDB record out of a Lease object
|
||||||
|
*
|
||||||
|
* @param lease lease object to serialize
|
||||||
|
* @return an attribute value map representing the lease object
|
||||||
|
*/
|
||||||
|
ToDynamoRecord(lease ILease) map[string]*dynamodb.AttributeValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a Lease object out of a DynamoDB record.
|
||||||
|
*
|
||||||
|
* @param dynamoRecord attribute value map from DynamoDB
|
||||||
|
* @return a deserialized lease object representing the attribute value map
|
||||||
|
*/
|
||||||
|
FromDynamoRecord(dynamoRecord map[string]*dynamodb.AttributeValue) ILease
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Special getDynamoHashKey implementation used by ILeaseManager.getLease().
|
||||||
|
*
|
||||||
|
* @param leaseKey
|
||||||
|
* @return the attribute value map representing a Lease's hash key given a string.
|
||||||
|
*/
|
||||||
|
GetDynamoHashKey(leaseKey string) map[string]*dynamodb.AttributeValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map asserting that a lease counter is what we expect.
|
||||||
|
*/
|
||||||
|
GetDynamoLeaseCounterExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map asserting that the lease owner is what we expect.
|
||||||
|
*/
|
||||||
|
GetDynamoLeaseOwnerExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the attribute value map asserting that a lease does not exist.
|
||||||
|
*/
|
||||||
|
GetDynamoNonexistantExpectation() map[string]*dynamodb.ExpectedAttributeValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map that increments a lease counter
|
||||||
|
*/
|
||||||
|
GetDynamoLeaseCounterUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @param newOwner
|
||||||
|
* @return the attribute value map that takes a lease for a new owner
|
||||||
|
*/
|
||||||
|
GetDynamoTakeLeaseUpdate(lease ILease, newOwner string) map[string]*dynamodb.AttributeValueUpdate
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map that voids a lease
|
||||||
|
*/
|
||||||
|
GetDynamoEvictLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param lease
|
||||||
|
* @return the attribute value map that updates application-specific data for a lease and increments the lease
|
||||||
|
* counter
|
||||||
|
*/
|
||||||
|
GetDynamoUpdateLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the key schema for creating a DynamoDB table to store leases
|
||||||
|
*/
|
||||||
|
GetKeySchema() []*dynamodb.KeySchemaElement
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return attribute definitions for creating a DynamoDB table to store leases
|
||||||
|
*/
|
||||||
|
GetAttributeDefinitions() []*dynamodb.AttributeDefinition
|
||||||
|
}
|
||||||
28
src/leases/interfaces/lease-taker.go
Normal file
28
src/leases/interfaces/lease-taker.go
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
package interfaces
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ILeaseTaker is used by LeaseCoordinator to take new leases, or leases that other workers fail to renew. Each
|
||||||
|
* LeaseCoordinator instance corresponds to one worker and uses exactly one ILeaseTaker to take leases for that worker.
|
||||||
|
*/
|
||||||
|
type ILeaseTaker interface {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the set of leases available to be taken and attempt to take them. Lease taking rules are:
|
||||||
|
*
|
||||||
|
* 1) If a lease's counter hasn't changed in long enough, try to take it.
|
||||||
|
* 2) If we see a lease we've never seen before, take it only if owner == null. If it's owned, odds are the owner is
|
||||||
|
* holding it. We can't tell until we see it more than once.
|
||||||
|
* 3) For load balancing purposes, you may violate rules 1 and 2 for EXACTLY ONE lease per call of takeLeases().
|
||||||
|
*
|
||||||
|
* @return map of shardId to Lease object for leases we just successfully took.
|
||||||
|
*
|
||||||
|
* @error LeasingDependencyError on unexpected DynamoDB failures
|
||||||
|
* @error LeasingInvalidStateError if lease table does not exist
|
||||||
|
*/
|
||||||
|
TakeLeases() map[string]ILease
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return workerIdentifier for this LeaseTaker
|
||||||
|
*/
|
||||||
|
GetWorkerIdentifier() string
|
||||||
|
}
|
||||||
21
src/leases/interfaces/lease.go
Normal file
21
src/leases/interfaces/lease.go
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
package interfaces
|
||||||
|
|
||||||
|
// ILease is the interface for all Leases
|
||||||
|
type ILease interface {
|
||||||
|
GetLeaseKey() string
|
||||||
|
SetLeaseKey(leaseKey string) error
|
||||||
|
|
||||||
|
GetLeaseOwner() string
|
||||||
|
SetLeaseOwner(leaseOwner string)
|
||||||
|
|
||||||
|
GetLeaseCounter() int64
|
||||||
|
SetLeaseCounter(leaseCounter int64)
|
||||||
|
|
||||||
|
GetConcurrencyToken() string
|
||||||
|
SetConcurrencyToken(concurrencyToken string)
|
||||||
|
|
||||||
|
GetLastCounterIncrementNanos() int64
|
||||||
|
SetLastCounterIncrementNanos(lastCounterIncrementNanos int64)
|
||||||
|
|
||||||
|
IsExpired(leaseDurationNanos, asOfNanos int64) bool
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue