Merge pull request #34 from vmware/spentakota_asyncLeaseRenewal

feat: make lease renewal async

Commit 6516287f6d · 6 changed files with 45 additions and 48 deletions
```diff
@@ -69,6 +69,9 @@ const (
 	// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
 	DefaultLeaseRefreshPeriodMillis = 5000

+	// DefaultLeaseRefreshWaitTime is the period of time to wait between async lease renewal attempts
+	DefaultLeaseRefreshWaitTime = 2500
+
 	// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
 	DefaultMaxRecords = 10000
```
```diff
@@ -216,6 +219,9 @@ type (
 		// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
 		LeaseRefreshPeriodMillis int

+		// LeaseRefreshWaitTime is the period of time to wait between async lease renewal attempts
+		LeaseRefreshWaitTime int
+
 		// MaxRecords Max records to read per Kinesis getRecords() call
 		MaxRecords int
```
```diff
@@ -102,6 +102,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
 		LeaseStealingIntervalMillis:     DefaultLeaseStealingIntervalMillis,
 		LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
 		LeaseSyncingTimeIntervalMillis:  DefaultLeaseSyncingIntervalMillis,
+		LeaseRefreshWaitTime:            DefaultLeaseRefreshWaitTime,
 		MaxRetryCount:                   DefaultMaxRetryCount,
 		Logger:                          logger.GetDefaultLogger(),
 	}
```
```diff
@@ -149,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
 	return c
 }

+func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
+	checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
+	c.LeaseRefreshWaitTime = leaseRefreshWaitTime
+	return c
+}
+
 func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
 	checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
 	c.ShardSyncIntervalMillis = shardSyncIntervalMillis
```
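For reviewers who want to try the new knob, here is a minimal usage sketch. It assumes the library's plain builder entrypoint (`NewKinesisClientLibConfig`, the sibling of the `...WithCredentials` constructor shown above; the exact constructor in your setup may differ). Values are in milliseconds, mirroring the defaults added in this PR.

```go
package main

import (
	cfg "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
)

func main() {
	// Configure the consumer with the new async lease renewal wait time.
	// 2500 ms matches DefaultLeaseRefreshWaitTime added in this PR.
	kclConfig := cfg.NewKinesisClientLibConfig("demo-app", "demo-stream", "us-west-2", "worker-1").
		WithLeaseRefreshPeriodMillis(5000). // refresh window before lease expiry
		WithLeaseRefreshWaitTime(2500)      // sleep between async renewal attempts

	_ = kclConfig // hand off to the worker in real code
}
```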
```diff
@@ -59,7 +59,7 @@ type (
	 * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
	 * to them (eg checkpointing).
	 */
-	ProcessRecords(processRecordsInput *ProcessRecordsInput) error
+	ProcessRecords(processRecordsInput *ProcessRecordsInput)

	// Shutdown
	/*
```
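Note for implementers: this is a breaking change to the `IRecordProcessor` interface. `ProcessRecords` no longer returns an error, so failures must be handled (logged, retried, dead-lettered) inside the processor itself. A minimal sketch of a processor under the new signature; the interface and input types are taken from this diff, while the field access on `ProcessRecordsInput` is an assumption based on the library's existing shape:

```go
package consumer

import (
	kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
)

type sampleProcessor struct{}

// Compile-time assertion that sampleProcessor satisfies the new interface.
var _ kcl.IRecordProcessor = (*sampleProcessor)(nil)

func (p *sampleProcessor) Initialize(input *kcl.InitializationInput) {}

// ProcessRecords now returns nothing: errors can no longer propagate to the
// shard consumer loop, so route failures locally.
func (p *sampleProcessor) ProcessRecords(input *kcl.ProcessRecordsInput) {
	for _, record := range input.Records {
		_ = record // handle each record; swallow or dead-letter failures here
	}
	// Checkpoint after the last record; Checkpoint itself still returns an error.
	if n := len(input.Records); n > 0 {
		if err := input.Checkpointer.Checkpoint(input.Records[n-1].SequenceNumber); err != nil {
			// lease-loss style failures surface here; decide locally how to react
		}
	}
}

func (p *sampleProcessor) Shutdown(input *kcl.ShutdownInput) {}
```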
```diff
@@ -136,7 +136,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error {
 	}
 }

-func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error {
+func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) {
 	log := sc.kclConfig.Logger

 	getRecordsTime := time.Since(getRecordsStartTime).Milliseconds()
```
```diff
@@ -172,11 +172,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
 	// Deliver the events to the record processor
 	input.CacheEntryTime = &getRecordsStartTime
 	input.CacheExitTime = &processRecordsStartTime
-	err := sc.recordProcessor.ProcessRecords(input)
-	if err != nil {
-		return err
-	}
-
+	sc.recordProcessor.ProcessRecords(input)
 	processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
 	sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
 }
```
```diff
@@ -184,5 +180,4 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
 	sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength)
 	sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes)
 	sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest))
-	return nil
 }
```
```diff
@@ -32,6 +32,7 @@ package worker
 import (
 	"context"
 	"errors"
+	log "github.com/sirupsen/logrus"
 	"math"
 	"time"
```
```diff
@@ -132,25 +133,12 @@ func (sc *PollingShardConsumer) getRecords() error {
 	sc.callsLeft = kinesisReadTPSLimit
 	sc.bytesRead = 0
 	sc.remBytes = MaxBytes

+	// starting async lease renewal thread
+	leaseRenewalErrChan := make(chan error, 1)
+	go func() {
+		leaseRenewalErrChan <- sc.renewLease()
+	}()
 	for {
-		if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
-			log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
-			err = sc.checkpointer.GetLease(sc.shard, sc.consumerID)
-			if err != nil {
-				if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
-					log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
-					return nil
-				}
-				// log and return error
-				log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
-					sc.shard.ID, sc.consumerID, err)
-				return err
-			}
-			// log metric for renewed lease for worker
-			sc.mService.LeaseRenewed(sc.shard.ID)
-		}
-
 		getRecordsStartTime := time.Now()

 		log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
```
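The behavioral shift is visible in this hunk: previously the poll loop renewed the lease inline, and only when the current time fell within `LeaseRefreshPeriodMillis` of the lease timeout, which tied renewal timing to how quickly each `GetRecords` iteration completed. After this change a dedicated goroutine attempts renewal every `LeaseRefreshWaitTime` milliseconds (2500 by default) regardless of what the poll loop is doing, and reports any terminal failure over `leaseRenewalErrChan`.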
```diff
@@ -214,10 +202,7 @@ func (sc *PollingShardConsumer) getRecords() error {
 		// reset the retry count after success
 		retriedErrors = 0

-		err = sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
-		if err != nil {
-			return err
-		}
+		sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)

 		// The shard has been closed, so no new records can be read from it
 		if getResp.NextShardIterator == nil {
```
```diff
@@ -240,6 +225,8 @@ func (sc *PollingShardConsumer) getRecords() error {
 			shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
 			sc.recordProcessor.Shutdown(shutdownInput)
 			return nil
+		case leaseRenewalErr := <-leaseRenewalErrChan:
+			return leaseRenewalErr
 		default:
 		}
 	}
```
```diff
@@ -312,3 +299,23 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)

 	return getResp, 0, err
 }
+
+func (sc *PollingShardConsumer) renewLease() error {
+	for {
+		time.Sleep(time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond)
+		log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
+		err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
+		if err != nil {
+			if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
+				log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
+				return nil
+			}
+			// log and return error
+			log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
+				sc.shard.ID, sc.consumerID, err)
+			return err
+		}
+		// log metric for renewed lease for worker
+		sc.mService.LeaseRenewed(sc.shard.ID)
+	}
+}
```
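The core concurrency pattern, distilled: `renewLease` runs on its own goroutine and only returns on a terminal condition (lease lost, which exits cleanly, or a real error), and the channel is buffered with capacity 1 so the goroutine's single send can never block, even if the poll loop has already returned for another reason. A self-contained sketch of the same shape; all names below are illustrative, not from the library:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// renew stands in for PollingShardConsumer.renewLease: loop forever,
// sleeping between attempts, and return only on a terminal condition.
func renew() error {
	for attempt := 1; ; attempt++ {
		time.Sleep(10 * time.Millisecond) // stand-in for LeaseRefreshWaitTime
		if attempt == 3 {
			return errors.New("lease lost") // terminal renewal failure
		}
		fmt.Println("lease renewed")
	}
}

func main() {
	errCh := make(chan error, 1) // buffered: the single send never blocks
	go func() { errCh <- renew() }()

	for {
		// ... GetRecords + record processing would happen here ...
		select {
		case err := <-errCh:
			fmt.Println("stopping consumer:", err)
			return
		default:
			// renewal goroutine still healthy; keep polling
		}
		time.Sleep(5 * time.Millisecond)
	}
}
```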
```diff
@@ -21,17 +21,10 @@
 package worker

 import (
-	"errors"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
 	kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
 	par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
 	"time"
 )
-
-var (
-	ShutdownError     = errors.New("another instance may have started processing some of these records already")
-	LeaseExpiredError = errors.New("the lease has on the shard has expired")
-)

 type (
```
```diff
@@ -75,17 +68,6 @@ func (pc *PreparedCheckpointer) Checkpoint() error {
 }

 func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
-	// return shutdown error if lease is expired or another worker has started processing records for this shard
-	currLeaseOwner, err := rc.checkpoint.GetLeaseOwner(rc.shard.ID)
-	if err != nil {
-		return err
-	}
-	if rc.shard.AssignedTo != currLeaseOwner {
-		return ShutdownError
-	}
-	if time.Now().After(rc.shard.LeaseTimeout) {
-		return LeaseExpiredError
-	}
 	// checkpoint the last sequence of a closed shard
 	if sequenceNumber == nil {
 		rc.shard.SetCheckpoint(chk.ShardEnd)
```
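With the per-checkpoint ownership and expiry checks removed, loss of the lease now surfaces through the renewal path instead: `GetLease` fails with `chk.ErrLeaseNotAcquired` when another worker owns the shard, which `renewLease` treats as a clean exit. A sketch of that error contract at a call site; the helper and its signature are hypothetical, and the `errors.As` usage mirrors the diff above:

```go
package worker

import (
	"errors"

	chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
	par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
)

// tryRenewOnce is a hypothetical helper illustrating the contract:
// ErrLeaseNotAcquired means ownership moved to another worker and is a
// graceful stop, not a failure; anything else is a real backing-store error.
func tryRenewOnce(checkpointer chk.Checkpointer, shard *par.ShardStatus, workerID string) (bool, error) {
	err := checkpointer.GetLease(shard, workerID)
	if err != nil {
		if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
			return false, nil // lease taken elsewhere: stop consuming cleanly
		}
		return false, err
	}
	return true, nil
}
```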