diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 9a76309..d238943 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -130,7 +130,10 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID) return nil } - log.Fatal(err) + // log and return error + log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v", + shard.ID, sc.consumerID, err) + return err } } @@ -144,14 +147,15 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException || awsErr.Code() == ErrCodeKMSThrottlingException { - log.Errorf("Error getting records from shard %v: %v", shard.ID, err) + log.Errorf("Error getting records from shard %v: %+v", shard.ID, err) retriedErrors++ // exponential backoff time.Sleep(time.Duration(2^retriedErrors*100) * time.Millisecond) continue } } - log.Fatalf("Error getting records from Kinesis that cannot be retried: %s\nRequest: %s", err, getRecordsArgs) + log.Errorf("Error getting records from Kinesis that cannot be retried: %+v\nRequest: %s", err, getRecordsArgs) + return err } retriedErrors = 0