Fix naming convention (#85)
Minor fix to the constant naming convention.

Signed-off-by: Tao Jiang <taoj@vmware.com>
parent 1044485392
commit adb264717b

8 changed files with 46 additions and 44 deletions
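The rename applies standard Go style: exported identifiers use MixedCaps rather than SCREAMING_SNAKE_CASE, and error strings start with a lowercase letter. A minimal before/after sketch from a caller's point of view; the import paths and package aliases below are assumptions for illustration and do not appear on this page.

package example

import (
	"errors"

	chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint" // assumed path
	cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"     // assumed path
)

func renamedConstants() error {
	// Before: cfg.DEFAULT_INITIAL_POSITION_IN_STREAM and chk.ErrLeaseNotAquired
	// ("Lease is already held by another node").
	// After: MixedCaps constant names and a lowercase error string.
	_ = cfg.DefaultInitialPositionInStream
	return errors.New(chk.ErrLeaseNotAcquired)
}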
@@ -42,8 +42,8 @@ const (
 // We've completely processed all records in this shard.
 ShardEnd = "SHARD_END"

-// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
+// ErrLeaseNotAcquired is returned when we failed to get a lock on the shard
-ErrLeaseNotAquired = "Lease is already held by another node"
+ErrLeaseNotAcquired = "lease is already held by another node"
 )

 // Checkpointer handles checkpointing when a record has been processed
@@ -142,7 +142,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
 }

 if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo {
-return errors.New(ErrLeaseNotAquired)
+return errors.New(ErrLeaseNotAcquired)
 }

 checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
@@ -186,7 +186,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
 if err != nil {
 if awsErr, ok := err.(awserr.Error); ok {
 if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
-return errors.New(ErrLeaseNotAquired)
+return errors.New(ErrLeaseNotAcquired)
 }
 }
 return err
@@ -85,7 +85,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
 Checkpoint: "",
 Mux: &sync.Mutex{},
 }, "ijkl-mnop")
-if err == nil || err.Error() != ErrLeaseNotAquired {
+if err == nil || err.Error() != ErrLeaseNotAcquired {
 t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err)
 }
 }
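Note that ErrLeaseNotAcquired is a plain string constant rather than a sentinel error value, so callers detect a lost lease race by comparing err.Error() against it, as the worker code later in this diff does. A minimal sketch of that pattern under the new name; the helper function and import paths are assumptions for illustration, and the Checkpointer interface is assumed to expose GetLease as used in the hunks above.

package example

import (
	chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint" // assumed path
	par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"  // assumed path
)

// acquireOrSkip is a hypothetical helper, not part of this commit.
func acquireOrSkip(checkpointer chk.Checkpointer, shard *par.ShardStatus, workerID string) error {
	if err := checkpointer.GetLease(shard, workerID); err != nil {
		if err.Error() == chk.ErrLeaseNotAcquired {
			// Another worker already holds the lease; treat this as a normal outcome.
			return nil
		}
		return err // a genuine DynamoDB/AWS failure
	}
	return nil
}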
@@ -55,72 +55,72 @@ const (

 // The location in the shard from which the KinesisClientLibrary will start fetching records from
 // when the application starts for the first time and there is no checkpoint for the shard.
-DEFAULT_INITIAL_POSITION_IN_STREAM = LATEST
+DefaultInitialPositionInStream = LATEST

 // Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
 // will be regarded as having problems and it's shards will be assigned to other workers.
 // For applications that have a large number of shards, this may be set to a higher number to reduce
 // the number of DynamoDB IOPS required for tracking leases.
-DEFAULT_FAILOVER_TIME_MILLIS = 10000
+DefaultFailoverTimeMillis = 10000

 // Period before the end of lease during which a lease is refreshed by the owner.
-DEFAULT_LEASE_REFRESH_PERIOD_MILLIS = 5000
+DefaultLeaseRefreshPeriodMillis = 5000

 // Max records to fetch from Kinesis in a single GetRecords call.
-DEFAULT_MAX_RECORDS = 10000
+DefaultMaxRecords = 10000

 // The default value for how long the {@link ShardConsumer} should sleep if no records are returned
 // from the call to
-DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000
+DefaultIdletimeBetweenReadsMillis = 1000

 // Don't call processRecords() on the record processor for empty record lists.
-DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST = false
+DefaultDontCallProcessRecordsForEmptyRecordList = false

 // Interval in milliseconds between polling to check for parent shard completion.
 // Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
 // completion of parent shards).
-DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS = 10000
+DefaultParentShardPollIntervalMillis = 10000

 // Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
-DEFAULT_SHARD_SYNC_INTERVAL_MILLIS = 60000
+DefaultShardSyncIntervalMillis = 60000

 // Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
 // Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by
 // default we try to delete the ones we don't need any longer.
-DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true
+DefaultCleanupLeasesUponShardsCompletion = true

 // Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
-DEFAULT_TASK_BACKOFF_TIME_MILLIS = 500
+DefaultTaskBackoffTimeMillis = 500

 // KCL will validate client provided sequence numbers with a call to Amazon Kinesis before
 // checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
-DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true
+DefaultValidateSequenceNumberBeforeCheckpointing = true

 // The max number of leases (shards) this worker should process.
 // This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
 // or during deployment.
 // NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
 // stream due to the max limit.
-DEFAULT_MAX_LEASES_FOR_WORKER = math.MaxInt16
+DefaultMaxLeasesForWorker = math.MaxInt16

 // Max leases to steal from another worker at one time (for load balancing).
 // Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
 // but can cause higher churn in the system.
-DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1
+DefaultMaxLeasesToStealAtOneTime = 1

 // The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
-DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10
+DefaultInitialLeaseTableReadCapacity = 10

 // The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
-DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10
+DefaultInitialLeaseTableWriteCapacity = 10

 // The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
 // assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
 // during incremental deployments of an application).
-DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false
+DefaultSkipShardSyncAtStartupIfLeasesExist = false

 // The amount of milliseconds to wait before graceful shutdown forcefully terminates.
-DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000
+DefaultShutdownGraceMillis = 5000
 )

 type (
@@ -76,24 +76,24 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
 StreamName: streamName,
 RegionName: regionName,
 WorkerID: workerID,
-InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
+InitialPositionInStream: DefaultInitialPositionInStream,
-InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
+InitialPositionInStreamExtended: *newInitialPosition(DefaultInitialPositionInStream),
-FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
+FailoverTimeMillis: DefaultFailoverTimeMillis,
-LeaseRefreshPeriodMillis: DEFAULT_LEASE_REFRESH_PERIOD_MILLIS,
+LeaseRefreshPeriodMillis: DefaultLeaseRefreshPeriodMillis,
-MaxRecords: DEFAULT_MAX_RECORDS,
+MaxRecords: DefaultMaxRecords,
-IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
+IdleTimeBetweenReadsInMillis: DefaultIdletimeBetweenReadsMillis,
-CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
+CallProcessRecordsEvenForEmptyRecordList: DefaultDontCallProcessRecordsForEmptyRecordList,
-ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
+ParentShardPollIntervalMillis: DefaultParentShardPollIntervalMillis,
-ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
+ShardSyncIntervalMillis: DefaultShardSyncIntervalMillis,
-CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
+CleanupTerminatedShardsBeforeExpiry: DefaultCleanupLeasesUponShardsCompletion,
-TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
+TaskBackoffTimeMillis: DefaultTaskBackoffTimeMillis,
-ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
+ValidateSequenceNumberBeforeCheckpointing: DefaultValidateSequenceNumberBeforeCheckpointing,
-ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS,
+ShutdownGraceMillis: DefaultShutdownGraceMillis,
-MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER,
+MaxLeasesForWorker: DefaultMaxLeasesForWorker,
-MaxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
+MaxLeasesToStealAtOneTime: DefaultMaxLeasesToStealAtOneTime,
-InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
+InitialLeaseTableReadCapacity: DefaultInitialLeaseTableReadCapacity,
-InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
+InitialLeaseTableWriteCapacity: DefaultInitialLeaseTableWriteCapacity,
-SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
+SkipShardSyncAtWorkerInitializationIfLeasesExist: DefaultSkipShardSyncAtStartupIfLeasesExist,
 Logger: logger.GetDefaultLogger(),
 }
 }
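For applications embedding the library, nothing functional changes in this constructor: every field is still filled with the same default value, only under the renamed constant. A short sketch of a caller referencing the renamed defaults when tuning a configuration; the struct type name, field types, and import path are assumptions inferred from this diff rather than definitive API documentation.

package example

import (
	cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config" // assumed path
)

// tuneConfig resets two settings back to the library defaults using the new MixedCaps names.
// kclConfig is assumed to have been built elsewhere, e.g. by NewKinesisClientLibConfigWithCredentials.
func tuneConfig(kclConfig *cfg.KinesisClientLibConfiguration) {
	// Old names: cfg.DEFAULT_MAX_RECORDS, cfg.DEFAULT_FAILOVER_TIME_MILLIS
	kclConfig.MaxRecords = cfg.DefaultMaxRecords
	kclConfig.FailoverTimeMillis = cfg.DefaultFailoverTimeMillis
}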
@@ -162,7 +162,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
 log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
 err = sc.checkpointer.GetLease(shard, sc.consumerID)
 if err != nil {
-if err.Error() == chk.ErrLeaseNotAquired {
+if err.Error() == chk.ErrLeaseNotAcquired {
 log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
 return nil
 }
@@ -225,7 +225,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {

 recordLength := len(input.Records)
 recordBytes := int64(0)
-log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)
+log.Debugf("Received %d de-aggregated records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)

 for _, r := range dars {
 recordBytes += int64(len(r.Data))
@@ -277,7 +277,7 @@ func (w *Worker) eventLoop() {
 err = w.checkpointer.GetLease(shard, w.workerID)
 if err != nil {
 // cannot get lease on the shard
-if err.Error() != chk.ErrLeaseNotAquired {
+if err.Error() != chk.ErrLeaseNotAcquired {
 log.Errorf("Cannot get lease: %+v", err)
 }
 continue
@@ -71,7 +71,9 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
 // Especially, for processing de-aggregated KPL records, checkpointing has to happen at the end of batch
 // because de-aggregated records share the same sequence number.
 lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
-dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest)
+// Calculate the time taken from polling records and delivering to record processor for a batch.
+diff := input.CacheExitTime.Sub(*input.CacheEntryTime)
+dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v, KCLProcessTime = %v", lastRecordSequenceNumber, input.MillisBehindLatest, diff)
 input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
 }
