feat: make lease renewal async

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
Shiva Pentakota 2023-04-03 14:58:04 -07:00
parent 5be0422f33
commit 4aebaf1ae0
6 changed files with 45 additions and 48 deletions


@@ -69,6 +69,9 @@ const (
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
DefaultLeaseRefreshPeriodMillis = 5000
// DefaultLeaseRefreshWaitTime is the period of time to wait before each async lease renewal attempt
DefaultLeaseRefreshWaitTime = 2500
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
DefaultMaxRecords = 10000
@@ -216,6 +219,9 @@ type (
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
LeaseRefreshPeriodMillis int
// LeaseRefreshWaitTime is the period of time to wait before each async lease renewal attempt
LeaseRefreshWaitTime int
// MaxRecords Max records to read per Kinesis getRecords() call
MaxRecords int
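A rough sketch of how the two defaults relate (local constants, not library code): the renewal loop added later in this commit sleeps LeaseRefreshWaitTime between attempts, so the 2500 ms default yields about two renewal attempts per 5000 ms refresh period.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Local copies of the defaults above, for illustration only.
	refreshPeriod := 5000 * time.Millisecond // DefaultLeaseRefreshPeriodMillis
	renewalWait := 2500 * time.Millisecond   // DefaultLeaseRefreshWaitTime

	// The async renewer sleeps renewalWait between lease refresh attempts.
	fmt.Printf("renewal attempts per refresh period: %d\n", int64(refreshPeriod/renewalWait)) // 2
}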


@@ -102,6 +102,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis,
LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis,
LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime,
MaxRetryCount: DefaultMaxRetryCount,
Logger: logger.GetDefaultLogger(),
}
@@ -149,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
return c
}
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
c.LeaseRefreshWaitTime = leaseRefreshWaitTime
return c
}
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
c.ShardSyncIntervalMillis = shardSyncIntervalMillis
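A hedged usage sketch of the new option through the existing builder; NewKinesisClientLibConfig is assumed from the config package (only the credentials variant appears in this diff), and the application, stream, region, and worker names are placeholders.

package main

import (
	cfg "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
)

func main() {
	// WithLeaseRefreshPeriodMillis and WithLeaseRefreshWaitTime are the builder
	// methods shown above; all identifiers here are example values.
	kclConfig := cfg.NewKinesisClientLibConfig("demo-app", "demo-stream", "us-west-2", "demo-worker").
		WithLeaseRefreshPeriodMillis(5000).
		WithLeaseRefreshWaitTime(2500)

	_ = kclConfig // pass to the worker in a real application
}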


@@ -59,7 +59,7 @@ type (
* @param processRecordsInput Provides the records to be processed as well as information and capabilities related
* to them (eg checkpointing).
*/
ProcessRecords(processRecordsInput *ProcessRecordsInput) error
ProcessRecords(processRecordsInput *ProcessRecordsInput)
// Shutdown
/*

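With the error return dropped from ProcessRecords, implementations handle processing failures themselves (retry, skip, or defer checkpointing) rather than propagating them to the shard consumer. A minimal sketch of an implementation under the new signature; the ProcessRecordsInput fields used here (Records, Checkpointer) and the record's SequenceNumber are assumptions based on the library's types, not part of this diff.

package processor

import (
	kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
)

// sampleProcessor sketches only ProcessRecords; the remaining IRecordProcessor
// methods are elided.
type sampleProcessor struct{}

// ProcessRecords no longer returns an error, so failures are dealt with in place.
func (p *sampleProcessor) ProcessRecords(input *kcl.ProcessRecordsInput) {
	for _, record := range input.Records {
		_ = record // process the record; recover from failures here instead of returning
	}
	if n := len(input.Records); n > 0 {
		// Checkpoint the last record handled (field names assumed, see note above).
		if err := input.Checkpointer.Checkpoint(input.Records[n-1].SequenceNumber); err != nil {
			_ = err // log or retry; there is no error to return anymore
		}
	}
}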

@@ -136,7 +136,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error {
}
}
func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error {
func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) {
log := sc.kclConfig.Logger
getRecordsTime := time.Since(getRecordsStartTime).Milliseconds()
@@ -172,11 +172,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
// Deliver the events to the record processor
input.CacheEntryTime = &getRecordsStartTime
input.CacheExitTime = &processRecordsStartTime
err := sc.recordProcessor.ProcessRecords(input)
if err != nil {
return err
}
sc.recordProcessor.ProcessRecords(input)
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
}
@@ -184,5 +180,4 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength)
sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes)
sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest))
return nil
}


@@ -32,6 +32,7 @@ package worker
import (
"context"
"errors"
log "github.com/sirupsen/logrus"
"math"
"time"
@@ -132,25 +133,12 @@ func (sc *PollingShardConsumer) getRecords() error {
sc.callsLeft = kinesisReadTPSLimit
sc.bytesRead = 0
sc.remBytes = MaxBytes
// start the async lease renewal goroutine
leaseRenewalErrChan := make(chan error, 1)
go func() {
leaseRenewalErrChan <- sc.renewLease()
}()
for {
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err = sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
return nil
}
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
}
getRecordsStartTime := time.Now()
log.Debugf("Trying to read %d records from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
@@ -214,10 +202,7 @@ func (sc *PollingShardConsumer) getRecords() error {
// reset the retry count after success
retriedErrors = 0
err = sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
if err != nil {
return err
}
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
// The shard has been closed, so no new records can be read from it
if getResp.NextShardIterator == nil {
@@ -240,6 +225,8 @@ func (sc *PollingShardConsumer) getRecords() error {
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput)
return nil
case leaseRenewalErr := <-leaseRenewalErrChan:
return leaseRenewalErr
default:
}
}
@@ -312,3 +299,23 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)
return getResp, 0, err
}
func (sc *PollingShardConsumer) renewLease() error {
for {
time.Sleep(time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond)
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
return nil
}
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
}
}
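Taken together, the pattern is: getRecords launches renewLease on its own goroutine, the goroutine reports a terminal failure over a channel buffered to capacity 1 (so it can always deliver its result and exit, even if the consumer loop has already stopped), and the polling loop checks that channel with a non-blocking select. A self-contained sketch of that pattern, with placeholder work standing in for GetLease and GetRecords:

package main

import (
	"errors"
	"fmt"
	"time"
)

// renew stands in for the background lease renewal loop: it retries on a fixed
// cadence and only returns when renewal fails for good.
func renew(wait time.Duration) error {
	for attempt := 0; ; attempt++ {
		time.Sleep(wait)
		if attempt == 3 { // placeholder for a lost lease or hard error
			return errors.New("lease could not be renewed")
		}
	}
}

func main() {
	// Buffered with capacity 1, matching the diff, so the goroutine never blocks
	// on delivering its final result.
	renewErr := make(chan error, 1)
	go func() { renewErr <- renew(100 * time.Millisecond) }()

	for {
		// Placeholder for the GetRecords / ProcessRecords work of the real loop.
		time.Sleep(50 * time.Millisecond)

		// Non-blocking check: keep polling unless the renewer has reported.
		select {
		case err := <-renewErr:
			fmt.Println("stopping consumer:", err)
			return
		default:
		}
	}
}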


@@ -21,17 +21,10 @@
package worker
import (
"errors"
"github.com/aws/aws-sdk-go-v2/aws"
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
"time"
)
var (
ShutdownError = errors.New("another instance may have started processing some of these records already")
LeaseExpiredError = errors.New("the lease on the shard has expired")
)
type (
@@ -75,17 +68,6 @@ func (pc *PreparedCheckpointer) Checkpoint() error {
}
func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
// return shutdown error if lease is expired or another worker has started processing records for this shard
currLeaseOwner, err := rc.checkpoint.GetLeaseOwner(rc.shard.ID)
if err != nil {
return err
}
if rc.shard.AssignedTo != currLeaseOwner {
return ShutdownError
}
if time.Now().After(rc.shard.LeaseTimeout) {
return LeaseExpiredError
}
// checkpoint the last sequence of a closed shard
if sequenceNumber == nil {
rc.shard.SetCheckpoint(chk.ShardEnd)