diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index 4ff6fd2..2345533 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -42,8 +42,8 @@ const ( // We've completely processed all records in this shard. ShardEnd = "SHARD_END" - // ErrLeaseNotAquired is returned when we failed to get a lock on the shard - ErrLeaseNotAquired = "Lease is already held by another node" + // ErrLeaseNotAcquired is returned when we failed to get a lock on the shard + ErrLeaseNotAcquired = "lease is already held by another node" ) // Checkpointer handles checkpointing when a record has been processed diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 1145454..fd6751e 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -142,7 +142,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign } if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo { - return errors.New(ErrLeaseNotAquired) + return errors.New(ErrLeaseNotAcquired) } checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo) @@ -186,7 +186,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException { - return errors.New(ErrLeaseNotAquired) + return errors.New(ErrLeaseNotAcquired) } } return err diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go index ec3894b..55ec973 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer_test.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer_test.go @@ -85,7 +85,7 @@ func TestGetLeaseNotAquired(t *testing.T) { Checkpoint: "", Mux: &sync.Mutex{}, }, "ijkl-mnop") - if err == nil || err.Error() != ErrLeaseNotAquired { + if err == nil || err.Error() != ErrLeaseNotAcquired { t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err) } } diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 91d73b8..5337269 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -55,72 +55,72 @@ const ( // The location in the shard from which the KinesisClientLibrary will start fetching records from // when the application starts for the first time and there is no checkpoint for the shard. - DEFAULT_INITIAL_POSITION_IN_STREAM = LATEST + DefaultInitialPositionInStream = LATEST // Fail over time in milliseconds. A worker which does not renew it's lease within this time interval // will be regarded as having problems and it's shards will be assigned to other workers. // For applications that have a large number of shards, this may be set to a higher number to reduce // the number of DynamoDB IOPS required for tracking leases. - DEFAULT_FAILOVER_TIME_MILLIS = 10000 + DefaultFailoverTimeMillis = 10000 // Period before the end of lease during which a lease is refreshed by the owner. - DEFAULT_LEASE_REFRESH_PERIOD_MILLIS = 5000 + DefaultLeaseRefreshPeriodMillis = 5000 // Max records to fetch from Kinesis in a single GetRecords call. - DEFAULT_MAX_RECORDS = 10000 + DefaultMaxRecords = 10000 // The default value for how long the {@link ShardConsumer} should sleep if no records are returned // from the call to - DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000 + DefaultIdletimeBetweenReadsMillis = 1000 // Don't call processRecords() on the record processor for empty record lists. - DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST = false + DefaultDontCallProcessRecordsForEmptyRecordList = false // Interval in milliseconds between polling to check for parent shard completion. // Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on // completion of parent shards). - DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS = 10000 + DefaultParentShardPollIntervalMillis = 10000 // Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. - DEFAULT_SHARD_SYNC_INTERVAL_MILLIS = 60000 + DefaultShardSyncIntervalMillis = 60000 // Cleanup leases upon shards completion (don't wait until they expire in Kinesis). // Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by // default we try to delete the ones we don't need any longer. - DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true + DefaultCleanupLeasesUponShardsCompletion = true // Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). - DEFAULT_TASK_BACKOFF_TIME_MILLIS = 500 + DefaultTaskBackoffTimeMillis = 500 // KCL will validate client provided sequence numbers with a call to Amazon Kinesis before // checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} by default. - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true + DefaultValidateSequenceNumberBeforeCheckpointing = true // The max number of leases (shards) this worker should process. // This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints // or during deployment. // NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the // stream due to the max limit. - DEFAULT_MAX_LEASES_FOR_WORKER = math.MaxInt16 + DefaultMaxLeasesForWorker = math.MaxInt16 // Max leases to steal from another worker at one time (for load balancing). // Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), // but can cause higher churn in the system. - DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1 + DefaultMaxLeasesToStealAtOneTime = 1 // The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity. - DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10 + DefaultInitialLeaseTableReadCapacity = 10 // The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity. - DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10 + DefaultInitialLeaseTableWriteCapacity = 10 // The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This // assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. // during incremental deployments of an application). - DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false + DefaultSkipShardSyncAtStartupIfLeasesExist = false // The amount of milliseconds to wait before graceful shutdown forcefully terminates. - DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000 + DefaultShutdownGraceMillis = 5000 ) type ( diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index d7960e9..e42d864 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -76,24 +76,24 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio StreamName: streamName, RegionName: regionName, WorkerID: workerID, - InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, - InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), - FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, - LeaseRefreshPeriodMillis: DEFAULT_LEASE_REFRESH_PERIOD_MILLIS, - MaxRecords: DEFAULT_MAX_RECORDS, - IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, - CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, - ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, - ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, - CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, - TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS, - ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, - ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS, - MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER, - MaxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, - InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, - InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, - SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + InitialPositionInStream: DefaultInitialPositionInStream, + InitialPositionInStreamExtended: *newInitialPosition(DefaultInitialPositionInStream), + FailoverTimeMillis: DefaultFailoverTimeMillis, + LeaseRefreshPeriodMillis: DefaultLeaseRefreshPeriodMillis, + MaxRecords: DefaultMaxRecords, + IdleTimeBetweenReadsInMillis: DefaultIdletimeBetweenReadsMillis, + CallProcessRecordsEvenForEmptyRecordList: DefaultDontCallProcessRecordsForEmptyRecordList, + ParentShardPollIntervalMillis: DefaultParentShardPollIntervalMillis, + ShardSyncIntervalMillis: DefaultShardSyncIntervalMillis, + CleanupTerminatedShardsBeforeExpiry: DefaultCleanupLeasesUponShardsCompletion, + TaskBackoffTimeMillis: DefaultTaskBackoffTimeMillis, + ValidateSequenceNumberBeforeCheckpointing: DefaultValidateSequenceNumberBeforeCheckpointing, + ShutdownGraceMillis: DefaultShutdownGraceMillis, + MaxLeasesForWorker: DefaultMaxLeasesForWorker, + MaxLeasesToStealAtOneTime: DefaultMaxLeasesToStealAtOneTime, + InitialLeaseTableReadCapacity: DefaultInitialLeaseTableReadCapacity, + InitialLeaseTableWriteCapacity: DefaultInitialLeaseTableWriteCapacity, + SkipShardSyncAtWorkerInitializationIfLeasesExist: DefaultSkipShardSyncAtStartupIfLeasesExist, Logger: logger.GetDefaultLogger(), } } diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 4cb963f..6d3a330 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -162,7 +162,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) if err != nil { - if err.Error() == chk.ErrLeaseNotAquired { + if err.Error() == chk.ErrLeaseNotAcquired { log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID) return nil } @@ -225,7 +225,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { recordLength := len(input.Records) recordBytes := int64(0) - log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest) + log.Debugf("Received %d de-aggregated records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest) for _, r := range dars { recordBytes += int64(len(r.Data)) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 4b42847..907bf5d 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -277,7 +277,7 @@ func (w *Worker) eventLoop() { err = w.checkpointer.GetLease(shard, w.workerID) if err != nil { // cannot get lease on the shard - if err.Error() != chk.ErrLeaseNotAquired { + if err.Error() != chk.ErrLeaseNotAcquired { log.Errorf("Cannot get lease: %+v", err) } continue diff --git a/test/record_processor_test.go b/test/record_processor_test.go index f392efb..31a8556 100644 --- a/test/record_processor_test.go +++ b/test/record_processor_test.go @@ -71,7 +71,9 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { // Especially, for processing de-aggregated KPL records, checkpointing has to happen at the end of batch // because de-aggregated records share the same sequence number. lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber - dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest) + // Calculate the time taken from polling records and delivering to record processor for a batch. + diff := input.CacheExitTime.Sub(*input.CacheEntryTime) + dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v, KCLProcessTime = %v", lastRecordSequenceNumber, input.MillisBehindLatest, diff) input.Checkpointer.Checkpoint(lastRecordSequenceNumber) }