From 4aebaf1ae019fc0a5ac8cdeb9a2d9641130fba0d Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 3 Apr 2023 14:58:04 -0700 Subject: [PATCH] feat: make lease renewal async Signed-off-by: Shiva Pentakota --- clientlibrary/config/config.go | 6 +++ clientlibrary/config/kcl-config.go | 7 +++ clientlibrary/interfaces/record-processor.go | 2 +- clientlibrary/worker/common-shard-consumer.go | 9 +--- .../worker/polling-shard-consumer.go | 51 +++++++++++-------- .../worker/record-processor-checkpointer.go | 18 ------- 6 files changed, 45 insertions(+), 48 deletions(-) diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 2d50ca8..57fd5a7 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -69,6 +69,9 @@ const ( // DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner. DefaultLeaseRefreshPeriodMillis = 5000 + // DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt + DefaultLeaseRefreshWaitTime = 2500 + // DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call. DefaultMaxRecords = 10000 @@ -216,6 +219,9 @@ type ( // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. 
LeaseRefreshPeriodMillis int + // LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt + LeaseRefreshWaitTime int + // MaxRecords Max records to read per Kinesis getRecords() call MaxRecords int diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 4d7181b..2ac2fa5 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -102,6 +102,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis, LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis, LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis, + LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime, MaxRetryCount: DefaultMaxRetryCount, Logger: logger.GetDefaultLogger(), } @@ -149,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres return c } +func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration { + checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime) + c.LeaseRefreshWaitTime = leaseRefreshWaitTime + return c +} + func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) c.ShardSyncIntervalMillis = shardSyncIntervalMillis diff --git a/clientlibrary/interfaces/record-processor.go b/clientlibrary/interfaces/record-processor.go index a4897d4..1c41d56 100644 --- a/clientlibrary/interfaces/record-processor.go +++ b/clientlibrary/interfaces/record-processor.go @@ -59,7 +59,7 @@ type ( * @param processRecordsInput Provides the records to be processed as well as information and capabilities related * to them (eg checkpointing). 
*/ - ProcessRecords(processRecordsInput *ProcessRecordsInput) error + ProcessRecords(processRecordsInput *ProcessRecordsInput) // Shutdown /* diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 36ddb77..e2b24f7 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -136,7 +136,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error { } } -func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error { +func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) { log := sc.kclConfig.Logger getRecordsTime := time.Since(getRecordsStartTime).Milliseconds() @@ -172,11 +172,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec // Delivery the events to the record processor input.CacheEntryTime = &getRecordsStartTime input.CacheExitTime = &processRecordsStartTime - err := sc.recordProcessor.ProcessRecords(input) - if err != nil { - return err - } - + sc.recordProcessor.ProcessRecords(input) processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds() sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming)) } @@ -184,5 +180,4 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength) sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes) sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest)) - return nil } diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 7211842..0abc5fd 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ 
b/clientlibrary/worker/polling-shard-consumer.go @@ -32,6 +32,7 @@ package worker import ( "context" "errors" + log "github.com/sirupsen/logrus" "math" "time" @@ -132,25 +133,12 @@ func (sc *PollingShardConsumer) getRecords() error { sc.callsLeft = kinesisReadTPSLimit sc.bytesRead = 0 sc.remBytes = MaxBytes - + // starting async lease renewal goroutine + leaseRenewalErrChan := make(chan error, 1) + go func() { + leaseRenewalErrChan <- sc.renewLease() + }() for { - if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { - log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) - err = sc.checkpointer.GetLease(sc.shard, sc.consumerID) - if err != nil { - if errors.As(err, &chk.ErrLeaseNotAcquired{}) { - log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) - return nil - } - // log and return error - log.Errorf("Error in refreshing lease on shard: %s for worker: %s. 
Error: %+v", - sc.shard.ID, sc.consumerID, err) - return err - } - // log metric for renewed lease for worker - sc.mService.LeaseRenewed(sc.shard.ID) - } - getRecordsStartTime := time.Now() log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) @@ -214,10 +202,7 @@ func (sc *PollingShardConsumer) getRecords() error { // reset the retry count after success retriedErrors = 0 - err = sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) - if err != nil { - return err - } + sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) // The shard has been closed, so no new records can be read from it if getResp.NextShardIterator == nil { @@ -240,6 +225,8 @@ func (sc *PollingShardConsumer) getRecords() error { shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) return nil + case leaseRenewalErr := <-leaseRenewalErrChan: + return leaseRenewalErr default: } } @@ -312,3 +299,23 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) return getResp, 0, err } + +func (sc *PollingShardConsumer) renewLease() error { + for { + time.Sleep(time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond) + log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) + err := sc.checkpointer.GetLease(sc.shard, sc.consumerID) + if err != nil { + if errors.As(err, &chk.ErrLeaseNotAcquired{}) { + log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) + return nil + } + // log and return error + log.Errorf("Error in refreshing lease on shard: %s for worker: %s. 
Error: %+v", + sc.shard.ID, sc.consumerID, err) + return err + } + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) + } +} diff --git a/clientlibrary/worker/record-processor-checkpointer.go b/clientlibrary/worker/record-processor-checkpointer.go index 101137f..8f71f14 100644 --- a/clientlibrary/worker/record-processor-checkpointer.go +++ b/clientlibrary/worker/record-processor-checkpointer.go @@ -21,17 +21,10 @@ package worker import ( - "errors" "github.com/aws/aws-sdk-go-v2/aws" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" - "time" -) - -var ( - ShutdownError = errors.New("another instance may have started processing some of these records already") - LeaseExpiredError = errors.New("the lease has on the shard has expired") ) type ( @@ -75,17 +68,6 @@ func (pc *PreparedCheckpointer) Checkpoint() error { } func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error { - // return shutdown error if lease is expired or another worker has started processing records for this shard - currLeaseOwner, err := rc.checkpoint.GetLeaseOwner(rc.shard.ID) - if err != nil { - return err - } - if rc.shard.AssignedTo != currLeaseOwner { - return ShutdownError - } - if time.Now().After(rc.shard.LeaseTimeout) { - return LeaseExpiredError - } // checkpoint the last sequence of a closed shard if sequenceNumber == nil { rc.shard.SetCheckpoint(chk.ShardEnd)