From 74175246497f5a362b2559f03c0582bcb4d4a01a Mon Sep 17 00:00:00 2001 From: glarwood Date: Tue, 11 Dec 2018 19:46:40 -0800 Subject: [PATCH] refactor(Worker): add retry logic on runProcessLoop. Throws exception after MAX_RETRIES --- .../clientlibrary/lib/worker/Worker.java | 61 ++++++++++--------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index bf9f4e7d..f3bf1507 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -66,6 +66,7 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); private static final int MAX_INITIALIZATION_ATTEMPTS = 20; + private static final int MAX_RETRIES = 4; private WorkerLog wlog = new WorkerLog(); @@ -370,38 +371,42 @@ public class Worker implements Runnable { @VisibleForTesting void runProcessLoop() { - try { - boolean foundCompletedShard = false; - Set assignedShards = new HashSet<>(); - for (ShardInfo shardInfo : getShardInfoForAssignments()) { - ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); - if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) { - foundCompletedShard = true; - } else { - shardConsumer.consumeShard(); - } - assignedShards.add(shardInfo); - } - - if (foundCompletedShard) { - controlServer.syncShardAndLeaseInfo(null); - } - - // clean up shard consumers for unassigned shards - cleanupShardConsumers(assignedShards); - - wlog.info("Sleeping ..."); - Thread.sleep(idleTimeInMilliseconds); - } catch (Exception e) { - LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", - String.valueOf(idleTimeInMilliseconds)), e); + for (int i = 0; ; i++) { try { + boolean foundCompletedShard = false; + Set assignedShards = new HashSet<>(); + for (ShardInfo shardInfo : getShardInfoForAssignments()) { + ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); + if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) { + foundCompletedShard = true; + } else { + shardConsumer.consumeShard(); + } + assignedShards.add(shardInfo); + } + + if (foundCompletedShard) { + controlServer.syncShardAndLeaseInfo(null); + } + + // clean up shard consumers for unassigned shards + cleanupShardConsumers(assignedShards); + + wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); - } catch (InterruptedException ex) { - LOG.info("Worker: sleep interrupted after catching exception ", ex); + } catch (Exception e) { + if (i > MAX_RETRIES) throw new RuntimeException("Giving up after " + i + " retries", e); + + LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", + String.valueOf(idleTimeInMilliseconds)), e); + try { + Thread.sleep(idleTimeInMilliseconds); + } catch (InterruptedException ex) { + LOG.info("Worker: sleep interrupted after catching exception ", ex); + } } + wlog.resetInfoLogging(); } - wlog.resetInfoLogging(); } private void initialize() {