From 6c9e5947514055d504bcfb8739ae6c288311e54c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= <476650+arl@users.noreply.github.com> Date: Thu, 14 Nov 2019 00:15:33 +0100 Subject: [PATCH] Make the lease refresh period configurable (#56) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add LeaseRefreshSpanMillis in configuration For certain use cases of KCL the hard-coded value of 5s value, representing the time span before the end of a lease timeout in which the current owner gets to renew its own lease, is not sufficient. When the time taken by ProcessRecords is higher than 5s, the lease gets lost and the shard may end up to another worker. This commit adds a new configuration value, that defaults to 5s, to let the user set this value to its own needs. Signed-off-by: Aurélien Rainone * Slight code simplification Or readability improvement Signed-off-by: Aurélien Rainone --- clientlibrary/checkpoint/dynamodb-checkpointer.go | 2 +- clientlibrary/config/config.go | 8 +++++++- clientlibrary/config/kcl-config.go | 7 +++++++ clientlibrary/worker/shard-consumer.go | 4 ++-- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 1cce247..1b61f45 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -135,7 +135,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign return err } - if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo { + if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo { return errors.New(ErrLeaseNotAquired) } diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index e4fd36e..c517645 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -63,6 +63,9 @@ const ( // the number of DynamoDB IOPS required for tracking leases. DEFAULT_FAILOVER_TIME_MILLIS = 10000 + // Period before the end of lease during which a lease is refreshed by the owner. + DEFAULT_LEASE_REFRESH_PERIOD_MILLIS = 5000 + // Max records to fetch from Kinesis in a single GetRecords call. DEFAULT_MAX_RECORDS = 10000 @@ -190,7 +193,10 @@ type ( // FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) FailoverTimeMillis int - /// MaxRecords Max records to read per Kinesis getRecords() call + // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. + LeaseRefreshPeriodMillis int + + // MaxRecords Max records to read per Kinesis getRecords() call MaxRecords int // IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index ec59be9..d7960e9 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -79,6 +79,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, + LeaseRefreshPeriodMillis: DEFAULT_LEASE_REFRESH_PERIOD_MILLIS, MaxRecords: DEFAULT_MAX_RECORDS, IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, @@ -133,6 +134,12 @@ func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMilli return c } +func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("LeaseRefreshPeriodMillis", leaseRefreshPeriodMillis) + c.LeaseRefreshPeriodMillis = leaseRefreshPeriodMillis + return c +} + func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) c.ShardSyncIntervalMillis = shardSyncIntervalMillis diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 06a09eb..476ab01 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -169,7 +169,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { retriedErrors := 0 for { - if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { + if time.Now().UTC().After(shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) if err != nil { @@ -266,7 +266,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) return nil - case <-time.After(1 * time.Nanosecond): + default: } } }