diff --git a/src/leases/dynamoutils/dynamoutils.go b/src/leases/dynamoutils/dynamoutils.go deleted file mode 100644 index 8d286be..0000000 --- a/src/leases/dynamoutils/dynamoutils.go +++ /dev/null @@ -1,78 +0,0 @@ -package util - -import ( - "strconv" - - "clientlibrary/common" - "github.com/aws/aws-sdk-go/service/dynamodb" -) - -/** - * Some static utility functions used by our LeaseSerializers. - */ - -func CreateAttributeValueFromSS(collectionValue []*string) (*dynamodb.AttributeValue, error) { - if len(collectionValue) == 0 { - return nil, common.IllegalArgumentError.MakeErr().WithDetail("Collection attributeValues cannot be null or empty.") - } - - attrib := &dynamodb.AttributeValue{} - attrib.SetSS(collectionValue) - - return attrib, nil -} - -func CreateAttributeValueFromString(stringValue string) (*dynamodb.AttributeValue, error) { - if len(stringValue) == 0 { - return nil, common.IllegalArgumentError.MakeErr().WithDetail("String attributeValues cannot be null or empty.") - } - - attrib := &dynamodb.AttributeValue{} - attrib.SetS(stringValue) - - return attrib, nil -} - -func CreateAttributeValueFromLong(longValue int64) (*dynamodb.AttributeValue, error) { - attrib := &dynamodb.AttributeValue{} - attrib.SetN(strconv.FormatInt(longValue, 10)) - - return attrib, nil -} - -func SafeGetLong(dynamoRecord map[string]*dynamodb.AttributeValue, key string) int64 { - av := dynamoRecord[key] - - if av == nil || av.N == nil { - return 0 - } - - var val int64 - val, err := strconv.ParseInt(*av.N, 10, 64) - - if err != nil { - return 0 - } - - return val -} - -func SafeGetString(dynamoRecord map[string]*dynamodb.AttributeValue, key string) *string { - av := dynamoRecord[key] - if av == nil { - return nil - } - - return av.S -} - -func SafeGetSS(dynamoRecord map[string]*dynamodb.AttributeValue, key string) []*string { - av := dynamoRecord[key] - - if av == nil { - var emptyslice []*string - return emptyslice - } - - return av.SS -} diff --git a/src/leases/impl/kinesis-client-lease.go b/src/leases/impl/kinesis-client-lease.go deleted file mode 100644 index abe049a..0000000 --- a/src/leases/impl/kinesis-client-lease.go +++ /dev/null @@ -1,116 +0,0 @@ -package impl - -import ( - . "clientlibrary/interfaces" -) - -// KinesisClientLease is a Lease subclass containing KinesisClientLibrary related fields for checkpoints. -type KinesisClientLease struct { - checkpoint *ExtendedSequenceNumber - pendingCheckpoint *ExtendedSequenceNumber - ownerSwitchesSinceCheckpoint int64 - parentShardIds *[]string - - // coreLease to hold lease information - // Note: golang doesn't support inheritance, use composition instead. - coreLease Lease -} - -// GetCheckpoint returns most recently application-supplied checkpoint value. During fail over, the new worker -// will pick up after the old worker's last checkpoint. -func (l *KinesisClientLease) GetCheckpoint() *ExtendedSequenceNumber { - return l.checkpoint -} - -// GetPendingCheckpoint returns pending checkpoint, possibly null. -func (l *KinesisClientLease) GetPendingCheckpoint() *ExtendedSequenceNumber { - return l.pendingCheckpoint -} - -// GetOwnerSwitchesSinceCheckpoint counts of distinct lease holders between checkpoints. -func (l *KinesisClientLease) GetOwnerSwitchesSinceCheckpoint() int64 { - return l.ownerSwitchesSinceCheckpoint -} - -// GetParentShardIds returns shardIds that parent this lease. Used for resharding. -func (l *KinesisClientLease) GetParentShardIds() *[]string { - return l.parentShardIds -} - -// SetCheckpoint -func (l *KinesisClientLease) SetCheckpoint(checkpoint *ExtendedSequenceNumber) { - l.checkpoint = checkpoint -} - -// SetPendingCheckpoint -func (l *KinesisClientLease) SetPendingCheckpoint(pendingCheckpoint *ExtendedSequenceNumber) { - l.pendingCheckpoint = pendingCheckpoint -} - -// SetOwnerSwitchesSinceCheckpoint -func (l *KinesisClientLease) SetOwnerSwitchesSinceCheckpoint(ownerSwitchesSinceCheckpoint int64) { - l.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint -} - -// SetParentShardIds -func (l *KinesisClientLease) SetParentShardIds(parentShardIds *[]string) { - l.parentShardIds = parentShardIds -} - -// GetLeaseKey retrieves leaseKey - identifies the unit of work associated with this lease. -func (l *KinesisClientLease) GetLeaseKey() string { - return l.coreLease.GetLeaseKey() -} - -// GetLeaseOwner gets current owner of the lease, may be "". -func (l *KinesisClientLease) GetLeaseOwner() string { - return l.coreLease.GetLeaseOwner() -} - -// GetLeaseCounter retrieves leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. -func (l *KinesisClientLease) GetLeaseCounter() int64 { - return l.coreLease.GetLeaseCounter() -} - -// GetConcurrencyToken returns concurrency token -func (l *KinesisClientLease) GetConcurrencyToken() string { - return l.coreLease.GetConcurrencyToken() -} - -// GetLastCounterIncrementNanos returns concurrency token -func (l *KinesisClientLease) GetLastCounterIncrementNanos() int64 { - return l.coreLease.GetLastCounterIncrementNanos() -} - -// SetLeaseKey sets leaseKey - LeaseKey is immutable once set. -func (l *KinesisClientLease) SetLeaseKey(leaseKey string) error { - return l.coreLease.SetLeaseKey(leaseKey) -} - -// SetLeaseOwner set current owner of the lease, may be "". -func (l *KinesisClientLease) SetLeaseOwner(leaseOwner string) { - l.coreLease.SetLeaseOwner(leaseOwner) -} - -// SetLeaseCounter sets leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. -func (l *KinesisClientLease) SetLeaseCounter(leaseCounter int64) { - l.coreLease.SetLeaseCounter(leaseCounter) -} - -// SetConcurrencyToken -func (l *KinesisClientLease) SetConcurrencyToken(concurrencyToken string) { - l.coreLease.SetConcurrencyToken(concurrencyToken) -} - -// SetLastCounterIncrementNanos returns concurrency token -func (l *KinesisClientLease) SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) { - l.coreLease.SetLastCounterIncrementNanos(lastCounterIncrementNanos) -} - -// IsExpired to check whether lease expired using -// @param leaseDurationNanos duration of lease in nanoseconds -// @param asOfNanos time in nanoseconds to check expiration as-of -// @return true if lease is expired as-of given time, false otherwise -func (l *KinesisClientLease) IsExpired(leaseDurationNanos, asOfNanos int64) bool { - return l.coreLease.IsExpired(leaseDurationNanos, asOfNanos) -} diff --git a/src/leases/impl/lease-manager.go b/src/leases/impl/lease-manager.go deleted file mode 100644 index a447e11..0000000 --- a/src/leases/impl/lease-manager.go +++ /dev/null @@ -1,440 +0,0 @@ -package impl - -import ( - "log" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" - - . "leases/interfaces" -) - -const ( - // CREATING - The table is being created. - TABLE_CREATING = "CREATING" - - // UPDATING - The table is being updated. - TABLE_UPDATING = "UPDATING" - - // DELETING - The table is being deleted. - TABLE_DELETING = "DELETING" - - // ACTIVE - The table is ready for use. - TABLE_ACTIVE = "ACTIVE" -) - -// LeaseManager is an implementation of ILeaseManager that uses DynamoDB. -type LeaseManager struct { - tableName string - dynamoDBClient dynamodbiface.DynamoDBAPI - serializer ILeaseSerializer - consistentReads bool -} - -func NewLeaseManager(tableName string, dynamoDBClient dynamodbiface.DynamoDBAPI, serializer ILeaseSerializer) *LeaseManager { - return &LeaseManager{ - tableName: tableName, - dynamoDBClient: dynamoDBClient, - serializer: serializer, - consistentReads: false, - } -} - -/** - * Creates the table that will store leases. Succeeds if table already exists. - * - * @param readCapacity - * @param writeCapacity - * - * @return true if we created a new table (table didn't exist before) - * - * @error ProvisionedThroughputError if we cannot create the lease table due to per-AWS-account capacity - * restrictions. - * @error LeasingDependencyError if DynamoDB createTable fails in an unexpected way - */ -func (l *LeaseManager) CreateLeaseTableIfNotExists(readCapacity, writeCapacity int64) (bool, error) { - status, _ := l.tableStatus() - - if status != nil { - return false, nil - } - - input := &dynamodb.CreateTableInput{ - AttributeDefinitions: l.serializer.GetAttributeDefinitions(), - KeySchema: l.serializer.GetKeySchema(), - ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacity), - WriteCapacityUnits: aws.Int64(writeCapacity), - }, - TableName: aws.String(l.tableName), - } - _, err := l.dynamoDBClient.CreateTable(input) - - if err != nil { - return false, err - } - return true, nil -} - -/** - * @return true if the lease table already exists. - * - * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way - */ -func (l *LeaseManager) LeaseTableExists() (bool, error) { - status, _ := l.tableStatus() - - if status != nil || aws.StringValue(status) == TABLE_ACTIVE { - return true, nil - } - return false, nil -} - -/** - * Blocks until the lease table exists by polling leaseTableExists. - * - * @param secondsBetweenPolls time to wait between polls in seconds - * @param timeoutSeconds total time to wait in seconds - * - * @return true if table exists, false if timeout was reached - * - * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way - */ -func (l *LeaseManager) WaitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds int64) (bool, error) { - delay := time.Duration(secondsBetweenPolls) * time.Second - deadline := time.Now().Add(time.Duration(timeoutSeconds) * time.Second) - - var err error - for time.Now().Before(deadline) { - flag := false - flag, err = l.LeaseTableExists() - - if flag { - return true, nil - } - - time.Sleep(delay) - } - - return false, err -} - -/** - * List all objects in table synchronously. - * - * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity - * - * @return list of leases - */ -func (l *LeaseManager) ListLeases() ([]ILease, error) { - return l.list(0) -} - -/** - * Create a new lease. Conditional on a lease not already existing with this shardId. - * - * @param lease the lease to create - * - * @return true if lease was created, false if lease already exists - * - * @error LeasingDependencyError if DynamoDB put fails in an unexpected way - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB put fails due to lack of capacity - */ -func (l *LeaseManager) CreateLeaseIfNotExists(lease ILease) (bool, error) { - input := &dynamodb.PutItemInput{ - TableName: aws.String(l.tableName), - Item: l.serializer.ToDynamoRecord(lease), - Expected: l.serializer.GetDynamoNonexistantExpectation(), - } - _, err := l.dynamoDBClient.PutItem(input) - return err != nil, err -} - -/** - * @param shardId Get the lease for this shardId and it is the leaseKey - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB get fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB get fails in an unexpected way - * - * @return lease for the specified shardId, or null if one doesn't exist - */ -func (l *LeaseManager) GetLease(shardId string) (ILease, error) { - input := &dynamodb.GetItemInput{ - TableName: aws.String(l.tableName), - Key: l.serializer.GetDynamoHashKey(shardId), - ConsistentRead: aws.Bool(l.consistentReads), - } - result, err := l.dynamoDBClient.GetItem(input) - if err != nil { - return nil, err - } - dynamoRecord := result.Item - if dynamoRecord == nil { - return nil, nil - } - lease := l.serializer.FromDynamoRecord(dynamoRecord) - return lease, nil -} - -/** - * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter - * of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB. - * - * @param lease the lease to renew - * - * @return true if renewal succeeded, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ -func (l *LeaseManager) RenewLease(lease ILease) (bool, error) { - input := &dynamodb.UpdateItemInput{ - TableName: aws.String(l.tableName), - Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), - Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), - } - _, err := l.dynamoDBClient.UpdateItem(input) - - if err != nil { - // If we had a spurious retry during the Dynamo update, then this conditional PUT failure - // might be incorrect. So, we get the item straight away and check if the lease owner + lease counter - // are what we expected. - expectedOwner := lease.GetLeaseOwner() - expectedCounter := lease.GetLeaseCounter() + 1 - updatedLease, _ := l.GetLease(lease.GetLeaseKey()) - if updatedLease == nil || expectedOwner != updatedLease.GetLeaseOwner() || - expectedCounter != updatedLease.GetLeaseCounter() { - return false, nil - } - - log.Println("Detected spurious renewal failure for lease with key " + lease.GetLeaseKey() + ", but recovered") - } - - lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) - return err != nil, err - -} - -/** - * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on - * the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the - * passed-in lease object after updating DynamoDB. - * - * @param lease the lease to take - * @param owner the new owner - * - * @return true if lease was successfully taken, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ -func (l *LeaseManager) TakeLease(lease ILease, owner string) (bool, error) { - input := &dynamodb.UpdateItemInput{ - TableName: aws.String(l.tableName), - Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), - Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), - } - - updates := l.serializer.GetDynamoLeaseCounterUpdate(lease) - - // putAll to updates - for k, v := range l.serializer.GetDynamoTakeLeaseUpdate(lease, owner) { - updates[k] = v - } - input.SetAttributeUpdates(updates) - _, err := l.dynamoDBClient.UpdateItem(input) - - if err != nil { - return false, err - } - - lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) - lease.SetLeaseOwner(owner) - return true, nil -} - -/** - * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of - * the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB. - * - * @param lease the lease to void - * - * @return true if eviction succeeded, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ -func (l *LeaseManager) EvictLease(lease ILease) (bool, error) { - input := &dynamodb.UpdateItemInput{ - TableName: aws.String(l.tableName), - Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), - Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), - } - - updates := l.serializer.GetDynamoLeaseCounterUpdate(lease) - - // putAll to updates - for k, v := range l.serializer.GetDynamoEvictLeaseUpdate(lease) { - updates[k] = v - } - input.SetAttributeUpdates(updates) - _, err := l.dynamoDBClient.UpdateItem(input) - - if err != nil { - return false, err - } - - lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) - lease.SetLeaseOwner("") - return true, nil -} - -/** - * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB. - * - * @param lease the lease to delete - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB delete fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB delete fails in an unexpected way - */ -func (l *LeaseManager) DeleteLease(lease ILease) error { - input := &dynamodb.DeleteItemInput{ - TableName: aws.String(l.tableName), - Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), - } - _, err := l.dynamoDBClient.DeleteItem(input) - return err -} - -/** - * Delete all leases from DynamoDB. Useful for tools/utils and testing. - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB scan or delete fail due to lack of capacity - * @error LeasingDependencyError if DynamoDB scan or delete fail in an unexpected way - */ -func (l *LeaseManager) DeleteAll() error { - allLeases, err := l.ListLeases() - if err != nil { - return err - } - - for _, v := range allLeases { - err := l.DeleteLease(v) - if err != nil { - return err - } - } - return nil -} - -/** - * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing - * library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the - * leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other - * updates. Mutates the lease counter of the passed-in lease object. - * - * @return true if update succeeded, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ -func (l *LeaseManager) UpdateLease(lease ILease) (bool, error) { - input := &dynamodb.UpdateItemInput{ - TableName: aws.String(l.tableName), - Key: l.serializer.GetDynamoHashKey(lease.GetLeaseKey()), - Expected: l.serializer.GetDynamoLeaseCounterExpectation(lease), - } - - updates := l.serializer.GetDynamoLeaseCounterUpdate(lease) - - // putAll to updates - for k, v := range l.serializer.GetDynamoUpdateLeaseUpdate(lease) { - updates[k] = v - } - input.SetAttributeUpdates(updates) - _, err := l.dynamoDBClient.UpdateItem(input) - - if err != nil { - return false, err - } - - lease.SetLeaseCounter(lease.GetLeaseCounter() + 1) - return true, nil -} - -/** - * Check (synchronously) if there are any leases in the lease table. - * - * @return true if there are no leases in the lease table - * - * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity - */ -func (l *LeaseManager) IsLeaseTableEmpty() (bool, error) { - result, err := l.list(1) - if err != nil { - return true, err - } - return len(result) > 0, nil -} - -// tableStatus check the current lease table status -func (l *LeaseManager) tableStatus() (*string, error) { - input := &dynamodb.DescribeTableInput{ - TableName: aws.String(l.tableName), - } - - result, err := l.dynamoDBClient.DescribeTable(input) - if err != nil { - return nil, err - } - - return result.Table.TableStatus, nil -} - -// List with the given page size (number of items to consider at a time). Package access for integration testing. -func (l *LeaseManager) list(limit int64) ([]ILease, error) { - input := &dynamodb.ScanInput{ - TableName: aws.String(l.tableName), - } - - if limit > 0 { - input.SetLimit(limit) - } - - result := []ILease{} - - for { - scanResult, err := l.dynamoDBClient.Scan(input) - if err != nil || scanResult == nil { - break - } - - for _, v := range scanResult.Items { - result = append(result, l.serializer.FromDynamoRecord(v)) - } - - lastEvaluatedKey := scanResult.LastEvaluatedKey - if lastEvaluatedKey == nil { - scanResult = nil - break - } else { - input.SetExclusiveStartKey(lastEvaluatedKey) - } - } - - return result, nil -} diff --git a/src/leases/impl/lease-serializer.go b/src/leases/impl/lease-serializer.go deleted file mode 100644 index 3e3f3b3..0000000 --- a/src/leases/impl/lease-serializer.go +++ /dev/null @@ -1,184 +0,0 @@ -package impl - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" - - dynamoutils "leases/dynamoutils" - . "leases/interfaces" -) - -const ( - LEASE_KEY_KEY = "leaseKey" - LEASE_OWNER_KEY = "leaseOwner" - LEASE_COUNTER_KEY = "leaseCounter" -) - -/** - * An implementation of ILeaseSerializer for basic Lease objects. Can also instantiate subclasses of Lease so that - * LeaseSerializer can be decorated by other classes if you need to add fields to leases. - */ -type LeaseSerializer struct { -} - -/** - * Construct a DynamoDB record out of a Lease object - * - * @param lease lease object to serialize - * @return an attribute value map representing the lease object - */ -func (lc *LeaseSerializer) ToDynamoRecord(lease ILease) map[string]*dynamodb.AttributeValue { - result := map[string]*dynamodb.AttributeValue{} - - result[LEASE_KEY_KEY], _ = dynamoutils.CreateAttributeValueFromString(lease.GetLeaseKey()) - result[LEASE_COUNTER_KEY], _ = dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter()) - - if len(lease.GetLeaseOwner()) > 0 { - result[LEASE_OWNER_KEY], _ = dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner()) - } - - return result -} - -/** - * Construct a Lease object out of a DynamoDB record. - * - * @param dynamoRecord attribute value map from DynamoDB - * @return a deserialized lease object representing the attribute value map - */ -func (lc *LeaseSerializer) FromDynamoRecord(dynamoRecord map[string]*dynamodb.AttributeValue) ILease { - result := &Lease{} - - result.SetLeaseKey(aws.StringValue(dynamoutils.SafeGetString(dynamoRecord, LEASE_KEY_KEY))) - result.SetLeaseOwner(aws.StringValue(dynamoutils.SafeGetString(dynamoRecord, LEASE_OWNER_KEY))) - result.SetLeaseCounter(dynamoutils.SafeGetLong(dynamoRecord, LEASE_COUNTER_KEY)) - return result -} - -/** - * Special getDynamoHashKey implementation used by ILeaseManager.getLease(). - * - * @param leaseKey - * @return the attribute value map representing a Lease's hash key given a string. - */ -func (lc *LeaseSerializer) GetDynamoHashKey(leaseKey string) map[string]*dynamodb.AttributeValue { - result := map[string]*dynamodb.AttributeValue{} - result[LEASE_KEY_KEY], _ = dynamoutils.CreateAttributeValueFromString(leaseKey) - return result -} - -/** - * @param lease - * @return the attribute value map asserting that a lease counter is what we expect. - */ -func (lc *LeaseSerializer) GetDynamoLeaseCounterExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue { - result := map[string]*dynamodb.ExpectedAttributeValue{} - expectedAV := &dynamodb.ExpectedAttributeValue{} - val, _ := dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter()) - expectedAV.SetValue(val) - result[LEASE_COUNTER_KEY] = expectedAV - return result -} - -/** - * @param lease - * @return the attribute value map asserting that the lease owner is what we expect. - */ -func (lc *LeaseSerializer) GetDynamoLeaseOwnerExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue { - result := map[string]*dynamodb.ExpectedAttributeValue{} - expectedAV := &dynamodb.ExpectedAttributeValue{} - val, _ := dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner()) - expectedAV.SetValue(val) - result[LEASE_OWNER_KEY] = expectedAV - return result - -} - -/** - * @return the attribute value map asserting that a lease does not exist. - */ -func (lc *LeaseSerializer) GetDynamoNonexistantExpectation() map[string]*dynamodb.ExpectedAttributeValue { - result := map[string]*dynamodb.ExpectedAttributeValue{} - expectedAV := &dynamodb.ExpectedAttributeValue{} - expectedAV.SetExists(false) - result[LEASE_KEY_KEY] = expectedAV - - return result -} - -/** - * @param lease - * @return the attribute value map that increments a lease counter - */ -func (lc *LeaseSerializer) GetDynamoLeaseCounterUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate { - result := map[string]*dynamodb.AttributeValueUpdate{} - updatedAV := &dynamodb.AttributeValueUpdate{} - // Increase the lease counter by 1 - val, _ := dynamoutils.CreateAttributeValueFromLong(lease.GetLeaseCounter() + 1) - updatedAV.SetValue(val) - updatedAV.SetAction(dynamodb.AttributeActionPut) - result[LEASE_COUNTER_KEY] = updatedAV - return result -} - -/** - * @param lease - * @param newOwner - * @return the attribute value map that takes a lease for a new owner - */ -func (lc *LeaseSerializer) GetDynamoTakeLeaseUpdate(lease ILease, newOwner string) map[string]*dynamodb.AttributeValueUpdate { - result := map[string]*dynamodb.AttributeValueUpdate{} - updatedAV := &dynamodb.AttributeValueUpdate{} - val, _ := dynamoutils.CreateAttributeValueFromString(lease.GetLeaseOwner()) - updatedAV.SetValue(val) - updatedAV.SetAction(dynamodb.AttributeActionPut) - result[LEASE_OWNER_KEY] = updatedAV - return result -} - -/** - * @param lease - * @return the attribute value map that voids a lease - */ -func (lc *LeaseSerializer) GetDynamoEvictLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate { - result := map[string]*dynamodb.AttributeValueUpdate{} - updatedAV := &dynamodb.AttributeValueUpdate{} - updatedAV.SetValue(nil) - updatedAV.SetAction(dynamodb.AttributeActionDelete) - result[LEASE_OWNER_KEY] = updatedAV - return result -} - -/** - * @param lease - * @return the attribute value map that updates application-specific data for a lease and increments the lease - * counter - */ -func (lc *LeaseSerializer) GetDynamoUpdateLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate { - result := map[string]*dynamodb.AttributeValueUpdate{} - return result -} - -/** - * @return the key schema for creating a DynamoDB table to store leases - */ -func (lc *LeaseSerializer) GetKeySchema() []*dynamodb.KeySchemaElement { - keySchema := []*dynamodb.KeySchemaElement{} - schemaElement := &dynamodb.KeySchemaElement{} - schemaElement.SetAttributeName(LEASE_KEY_KEY) - schemaElement.SetKeyType(dynamodb.KeyTypeHash) - keySchema = append(keySchema, schemaElement) - return keySchema -} - -/** - * @return attribute definitions for creating a DynamoDB table to store leases - */ -func (lc *LeaseSerializer) GetAttributeDefinitions() []*dynamodb.AttributeDefinition { - definitions := []*dynamodb.AttributeDefinition{} - definition := &dynamodb.AttributeDefinition{} - definition.SetAttributeName(LEASE_KEY_KEY) - definition.SetAttributeType(dynamodb.ScalarAttributeTypeS) - definitions = append(definitions, definition) - return definitions -} diff --git a/src/leases/impl/lease.go b/src/leases/impl/lease.go deleted file mode 100644 index 394475f..0000000 --- a/src/leases/impl/lease.go +++ /dev/null @@ -1,116 +0,0 @@ -package impl - -import ( - cc "clientlibrary/common" - "time" -) - -const ( - // We will consider leases to be expired if they are more than 90 days. - MAX_ABS_AGE_NANOS = int64(90 * 24 * time.Hour) -) - -// Lease structure contains data pertaining to a Lease. Distributed systems may use leases to partition work across a -// fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend -// for all leases - only one worker will successfully take each one. The worker should hold the lease until it is ready to stop -// processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will -// take and hold the lease. -type Lease struct { - // shard-id - leaseKey string - // worker# - leaseOwner string - // ccounter incremented periodically - leaseCounter int64 - - // This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not - // persisted in DynamoDB and excluded from hashCode and equals. - concurrencyToken string - - // This field is used by LeaseRenewer and LeaseTaker to track the last time a lease counter was incremented. It is - // deliberately not persisted in DynamoDB and excluded from hashCode and equals. - lastCounterIncrementNanos int64 -} - -// CloneLease to clone a lease object -func CopyLease(lease *Lease) *Lease { - return &Lease{ - leaseKey: lease.leaseKey, - leaseOwner: lease.leaseOwner, - leaseCounter: lease.leaseCounter, - concurrencyToken: lease.concurrencyToken, - lastCounterIncrementNanos: lease.lastCounterIncrementNanos, - } -} - -// GetLeaseKey retrieves leaseKey - identifies the unit of work associated with this lease. -func (l *Lease) GetLeaseKey() string { - return l.leaseKey -} - -// GetLeaseOwner gets current owner of the lease, may be "". -func (l *Lease) GetLeaseOwner() string { - return l.leaseOwner -} - -// GetLeaseCounter retrieves leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. -func (l *Lease) GetLeaseCounter() int64 { - return l.leaseCounter -} - -// GetConcurrencyToken returns concurrency token -func (l *Lease) GetConcurrencyToken() string { - return l.concurrencyToken -} - -// GetLastCounterIncrementNanos returns concurrency token -func (l *Lease) GetLastCounterIncrementNanos() int64 { - return l.lastCounterIncrementNanos -} - -// SetLeaseKey sets leaseKey - LeaseKey is immutable once set. -func (l *Lease) SetLeaseKey(leaseKey string) error { - if len(l.leaseKey) > 0 { - return cc.IllegalArgumentError.MakeErr().WithDetail("LeaseKey is immutable once set") - } - - l.leaseKey = leaseKey - return nil -} - -// SetLeaseOwner set current owner of the lease, may be "". -func (l *Lease) SetLeaseOwner(leaseOwner string) { - l.leaseOwner = leaseOwner -} - -// SetLeaseCounter sets leaseCounter which is incremented periodically by the holder of the lease. Used for optimistic locking. -func (l *Lease) SetLeaseCounter(leaseCounter int64) { - l.leaseCounter = leaseCounter -} - -// SetConcurrencyToken -func (l *Lease) SetConcurrencyToken(concurrencyToken string) { - l.concurrencyToken = concurrencyToken -} - -// SetLastCounterIncrementNanos returns concurrency token -func (l *Lease) SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) { - l.lastCounterIncrementNanos = lastCounterIncrementNanos -} - -// IsExpired to check whether lease expired using -// @param leaseDurationNanos duration of lease in nanoseconds -// @param asOfNanos time in nanoseconds to check expiration as-of -// @return true if lease is expired as-of given time, false otherwise -func (l *Lease) IsExpired(leaseDurationNanos, asOfNanos int64) bool { - if l.lastCounterIncrementNanos == 0 { - return true - } - - age := asOfNanos - l.lastCounterIncrementNanos - if age > MAX_ABS_AGE_NANOS { - return true - } else { - return age > leaseDurationNanos - } -} diff --git a/src/leases/interfaces/lease-manager.go b/src/leases/interfaces/lease-manager.go deleted file mode 100644 index 8f27aa2..0000000 --- a/src/leases/interfaces/lease-manager.go +++ /dev/null @@ -1,162 +0,0 @@ -package interfaces - -// ILeaseManager supports basic CRUD operations for Leases. -type ILeaseManager interface { - - /** - * Creates the table that will store leases. Succeeds if table already exists. - * - * @param readCapacity - * @param writeCapacity - * - * @return true if we created a new table (table didn't exist before) - * - * @error ProvisionedThroughputError if we cannot create the lease table due to per-AWS-account capacity - * restrictions. - * @error LeasingDependencyError if DynamoDB createTable fails in an unexpected way - */ - CreateLeaseTableIfNotExists(readCapacity, writeCapacity int64) (bool, error) - - /** - * @return true if the lease table already exists. - * - * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way - */ - LeaseTableExists() (bool, error) - - /** - * Blocks until the lease table exists by polling leaseTableExists. - * - * @param secondsBetweenPolls time to wait between polls in seconds - * @param timeoutSeconds total time to wait in seconds - * - * @return true if table exists, false if timeout was reached - * - * @error LeasingDependencyError if DynamoDB describeTable fails in an unexpected way - */ - WaitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds int64) (bool, error) - - /** - * List all objects in table synchronously. - * - * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity - * - * @return list of leases - */ - ListLeases() ([]ILease, error) - - /** - * Create a new lease. Conditional on a lease not already existing with this shardId. - * - * @param lease the lease to create - * - * @return true if lease was created, false if lease already exists - * - * @error LeasingDependencyError if DynamoDB put fails in an unexpected way - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB put fails due to lack of capacity - */ - CreateLeaseIfNotExists(lease ILease) (bool, error) - - /** - * @param shardId Get the lease for this shardId - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB get fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB get fails in an unexpected way - * - * @return lease for the specified shardId, or null if one doesn't exist - */ - GetLease(shardId string) (ILease, error) - - /** - * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter - * of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB. - * - * @param lease the lease to renew - * - * @return true if renewal succeeded, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ - RenewLease(lease ILease) (bool, error) - - /** - * Take a lease for the given owner by incrementing its leaseCounter and setting its owner field. Conditional on - * the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter and owner of the - * passed-in lease object after updating DynamoDB. - * - * @param lease the lease to take - * @param owner the new owner - * - * @return true if lease was successfully taken, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ - TakeLease(lease ILease, owner string) (bool, error) - - /** - * Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of - * the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB. - * - * @param lease the lease to void - * - * @return true if eviction succeeded, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ - EvictLease(lease ILease) (bool, error) - - /** - * Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB. - * - * @param lease the lease to delete - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB delete fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB delete fails in an unexpected way - */ - DeleteLease(lease ILease) error - - /** - * Delete all leases from DynamoDB. Useful for tools/utils and testing. - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB scan or delete fail due to lack of capacity - * @error LeasingDependencyError if DynamoDB scan or delete fail in an unexpected way - */ - DeleteAll() error - - /** - * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing - * library such as leaseCounter, leaseOwner, or leaseKey. Conditional on the leaseCounter in DynamoDB matching the - * leaseCounter of the input. Increments the lease counter in DynamoDB so that updates can be contingent on other - * updates. Mutates the lease counter of the passed-in lease object. - * - * @return true if update succeeded, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ - UpdateLease(lease ILease) (bool, error) - - /** - * Check (synchronously) if there are any leases in the lease table. - * - * @return true if there are no leases in the lease table - * - * @error LeasingDependencyError if DynamoDB scan fails in an unexpected way - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB scan fails due to lack of capacity - */ - IsLeaseTableEmpty() (bool, error) -} diff --git a/src/leases/interfaces/lease-renewer.go b/src/leases/interfaces/lease-renewer.go deleted file mode 100644 index 6e52049..0000000 --- a/src/leases/interfaces/lease-renewer.go +++ /dev/null @@ -1,78 +0,0 @@ -package interfaces - -// LeaseTable hold current lease mapping shardId --> Lease -type LeaseTable map[string]*ILease - -/** - * ILeaseRenewer objects are used by LeaseCoordinator to renew leases held by the LeaseCoordinator. Each - * LeaseCoordinator instance corresponds to one worker, and uses exactly one ILeaseRenewer to manage lease renewal for - * that worker. - */ -type ILeaseRenewer interface { - - /** - * Bootstrap initial set of leases from the LeaseManager (e.g. upon process restart, pick up leases we own) - * @error LeasingDependencyError on unexpected DynamoDB failures - * @error LeasingInvalidStateError if lease table doesn't exist - * @error ProvisionedThroughputError if DynamoDB reads fail due to insufficient capacity - */ - Initialize() error - - /** - * Attempt to renew all currently held leases. - * - * @error LeasingDependencyError on unexpected DynamoDB failures - * @error LeasingInvalidStateError if lease table does not exist - */ - RenewLeases() error - - /** - * @return currently held leases. Key is shardId, value is corresponding Lease object. A lease is currently held if - * we successfully renewed it on the last run of renewLeases(). Lease objects returned are deep copies - - * their lease counters will not tick. - */ - GetCurrentlyHeldLeases() *LeaseTable - - /** - * @param leaseKey key of the lease to retrieve - * - * @return a deep copy of a currently held lease, or null if we don't hold the lease - */ - GetCurrentlyHeldLease(leaseKey string) *ILease - - /** - * Adds leases to this LeaseRenewer's set of currently held leases. Leases must have lastRenewalNanos set to the - * last time the lease counter was incremented before being passed to this method. - * - * @param newLeases new leases. - */ - AddLeasesToRenew(newLeases []ILease) - - /** - * Clears this LeaseRenewer's set of currently held leases. - */ - ClearCurrentlyHeldLeases() - - /** - * Stops the lease renewer from continunig to maintain the given lease. - * - * @param lease the lease to drop. - */ - DropLease(lease ILease) - - /** - * Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as - * leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match - * the concurrency token on the internal authoritative copy of the lease (ie, if we lost and re-acquired the lease). - * - * @param lease lease object containing updated data - * @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease - * - * @return true if update succeeds, false otherwise - * - * @error LeasingInvalidStateError if lease table does not exist - * @error ProvisionedThroughputError if DynamoDB update fails due to lack of capacity - * @error LeasingDependencyError if DynamoDB update fails in an unexpected way - */ - UpdateLease(lease ILease, concurrencyToken string) (bool, error) -} diff --git a/src/leases/interfaces/lease-serializer.go b/src/leases/interfaces/lease-serializer.go deleted file mode 100644 index a8601d0..0000000 --- a/src/leases/interfaces/lease-serializer.go +++ /dev/null @@ -1,86 +0,0 @@ -package interfaces - -import ( - "github.com/aws/aws-sdk-go/service/dynamodb" -) - -// ILeaseSerializer an utility class that manages the mapping of Lease objects/operations to records in DynamoDB. -type ILeaseSerializer interface { - - /** - * Construct a DynamoDB record out of a Lease object - * - * @param lease lease object to serialize - * @return an attribute value map representing the lease object - */ - ToDynamoRecord(lease ILease) map[string]*dynamodb.AttributeValue - - /** - * Construct a Lease object out of a DynamoDB record. - * - * @param dynamoRecord attribute value map from DynamoDB - * @return a deserialized lease object representing the attribute value map - */ - FromDynamoRecord(dynamoRecord map[string]*dynamodb.AttributeValue) ILease - - /** - * Special getDynamoHashKey implementation used by ILeaseManager.getLease(). - * - * @param leaseKey - * @return the attribute value map representing a Lease's hash key given a string. - */ - GetDynamoHashKey(leaseKey string) map[string]*dynamodb.AttributeValue - - /** - * @param lease - * @return the attribute value map asserting that a lease counter is what we expect. - */ - GetDynamoLeaseCounterExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue - - /** - * @param lease - * @return the attribute value map asserting that the lease owner is what we expect. - */ - GetDynamoLeaseOwnerExpectation(lease ILease) map[string]*dynamodb.ExpectedAttributeValue - - /** - * @return the attribute value map asserting that a lease does not exist. - */ - GetDynamoNonexistantExpectation() map[string]*dynamodb.ExpectedAttributeValue - - /** - * @param lease - * @return the attribute value map that increments a lease counter - */ - GetDynamoLeaseCounterUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate - - /** - * @param lease - * @param newOwner - * @return the attribute value map that takes a lease for a new owner - */ - GetDynamoTakeLeaseUpdate(lease ILease, newOwner string) map[string]*dynamodb.AttributeValueUpdate - - /** - * @param lease - * @return the attribute value map that voids a lease - */ - GetDynamoEvictLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate - - /** - * @param lease - * @return the attribute value map that updates application-specific data for a lease and increments the lease - * counter - */ - GetDynamoUpdateLeaseUpdate(lease ILease) map[string]*dynamodb.AttributeValueUpdate - - /** - * @return the key schema for creating a DynamoDB table to store leases - */ - GetKeySchema() []*dynamodb.KeySchemaElement - - /** - * @return attribute definitions for creating a DynamoDB table to store leases - */ - GetAttributeDefinitions() []*dynamodb.AttributeDefinition -} diff --git a/src/leases/interfaces/lease-taker.go b/src/leases/interfaces/lease-taker.go deleted file mode 100644 index 0dbaf1b..0000000 --- a/src/leases/interfaces/lease-taker.go +++ /dev/null @@ -1,28 +0,0 @@ -package interfaces - -/** - * ILeaseTaker is used by LeaseCoordinator to take new leases, or leases that other workers fail to renew. Each - * LeaseCoordinator instance corresponds to one worker and uses exactly one ILeaseTaker to take leases for that worker. - */ -type ILeaseTaker interface { - - /** - * Compute the set of leases available to be taken and attempt to take them. Lease taking rules are: - * - * 1) If a lease's counter hasn't changed in long enough, try to take it. - * 2) If we see a lease we've never seen before, take it only if owner == null. If it's owned, odds are the owner is - * holding it. We can't tell until we see it more than once. - * 3) For load balancing purposes, you may violate rules 1 and 2 for EXACTLY ONE lease per call of takeLeases(). - * - * @return map of shardId to Lease object for leases we just successfully took. - * - * @error LeasingDependencyError on unexpected DynamoDB failures - * @error LeasingInvalidStateError if lease table does not exist - */ - TakeLeases() map[string]ILease - - /** - * @return workerIdentifier for this LeaseTaker - */ - GetWorkerIdentifier() string -} diff --git a/src/leases/interfaces/lease.go b/src/leases/interfaces/lease.go deleted file mode 100644 index f3da35a..0000000 --- a/src/leases/interfaces/lease.go +++ /dev/null @@ -1,21 +0,0 @@ -package interfaces - -// ILease is the interface for all Leases -type ILease interface { - GetLeaseKey() string - SetLeaseKey(leaseKey string) error - - GetLeaseOwner() string - SetLeaseOwner(leaseOwner string) - - GetLeaseCounter() int64 - SetLeaseCounter(leaseCounter int64) - - GetConcurrencyToken() string - SetConcurrencyToken(concurrencyToken string) - - GetLastCounterIncrementNanos() int64 - SetLastCounterIncrementNanos(lastCounterIncrementNanos int64) - - IsExpired(leaseDurationNanos, asOfNanos int64) bool -} diff --git a/src/clientlibrary/common/errors.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/common/errors.go similarity index 100% rename from src/clientlibrary/common/errors.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/common/errors.go diff --git a/src/clientlibrary/config/config.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/config/config.go similarity index 100% rename from src/clientlibrary/config/config.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/config/config.go diff --git a/src/clientlibrary/config/config_test.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/config/config_test.go similarity index 100% rename from src/clientlibrary/config/config_test.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/config/config_test.go diff --git a/src/clientlibrary/config/initial-stream-pos.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/config/initial-stream-pos.go similarity index 100% rename from src/clientlibrary/config/initial-stream-pos.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/config/initial-stream-pos.go diff --git a/src/clientlibrary/config/kcl-config.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/config/kcl-config.go similarity index 99% rename from src/clientlibrary/config/kcl-config.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/config/kcl-config.go index bfba4aa..1ac3f7f 100644 --- a/src/clientlibrary/config/kcl-config.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/config/kcl-config.go @@ -1,8 +1,8 @@ package config import ( - "clientlibrary/utils" "time" + "vmware.com/cascade-kinesis-client/clientlibrary/utils" ) // NewKinesisClientLibConfig to create a default KinesisClientLibConfiguration based on the required fields. diff --git a/src/clientlibrary/interfaces/inputs.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/inputs.go similarity index 100% rename from src/clientlibrary/interfaces/inputs.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/inputs.go diff --git a/src/clientlibrary/interfaces/record-processor-checkpointer.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/record-processor-checkpointer.go similarity index 100% rename from src/clientlibrary/interfaces/record-processor-checkpointer.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/record-processor-checkpointer.go diff --git a/src/clientlibrary/interfaces/record-processor.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/record-processor.go similarity index 100% rename from src/clientlibrary/interfaces/record-processor.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/record-processor.go diff --git a/src/clientlibrary/interfaces/sequence-number.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/sequence-number.go similarity index 100% rename from src/clientlibrary/interfaces/sequence-number.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/interfaces/sequence-number.go diff --git a/src/clientlibrary/metrics/cloudwatch.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/metrics/cloudwatch.go similarity index 100% rename from src/clientlibrary/metrics/cloudwatch.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/metrics/cloudwatch.go diff --git a/src/clientlibrary/metrics/interfaces.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/metrics/interfaces.go similarity index 100% rename from src/clientlibrary/metrics/interfaces.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/metrics/interfaces.go diff --git a/src/clientlibrary/metrics/prometheus.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/metrics/prometheus.go similarity index 100% rename from src/clientlibrary/metrics/prometheus.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/metrics/prometheus.go diff --git a/src/clientlibrary/utils/random.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/utils/random.go similarity index 100% rename from src/clientlibrary/utils/random.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/utils/random.go diff --git a/src/clientlibrary/utils/uuid.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/utils/uuid.go similarity index 100% rename from src/clientlibrary/utils/uuid.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/utils/uuid.go diff --git a/src/clientlibrary/worker/checkpointer.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go similarity index 99% rename from src/clientlibrary/worker/checkpointer.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go index 4994e63..ec279b2 100644 --- a/src/clientlibrary/worker/checkpointer.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/checkpointer.go @@ -11,7 +11,7 @@ import ( "github.com/matryer/try" log "github.com/sirupsen/logrus" - "clientlibrary/config" + "vmware.com/cascade-kinesis-client/clientlibrary/config" ) const ( diff --git a/src/clientlibrary/worker/record-processor-checkpointer.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/record-processor-checkpointer.go similarity index 96% rename from src/clientlibrary/worker/record-processor-checkpointer.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/worker/record-processor-checkpointer.go index 69a406e..94a090a 100644 --- a/src/clientlibrary/worker/record-processor-checkpointer.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/record-processor-checkpointer.go @@ -3,7 +3,7 @@ package worker import ( "github.com/aws/aws-sdk-go/aws" - kcl "clientlibrary/interfaces" + kcl "vmware.com/cascade-kinesis-client/clientlibrary/interfaces" ) type ( diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/shard-consumer.go similarity index 97% rename from src/clientlibrary/worker/shard-consumer.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/worker/shard-consumer.go index 9b9f175..9a76309 100644 --- a/src/clientlibrary/worker/shard-consumer.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/shard-consumer.go @@ -10,9 +10,9 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" - "clientlibrary/config" - kcl "clientlibrary/interfaces" - "clientlibrary/metrics" + "vmware.com/cascade-kinesis-client/clientlibrary/config" + kcl "vmware.com/cascade-kinesis-client/clientlibrary/interfaces" + "vmware.com/cascade-kinesis-client/clientlibrary/metrics" ) const ( diff --git a/src/clientlibrary/worker/worker.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go similarity index 97% rename from src/clientlibrary/worker/worker.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go index 1603ff4..a97573c 100644 --- a/src/clientlibrary/worker/worker.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker.go @@ -16,9 +16,9 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" - "clientlibrary/config" - kcl "clientlibrary/interfaces" - "clientlibrary/metrics" + "vmware.com/cascade-kinesis-client/clientlibrary/config" + kcl "vmware.com/cascade-kinesis-client/clientlibrary/interfaces" + "vmware.com/cascade-kinesis-client/clientlibrary/metrics" ) type shardStatus struct { diff --git a/src/clientlibrary/worker/worker_test.go b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker_test.go similarity index 93% rename from src/clientlibrary/worker/worker_test.go rename to src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker_test.go index e1dee71..bdabb54 100644 --- a/src/clientlibrary/worker/worker_test.go +++ b/src/vmware.com/cascade-kinesis-client/clientlibrary/worker/worker_test.go @@ -10,11 +10,11 @@ import ( "github.com/prometheus/common/expfmt" log "github.com/sirupsen/logrus" - cfg "clientlibrary/config" - kc "clientlibrary/interfaces" - "clientlibrary/metrics" - "clientlibrary/utils" "github.com/stretchr/testify/assert" + cfg "vmware.com/cascade-kinesis-client/clientlibrary/config" + kc "vmware.com/cascade-kinesis-client/clientlibrary/interfaces" + "vmware.com/cascade-kinesis-client/clientlibrary/metrics" + "vmware.com/cascade-kinesis-client/clientlibrary/utils" ) const ( @@ -27,10 +27,6 @@ const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2 const metricsSystem = "cloudwatch" func TestWorker(t *testing.T) { - os.Setenv("AWS_ACCESS_KEY_ID", "your aws access key id") - os.Setenv("AWS_SECRET_ACCESS_KEY", "your aws secret access key") - defer os.Unsetenv("AWS_ACCESS_KEY_ID") - defer os.Unsetenv("AWS_SECRET_ACCESS_KEY") kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). WithMaxRecords(10).