Make the lease refresh period configurable (#56)
* Add LeaseRefreshSpanMillis in configuration For certain use cases of KCL the hard-coded value of 5s value, representing the time span before the end of a lease timeout in which the current owner gets to renew its own lease, is not sufficient. When the time taken by ProcessRecords is higher than 5s, the lease gets lost and the shard may end up to another worker. This commit adds a new configuration value, that defaults to 5s, to let the user set this value to its own needs. Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com> * Slight code simplification Or readability improvement Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
This commit is contained in:
parent
9ca9d901ca
commit
6c9e594751
4 changed files with 17 additions and 4 deletions
|
|
@ -135,7 +135,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
|
|||
return err
|
||||
}
|
||||
|
||||
if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo {
|
||||
if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo {
|
||||
return errors.New(ErrLeaseNotAquired)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -63,6 +63,9 @@ const (
|
|||
// the number of DynamoDB IOPS required for tracking leases.
|
||||
DEFAULT_FAILOVER_TIME_MILLIS = 10000
|
||||
|
||||
// Period before the end of lease during which a lease is refreshed by the owner.
|
||||
DEFAULT_LEASE_REFRESH_PERIOD_MILLIS = 5000
|
||||
|
||||
// Max records to fetch from Kinesis in a single GetRecords call.
|
||||
DEFAULT_MAX_RECORDS = 10000
|
||||
|
||||
|
|
@ -190,7 +193,10 @@ type (
|
|||
// FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
|
||||
FailoverTimeMillis int
|
||||
|
||||
/// MaxRecords Max records to read per Kinesis getRecords() call
|
||||
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
|
||||
LeaseRefreshPeriodMillis int
|
||||
|
||||
// MaxRecords Max records to read per Kinesis getRecords() call
|
||||
MaxRecords int
|
||||
|
||||
// IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
|
|||
InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
|
||||
InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
|
||||
FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
|
||||
LeaseRefreshPeriodMillis: DEFAULT_LEASE_REFRESH_PERIOD_MILLIS,
|
||||
MaxRecords: DEFAULT_MAX_RECORDS,
|
||||
IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
|
||||
CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
|
||||
|
|
@ -133,6 +134,12 @@ func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMilli
|
|||
return c
|
||||
}
|
||||
|
||||
func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration {
|
||||
checkIsValuePositive("LeaseRefreshPeriodMillis", leaseRefreshPeriodMillis)
|
||||
c.LeaseRefreshPeriodMillis = leaseRefreshPeriodMillis
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
|
||||
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
|
||||
c.ShardSyncIntervalMillis = shardSyncIntervalMillis
|
||||
|
|
|
|||
|
|
@ -169,7 +169,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
|
|||
retriedErrors := 0
|
||||
|
||||
for {
|
||||
if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
|
||||
if time.Now().UTC().After(shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
|
||||
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
|
||||
err = sc.checkpointer.GetLease(shard, sc.consumerID)
|
||||
if err != nil {
|
||||
|
|
@ -266,7 +266,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
|
|||
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
|
||||
sc.recordProcessor.Shutdown(shutdownInput)
|
||||
return nil
|
||||
case <-time.After(1 * time.Nanosecond):
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue