diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index a7f6fa0..30ad07e 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -120,8 +120,11 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) { return iterResp.ShardIterator, nil } +// getRecords continuously polls one shard for data records. +// Precondition: it currently has the lease on the shard. func (sc *ShardConsumer) getRecords(shard *shardStatus) error { defer sc.waitGroup.Done() + defer sc.releaseLease(shard) // If the shard is child shard, need to wait until the parent finished. if err := sc.waitOnParentShard(shard); err != nil { @@ -146,17 +149,15 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { sc.recordProcessor.Initialize(input) recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer) - var retriedErrors int for { + retriedErrors := 0 getRecordsStartTime := time.Now() if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) if err != nil { if err.Error() == ErrLeaseNotAquired { - shard.setLeaseOwner("") - sc.mService.LeaseLost(shard.ID) log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID) return nil } @@ -187,7 +188,6 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs) return err } - retriedErrors = 0 // IRecordProcessorCheckpointer input := &kcl.ProcessRecordsInput{ @@ -273,3 +273,11 @@ func (sc *ShardConsumer) waitOnParentShard(shard *shardStatus) error { time.Sleep(time.Duration(sc.kclConfig.ParentShardPollIntervalMillis) * time.Millisecond) } } + +// Clean up the internal lease cache. +func (sc *ShardConsumer) releaseLease(shard 
*shardStatus) { + log.Infof("Release lease for shard %s", shard.ID) + shard.setLeaseOwner("") + // report the lease-lost metric + sc.mService.LeaseLost(shard.ID) +} diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index f430b8d..1a7db33 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -232,10 +232,10 @@ func (w *Worker) eventLoop() { log.Infof("Found %d shards", len(w.shardStatus)) - // Count the number of leases hold by this worker + // Count the number of leases held by this worker, excluding shards already processed to SHARD_END counter := 0 for _, shard := range w.shardStatus { - if shard.getLeaseOwner() == w.workerID { + if shard.getLeaseOwner() == w.workerID && shard.Checkpoint != SHARD_END { counter++ } } @@ -327,7 +327,7 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err // found new shard if _, ok := w.shardStatus[*s.ShardId]; !ok { - log.Debugf("Found shard with id %s", *s.ShardId) + log.Infof("Found new shard with id %s", *s.ShardId) w.shardStatus[*s.ShardId] = &shardStatus{ ID: *s.ShardId, ParentShardId: aws.StringValue(s.ParentShardId),