Introducing sleep between DescribeStreamConsumer calls (#507)

* * Wait between each describe stream consumer retry

* ! Fix imports in test

* * Apply review: catch InterruptedException outside of while loop
This commit is contained in:
lbourdages 2019-03-08 12:08:19 -05:00 committed by Sahil Palvia
parent c3b3846357
commit 2f3907d19f
2 changed files with 14 additions and 3 deletions

View file

@ -149,9 +149,15 @@ public class FanOutConsumerRegistration implements ConsumerRegistration {
int retries = maxDescribeStreamConsumerRetries; int retries = maxDescribeStreamConsumerRetries;
while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) { try {
status = describeStreamConsumer().consumerDescription().consumerStatus(); while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) {
retries--; status = describeStreamConsumer().consumerDescription().consumerStatus();
retries--;
log.info(String.format("Waiting for StreamConsumer %s to have ACTIVE status...", streamConsumerName));
Thread.sleep(retryBackoffMillis);
}
} catch (InterruptedException ie) {
log.debug("Thread was interrupted while fetching StreamConsumer status, moving on.");
} }
if (!ConsumerStatus.ACTIVE.equals(status)) { if (!ConsumerStatus.ACTIVE.equals(status)) {

View file

@ -9,6 +9,7 @@
package software.amazon.kinesis.retrieval.fanout; package software.amazon.kinesis.retrieval.fanout;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -20,6 +21,7 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hamcrest.Matchers;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -172,9 +174,12 @@ public class FanOutConsumerRegistrationTest {
.thenReturn(intermidateResponse).thenReturn(successResponse); .thenReturn(intermidateResponse).thenReturn(successResponse);
when(client.registerStreamConsumer(any(RegisterStreamConsumerRequest.class))).thenReturn(rscFuture); when(client.registerStreamConsumer(any(RegisterStreamConsumerRequest.class))).thenReturn(rscFuture);
final long startTime = System.currentTimeMillis();
final String consumerArn = consumerRegistration.getOrCreateStreamConsumerArn(); final String consumerArn = consumerRegistration.getOrCreateStreamConsumerArn();
final long endTime = System.currentTimeMillis();
assertThat(consumerArn, equalTo(CONSUMER_ARN)); assertThat(consumerArn, equalTo(CONSUMER_ARN));
assertThat(endTime - startTime, greaterThanOrEqualTo(2 * BACKOFF_MILLIS));
verify(client).registerStreamConsumer(eq(createRegisterStreamConsumerRequest())); verify(client).registerStreamConsumer(eq(createRegisterStreamConsumerRequest()));
verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest())); verify(client).describeStreamSummary(eq(createDescribeStreamSummaryRequest()));