From f52f2559edd2ccd86f819be45ad650dbeb246540 Mon Sep 17 00:00:00 2001
From: Justin Pfifer
Date: Wed, 7 Nov 2018 16:33:49 -0800
Subject: [PATCH] Remove a possible deadlock on polling queue fill (#462)

* Remove a possible deadlock on polling queue fill

Adding new items to the receive queue of the PrefetchRecordsPublisher while it
was at capacity would deadlock retrievals, because the publisher was already
holding the lock on this. The method addArrivedRecordsInput did not need to be
synchronized on this, as it doesn't change any of the protected state
(requestedResponses). There is a call to drainQueueForRequests immediately
after addArrivedRecordsInput that ensures newly arrived data is dispatched.

This fixes #448

* Small fix on the reasoning comment

* Adjust the test to act more like the ShardConsumer

The ShardConsumer, which is the principal user of the PrefetchRecordsPublisher,
uses RxJava to consume from the publisher. This test now uses RxJava to consume
as well, and notifies the test thread once MAX_SIZE * 3 items have been
received. This ensures that we cycle through the queue at least 3 times.

* Removed the upper limit on the retrievals

RxJava's request management makes it possible that more retrieval requests
occur than we might expect.
---
 .../polling/PrefetchRecordsPublisher.java     |  2 +-
 .../polling/PrefetchRecordsPublisherTest.java | 98 ++++++++++++++++++-
 2 files changed, 95 insertions(+), 5 deletions(-)

diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
index 431b134f..15a564df 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
@@ -176,7 +176,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
         });
     }
 
-    private synchronized void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
+    private void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
         getRecordsResultQueue.put(processRecordsInput);
         prefetchCounters.added(processRecordsInput);
     }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
index dd4b96ac..7fb82ea6 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
@@ -15,38 +15,48 @@
 package software.amazon.kinesis.retrieval.polling;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import lombok.extern.slf4j.Slf4j;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+import lombok.extern.slf4j.Slf4j;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -222,10 +232,11 @@ public class PrefetchRecordsPublisherTest {
     @Test
     public void testExpiredIteratorException() {
         log.info("Starting tests");
-        getRecordsCache.start(sequenceNumber, initialPosition);
-
         when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class)
                 .thenReturn(getRecordsResponse);
+
+        getRecordsCache.start(sequenceNumber, initialPosition);
+
         doNothing().when(dataFetcher).restartIterator();
 
         getRecordsCache.getNextResult();
@@ -235,6 +246,85 @@ public class PrefetchRecordsPublisherTest {
         verify(dataFetcher).restartIterator();
     }
 
+    @Test(timeout = 1000L)
+    public void testNoDeadlockOnFullQueue() {
+        //
+        // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448
+        //
+        // This test is to verify that the drain of a blocked queue no longer deadlocks.
+        // If the test times out before starting the subscriber it means something went wrong while filling the queue.
+        // After the subscriber is started one of the things that can trigger a timeout is a deadlock.
+        //
+        GetRecordsResponse response = GetRecordsResponse.builder().records(
+                Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build())
+                .build();
+        when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);
+
+        getRecordsCache.start(sequenceNumber, initialPosition);
+
+        //
+        // Wait for the queue to fill up, and the publisher to block on adding items to the queue.
+        //
+        log.info("Waiting for queue to fill up");
+        while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) {
+            Thread.yield();
+        }
+
+        log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
+        AtomicInteger receivedItems = new AtomicInteger(0);
+        final int expectedItems = MAX_SIZE * 3;
+
+        Object lock = new Object();
+
+        Subscriber<ProcessRecordsInput> subscriber = new Subscriber<ProcessRecordsInput>() {
+            Subscription sub;
+
+            @Override
+            public void onSubscribe(Subscription s) {
+                sub = s;
+                s.request(1);
+            }
+
+            @Override
+            public void onNext(ProcessRecordsInput processRecordsInput) {
+                receivedItems.incrementAndGet();
+                if (receivedItems.get() >= expectedItems) {
+                    synchronized (lock) {
+                        log.info("Notifying waiters");
+                        lock.notifyAll();
+                    }
+                    sub.cancel();
+                } else {
+                    sub.request(1);
+                }
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("Caught error", t);
+                throw new RuntimeException(t);
+            }
+
+            @Override
+            public void onComplete() {
+                fail("onComplete not expected in this test");
+            }
+        };
+
+        synchronized (lock) {
+            log.info("Awaiting notification");
+            Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation())
+                    .observeOn(Schedulers.computation(), true, 8).subscribe(subscriber);
+            try {
+                lock.wait();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt());
+        assertThat(receivedItems.get(), equalTo(expectedItems));
+    }
+
     @After
     public void shutdown() {
         getRecordsCache.shutdown();
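
For reference only (not part of the patch): a minimal standalone sketch of the lock pattern the commit message describes. The class and method names below are simplified stand-ins, not the real PrefetchRecordsPublisher API; only the shape mirrors the bug: a synchronized producer blocking on a full bounded queue while the drain path needs the same monitor.

import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of issue #448's deadlock and the fix applied above.
public class BoundedPublisherSketch {
    // Capacity of 1 so the queue fills up immediately.
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

    // BEFORE the fix: synchronized plus a blocking put() on a bounded queue.
    // Once the queue is full, this thread parks inside put() while still owning
    // the monitor on "this", so drainForRequest() below can never run and the
    // queue never gains room: retrievals deadlock.
    private synchronized void addArrivedBroken(String item) throws InterruptedException {
        queue.put(item); // blocks while holding the monitor
    }

    // Consumers go through this synchronized method to take items out,
    // analogous to drainQueueForRequests guarding the requested-responses state.
    public synchronized String drainForRequest() {
        return queue.poll();
    }

    // AFTER the fix: the producer no longer holds the monitor while blocked on
    // put(), so the consumer side can still acquire the lock and drain the queue.
    private void addArrivedFixed(String item) throws InterruptedException {
        queue.put(item);
    }
}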