From 29f68f77eb5f6a71e43e7bc66cf32e5dd44d55d5 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Thu, 3 Aug 2017 18:35:19 +0000 Subject: [PATCH] Removed redundant retry logic --- batchconsumer/writer.go | 50 +++++------------------------------------ 1 file changed, 5 insertions(+), 45 deletions(-) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 4260025..da07f65 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -67,35 +67,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer return nil } -// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise. -func (b *batchedWriter) handleCheckpointError(err error) bool { - if err == nil { - return false - } - - cperr, ok := err.(kcl.CheckpointError) - if !ok { - b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID}) - return true - } - - switch cperr.Error() { - case "ShutdownException": // Skips checkpointing - b.log.ErrorD("shutdown-checkpoint-exception", kv.M{ - "msg": err.Error(), "shard-id": b.shardID, - }) - return false - case "ThrottlingException": - b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID}) - case "InvalidStateException": - b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID}) - default: - b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err}) - } - - return true -} - func (b *batchedWriter) startCheckpointListener( checkpointer kcl.Checkpointer, checkpointMsg <-chan kcl.SequencePair, shutdown <-chan struct{}, @@ -124,23 +95,12 @@ func (b *batchedWriter) startCheckpointListener( } } - retry := true - for n := 0; retry && !pair.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { - err := checkpointer.Checkpoint(seq, 5) - if err == nil { // Successfully checkpointed! + if !pair.IsEmpty() { + err := checkpointer.Checkpoint(pair, b.config.CheckpointRetries) + if err != nil { + b.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error(), "shard-id": b.shardID}) + } else { lastCheckpoint = time.Now() - break - } - - retry = b.handleCheckpointError(err) - - if n == b.config.CheckpointRetries { - b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries}) - retry = false - } - - if retry { - time.Sleep(b.config.CheckpointRetrySleep) } }