From b5d0301b315a7a0c6bb12c2ad38e1cedbcde4876 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 3 Jun 2020 01:08:13 -0700 Subject: [PATCH] Adding stream information to Fanout Consumer Registration logs --- .../fanout/FanOutConsumerRegistration.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java index 0519390c..9bcdd83c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java @@ -76,7 +76,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { try { response = describeStreamConsumer(); } catch (ResourceNotFoundException e) { - log.info("StreamConsumer not found, need to create it."); + log.info("{} : StreamConsumer not found, need to create it.", streamName); } // 2. If not, register consumer @@ -92,7 +92,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { break; } catch (LimitExceededException e) { // TODO: Figure out internal service exceptions - log.debug("RegisterStreamConsumer call got throttled will retry."); + log.debug("{} : RegisterStreamConsumer call got throttled will retry.", streamName); finalException = e; } retries--; @@ -104,7 +104,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { } } catch (ResourceInUseException e) { // Consumer is present, call DescribeStreamConsumer - log.debug("Got ResourceInUseException consumer exists, will call DescribeStreamConsumer again."); + log.debug("{} : Got ResourceInUseException consumer exists, will call DescribeStreamConsumer again.", streamName); response = describeStreamConsumer(); } } @@ -160,17 +160,17 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) { status = describeStreamConsumer().consumerDescription().consumerStatus(); retries--; - log.info(String.format("Waiting for StreamConsumer %s to have ACTIVE status...", streamConsumerName)); + log.info("{} : Waiting for StreamConsumer {} to have ACTIVE status...", streamName, streamConsumerName); Thread.sleep(retryBackoffMillis); } } catch (InterruptedException ie) { - log.debug("Thread was interrupted while fetching StreamConsumer status, moving on."); + log.debug("{} : Thread was interrupted while fetching StreamConsumer status, moving on.", streamName); } if (!ConsumerStatus.ACTIVE.equals(status)) { final String message = String.format( - "Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.", - streamConsumerName, status); + "%s : Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.", + streamName, streamConsumerName, status); log.error(message); throw new IllegalStateException(message); } @@ -211,7 +211,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { throw new DependencyException(e); } } catch (LimitExceededException e) { - log.info("Throttled while calling {} API, will backoff.", apiName); + log.info("{} : Throttled while calling {} API, will backoff.", streamName, apiName); try { Thread.sleep(retryBackoffMillis + (long) (Math.random() * 100)); } catch (InterruptedException ie) { @@ -224,7 +224,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { if (finalException == null) { throw new IllegalStateException( - String.format("Finished all retries and no exception was caught while calling %s", apiName)); + String.format("%s : Finished all retries and no exception was caught while calling %s", streamName, apiName)); } throw finalException;