From 10e8ebb3ffc2e378c810129645264d21835d2ccb Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Tue, 4 Sep 2018 20:32:45 -0700 Subject: [PATCH] KCL: Fix KCL stopping processing on Kinesis internal error Currently, KCL doesn't release the shard when returning on error, which means the worker cannot acquire any new shard because it already holds the maximum number of shards. This change makes sure the shard is released on return, and updates the log message. Test: Integration test by forcing an error on reading a shard to simulate a Kinesis internal error, and verifying that KCL does not stop processing. Jira CNA-1995 Change-Id: Iac91579634a5023ab5ed73c6af89e4ff1a9af564 --- clientlibrary/worker/shard-consumer.go | 16 ++++++++++++---- clientlibrary/worker/worker.go | 6 +++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index a7f6fa0..30ad07e 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -120,8 +120,11 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) { return iterResp.ShardIterator, nil } +// getRecords continuously polls one shard for data records. +// Precondition: it currently has the lease on the shard. func (sc *ShardConsumer) getRecords(shard *shardStatus) error { defer sc.waitGroup.Done() + defer sc.releaseLease(shard) // If the shard is child shard, need to wait until the parent finished. 
if err := sc.waitOnParentShard(shard); err != nil { @@ -146,17 +149,15 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { sc.recordProcessor.Initialize(input) recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer) - var retriedErrors int for { + retriedErrors := 0 getRecordsStartTime := time.Now() if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) err = sc.checkpointer.GetLease(shard, sc.consumerID) if err != nil { if err.Error() == ErrLeaseNotAquired { - shard.setLeaseOwner("") - sc.mService.LeaseLost(shard.ID) log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID) return nil } @@ -187,7 +188,6 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs) return err } - retriedErrors = 0 // IRecordProcessorCheckpointer input := &kcl.ProcessRecordsInput{ @@ -273,3 +273,11 @@ func (sc *ShardConsumer) waitOnParentShard(shard *shardStatus) error { time.Sleep(time.Duration(sc.kclConfig.ParentShardPollIntervalMillis) * time.Millisecond) } } + +// releaseLease cleans up the internal lease cache for the shard. +func (sc *ShardConsumer) releaseLease(shard *shardStatus) { + log.Infof("Release lease for shard %s", shard.ID) + shard.setLeaseOwner("") + // report lease loss metrics + sc.mService.LeaseLost(shard.ID) +} diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index f430b8d..1a7db33 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -232,10 +232,10 @@ func (w *Worker) eventLoop() { log.Infof("Found %d shards", len(w.shardStatus)) - // Count the number of leases hold by this worker + // Count the number of leases held by this worker, excluding already-processed shards counter := 0 for _, shard := range w.shardStatus { - if shard.getLeaseOwner() == w.workerID 
{ + if shard.getLeaseOwner() == w.workerID && shard.Checkpoint != SHARD_END { counter++ } } @@ -327,7 +327,7 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err // found new shard if _, ok := w.shardStatus[*s.ShardId]; !ok { - log.Debugf("Found shard with id %s", *s.ShardId) + log.Infof("Found new shard with id %s", *s.ShardId) w.shardStatus[*s.ShardId] = &shardStatus{ ID: *s.ShardId, ParentShardId: aws.StringValue(s.ParentShardId),