From 2f3907d19f905a4a686145e099a25545423c968a Mon Sep 17 00:00:00 2001 From: lbourdages Date: Fri, 8 Mar 2019 12:08:19 -0500 Subject: [PATCH] Introducing sleep between DescribeStreamConsumer calls (#507) * * Wait between each describe stream consumer retry * ! Fix imports in test * * Apply review: catch InterruptedException outside of while loop --- .../retrieval/fanout/FanOutConsumerRegistration.java | 12 +++++++++--- .../fanout/FanOutConsumerRegistrationTest.java | 5 +++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java index 9baf5863..32f7dacf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.java @@ -149,9 +149,15 @@ public class FanOutConsumerRegistration implements ConsumerRegistration { int retries = maxDescribeStreamConsumerRetries; - while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) { - status = describeStreamConsumer().consumerDescription().consumerStatus(); - retries--; + try { + while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) { + status = describeStreamConsumer().consumerDescription().consumerStatus(); + retries--; + log.info(String.format("Waiting for StreamConsumer %s to have ACTIVE status...", streamConsumerName)); + Thread.sleep(retryBackoffMillis); + } + } catch (InterruptedException ie) { + log.debug("Thread was interrupted while fetching StreamConsumer status, moving on."); } if (!ConsumerStatus.ACTIVE.equals(status)) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java index f6f23714..1e0b0b8a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java @@ -9,6 +9,7 @@ package software.amazon.kinesis.retrieval.fanout; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -20,6 +21,7 @@ import static org.mockito.Mockito.when; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -172,9 +174,12 @@ public class FanOutConsumerRegistrationTest { .thenReturn(intermidateResponse).thenReturn(successResponse); when(client.registerStreamConsumer(any(RegisterStreamConsumerRequest.class))).thenReturn(rscFuture); + final long startTime = System.currentTimeMillis(); final String consumerArn = consumerRegistration.getOrCreateStreamConsumerArn(); + final long endTime = System.currentTimeMillis(); assertThat(consumerArn, equalTo(CONSUMER_ARN)); + assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS)); verify(client).registerStreamConsumer(eq(createRegisterStreamConsumerRequest())); verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));