Adding stream information to Fanout Consumer Registration logs
This commit is contained in:
parent
f69398a2b2
commit
b5d0301b31
1 changed files with 9 additions and 9 deletions
|
|
@ -76,7 +76,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
|
||||||
try {
|
try {
|
||||||
response = describeStreamConsumer();
|
response = describeStreamConsumer();
|
||||||
} catch (ResourceNotFoundException e) {
|
} catch (ResourceNotFoundException e) {
|
||||||
log.info("StreamConsumer not found, need to create it.");
|
log.info("{} : StreamConsumer not found, need to create it.", streamName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. If not, register consumer
|
// 2. If not, register consumer
|
||||||
|
|
@ -92,7 +92,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
|
||||||
break;
|
break;
|
||||||
} catch (LimitExceededException e) {
|
} catch (LimitExceededException e) {
|
||||||
// TODO: Figure out internal service exceptions
|
// TODO: Figure out internal service exceptions
|
||||||
log.debug("RegisterStreamConsumer call got throttled will retry.");
|
log.debug("{} : RegisterStreamConsumer call got throttled will retry.", streamName);
|
||||||
finalException = e;
|
finalException = e;
|
||||||
}
|
}
|
||||||
retries--;
|
retries--;
|
||||||
|
|
@ -104,7 +104,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
|
||||||
}
|
}
|
||||||
} catch (ResourceInUseException e) {
|
} catch (ResourceInUseException e) {
|
||||||
// Consumer is present, call DescribeStreamConsumer
|
// Consumer is present, call DescribeStreamConsumer
|
||||||
log.debug("Got ResourceInUseException consumer exists, will call DescribeStreamConsumer again.");
|
log.debug("{} : Got ResourceInUseException consumer exists, will call DescribeStreamConsumer again.", streamName);
|
||||||
response = describeStreamConsumer();
|
response = describeStreamConsumer();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -160,17 +160,17 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
|
||||||
while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) {
|
while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) {
|
||||||
status = describeStreamConsumer().consumerDescription().consumerStatus();
|
status = describeStreamConsumer().consumerDescription().consumerStatus();
|
||||||
retries--;
|
retries--;
|
||||||
log.info(String.format("Waiting for StreamConsumer %s to have ACTIVE status...", streamConsumerName));
|
log.info("{} : Waiting for StreamConsumer {} to have ACTIVE status...", streamName, streamConsumerName);
|
||||||
Thread.sleep(retryBackoffMillis);
|
Thread.sleep(retryBackoffMillis);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
log.debug("Thread was interrupted while fetching StreamConsumer status, moving on.");
|
log.debug("{} : Thread was interrupted while fetching StreamConsumer status, moving on.", streamName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ConsumerStatus.ACTIVE.equals(status)) {
|
if (!ConsumerStatus.ACTIVE.equals(status)) {
|
||||||
final String message = String.format(
|
final String message = String.format(
|
||||||
"Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.",
|
"%s : Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.",
|
||||||
streamConsumerName, status);
|
streamName, streamConsumerName, status);
|
||||||
log.error(message);
|
log.error(message);
|
||||||
throw new IllegalStateException(message);
|
throw new IllegalStateException(message);
|
||||||
}
|
}
|
||||||
|
|
@ -211,7 +211,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
|
||||||
throw new DependencyException(e);
|
throw new DependencyException(e);
|
||||||
}
|
}
|
||||||
} catch (LimitExceededException e) {
|
} catch (LimitExceededException e) {
|
||||||
log.info("Throttled while calling {} API, will backoff.", apiName);
|
log.info("{} : Throttled while calling {} API, will backoff.", streamName, apiName);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(retryBackoffMillis + (long) (Math.random() * 100));
|
Thread.sleep(retryBackoffMillis + (long) (Math.random() * 100));
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
|
|
@ -224,7 +224,7 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
|
||||||
|
|
||||||
if (finalException == null) {
|
if (finalException == null) {
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
String.format("Finished all retries and no exception was caught while calling %s", apiName));
|
String.format("%s : Finished all retries and no exception was caught while calling %s", streamName, apiName));
|
||||||
}
|
}
|
||||||
|
|
||||||
throw finalException;
|
throw finalException;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue