diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 1cce247..1b61f45 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -135,7 +135,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign return err } - if !time.Now().UTC().After(currentLeaseTimeout) && assignedTo != newAssignTo { + if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo { return errors.New(ErrLeaseNotAquired) } diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index e4fd36e..c517645 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -63,6 +63,9 @@ const ( // the number of DynamoDB IOPS required for tracking leases. DEFAULT_FAILOVER_TIME_MILLIS = 10000 + // Period before the end of lease during which a lease is refreshed by the owner. + DEFAULT_LEASE_REFRESH_PERIOD_MILLIS = 5000 + // Max records to fetch from Kinesis in a single GetRecords call. DEFAULT_MAX_RECORDS = 10000 @@ -190,7 +193,10 @@ type ( // FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) FailoverTimeMillis int - /// MaxRecords Max records to read per Kinesis getRecords() call + // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. + LeaseRefreshPeriodMillis int + + // MaxRecords Max records to read per Kinesis getRecords() call MaxRecords int // IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index ec59be9..d7960e9 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -79,6 +79,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, + LeaseRefreshPeriodMillis: DEFAULT_LEASE_REFRESH_PERIOD_MILLIS, MaxRecords: DEFAULT_MAX_RECORDS, IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, @@ -133,6 +134,12 @@ func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMilli return c } +func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration { + checkIsValuePositive("LeaseRefreshPeriodMillis", leaseRefreshPeriodMillis) + c.LeaseRefreshPeriodMillis = leaseRefreshPeriodMillis + return c +} + func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) c.ShardSyncIntervalMillis = shardSyncIntervalMillis diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 06a09eb..476ab01 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -169,7 +169,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { retriedErrors := 0 for { - if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { + if time.Now().UTC().After(shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) if err != nil { @@ -266,7 +266,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error { shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) return nil - case <-time.After(1 * time.Nanosecond): + default: } } }