KCL: Fix KCL stopping processing on Kinesis internal errors

Currently, KCL doesn't release the shard lease when getRecords
returns on error, so the worker can never acquire another shard:
it already holds the maximum number of shards. This change makes
sure the lease is released on every return.

Also update the log messages.

Test:
Integration test: force an error while reading a shard to
simulate a Kinesis internal error, and verify that KCL does
not stop processing.
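
A minimal sketch of how such an error can be forced, assuming the consumer accepts any kinesisiface.KinesisAPI; failingKinesis and its fields are hypothetical test helpers, not code from this change:

import (
	"sync/atomic"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

// failingKinesis wraps a real Kinesis client and fails the first
// `failures` GetRecords calls with an internal error.
type failingKinesis struct {
	kinesisiface.KinesisAPI
	failures int32
}

func (f *failingKinesis) GetRecords(in *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
	if atomic.AddInt32(&f.failures, -1) >= 0 {
		// Simulate the Kinesis-side internal error described above.
		return nil, awserr.New("InternalFailureException", "simulated internal error", nil)
	}
	return f.KinesisAPI.GetRecords(in)
}

With failures set above zero, the consumer hits the error, releases its lease via the new defer, and the worker can reacquire the shard instead of stalling at its lease maximum.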

Jira CNA-1995

Change-Id: Iac91579634a5023ab5ed73c6af89e4ff1a9af564
Tao Jiang 2018-09-04 20:32:45 -07:00
parent 3163d31f28
commit 10e8ebb3ff
2 changed files with 15 additions and 7 deletions


@@ -120,8 +120,11 @@ func (sc *ShardConsumer) getShardIterator(shard *shardStatus) (*string, error) {
return iterResp.ShardIterator, nil
}
// getRecords continuously polls one shard for data records
// Precondition: it currently has the lease on the shard.
func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
defer sc.waitGroup.Done()
defer sc.releaseLease(shard)
// If the shard is a child shard, wait until the parent has finished.
if err := sc.waitOnParentShard(shard); err != nil {
@@ -146,17 +149,15 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
sc.recordProcessor.Initialize(input)
recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer)
var retriedErrors int
for {
retriedErrors := 0
getRecordsStartTime := time.Now()
if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
err = sc.checkpointer.GetLease(shard, sc.consumerID)
if err != nil {
if err.Error() == ErrLeaseNotAquired {
shard.setLeaseOwner("")
sc.mService.LeaseLost(shard.ID)
log.Warnf("Failed to acquire lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
return nil
}
@@ -187,7 +188,6 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
log.Errorf("Non-retryable error getting records from Kinesis: %+v Request: %s", err, getRecordsArgs)
return err
}
retriedErrors = 0
// IRecordProcessorCheckpointer
input := &kcl.ProcessRecordsInput{
@@ -273,3 +273,11 @@ func (sc *ShardConsumer) waitOnParentShard(shard *shardStatus) error {
time.Sleep(time.Duration(sc.kclConfig.ParentShardPollIntervalMillis) * time.Millisecond)
}
}
// releaseLease cleans up the internal lease state for the shard
func (sc *ShardConsumer) releaseLease(shard *shardStatus) {
log.Infof("Release lease for shard %s", shard.ID)
shard.setLeaseOwner("")
// report the lease-lost metric
sc.mService.LeaseLost(shard.ID)
}
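
The fix relies on Go's defer semantics: the deferred releaseLease runs on every exit from getRecords, including the early error returns. A self-contained illustration of that guarantee (the names below are illustrative, not from this repository):

package main

import (
	"errors"
	"fmt"
)

func process(fail bool) error {
	defer fmt.Println("lease released") // runs on every return path
	if fail {
		return errors.New("simulated internal error") // early return still triggers the defer
	}
	fmt.Println("records processed")
	return nil
}

func main() {
	_ = process(true)  // prints: lease released
	_ = process(false) // prints: records processed, then: lease released
}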


@@ -232,10 +232,10 @@ func (w *Worker) eventLoop() {
log.Infof("Found %d shards", len(w.shardStatus))
// Count the number of leases held by this worker
// Count the number of leases held by this worker, excluding fully processed shards
counter := 0
for _, shard := range w.shardStatus {
if shard.getLeaseOwner() == w.workerID {
if shard.getLeaseOwner() == w.workerID && shard.Checkpoint != SHARD_END {
counter++
}
}
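
Why the SHARD_END check matters: under the old count, a fully processed shard still counted toward the worker's lease maximum, so a worker at capacity could never pick up another shard. A runnable sketch of the corrected count, using simplified stand-ins for shardStatus and SHARD_END plus hypothetical shard data:

package main

import "fmt"

const SHARD_END = "SHARD_END"

// Simplified stand-in for the repository's shardStatus.
type shardStatus struct {
	ID         string
	Checkpoint string
	leaseOwner string
}

func (s *shardStatus) getLeaseOwner() string { return s.leaseOwner }

func main() {
	workerID := "worker-1"
	shards := []*shardStatus{
		{ID: "shardId-000", Checkpoint: SHARD_END, leaseOwner: workerID}, // fully processed
		{ID: "shardId-001", Checkpoint: "open", leaseOwner: workerID},    // still consuming
	}
	counter := 0
	for _, s := range shards {
		// Finished shards no longer count toward the lease maximum.
		if s.getLeaseOwner() == workerID && s.Checkpoint != SHARD_END {
			counter++
		}
	}
	fmt.Println("active leases:", counter) // prints 1, not 2
}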
@@ -327,7 +327,7 @@ func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) err
// found new shard
if _, ok := w.shardStatus[*s.ShardId]; !ok {
log.Debugf("Found shard with id %s", *s.ShardId)
log.Infof("Found new shard with id %s", *s.ShardId)
w.shardStatus[*s.ShardId] = &shardStatus{
ID: *s.ShardId,
ParentShardId: aws.StringValue(s.ParentShardId),