Remove a possible deadlock on polling queue fill (#462)

* Remove a possible deadlock on polling queue fill

Adding new items to the receive queue for the PrefetchRecordsPublisher
when at capacity would deadlock retrievals as it was already holding
a lock on this.

The method addArrivedRecordsInput did not need to be synchronized on
this as it didn't change any of the protected
state (requestedResponses).  There is a call to drainQueueForRequests
immediately after the addArrivedRecordsInput that will ensure newly
arrived data is dispatched.

This fixes #448

* Small fix on the reasoning comment

* Adjust the test to act more like the ShardConsumer

The ShardConsuemr, which is the principal user of the
PrefetchRecordsPublisher, uses RxJava to consume from publisher. This
test uses RxJava to consume, and notifies the test thread once
MAX_ITEMS * 3 have been received. This ensures that we cycle through
the queue at least 3 times.

* Removed the upper limit on the retrievals

The way RxJava's request management makes it possible that more
requests than we might expect can happen.
This commit is contained in:
Justin Pfifer 2018-11-07 16:33:49 -08:00 committed by Sahil Palvia
parent b83a32b492
commit f52f2559ed
2 changed files with 95 additions and 5 deletions

View file

@ -176,7 +176,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
});
}
private synchronized void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
private void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
}

View file

@ -15,38 +15,48 @@
package software.amazon.kinesis.retrieval.polling;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@ -222,10 +232,11 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testExpiredIteratorException() {
log.info("Starting tests");
getRecordsCache.start(sequenceNumber, initialPosition);
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class)
.thenReturn(getRecordsResponse);
getRecordsCache.start(sequenceNumber, initialPosition);
doNothing().when(dataFetcher).restartIterator();
getRecordsCache.getNextResult();
@ -235,6 +246,85 @@ public class PrefetchRecordsPublisherTest {
verify(dataFetcher).restartIterator();
}
@Test(timeout = 1000L)
public void testNoDeadlockOnFullQueue() {
//
// Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448
//
// This test is to verify that the drain of a blocked queue no longer deadlocks.
// If the test times out before starting the subscriber it means something went wrong while filling the queue.
// After the subscriber is started one of the things that can trigger a timeout is a deadlock.
//
GetRecordsResponse response = GetRecordsResponse.builder().records(
Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build())
.build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
//
// Wait for the queue to fill up, and the publisher to block on adding items to the queue.
//
log.info("Waiting for queue to fill up");
while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) {
Thread.yield();
}
log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 3;
Object lock = new Object();
Subscriber<ProcessRecordsInput> subscriber = new Subscriber<ProcessRecordsInput>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
sub = s;
s.request(1);
}
@Override
public void onNext(ProcessRecordsInput processRecordsInput) {
receivedItems.incrementAndGet();
if (receivedItems.get() >= expectedItems) {
synchronized (lock) {
log.info("Notifying waiters");
lock.notifyAll();
}
sub.cancel();
} else {
sub.request(1);
}
}
@Override
public void onError(Throwable t) {
log.error("Caught error", t);
throw new RuntimeException(t);
}
@Override
public void onComplete() {
fail("onComplete not expected in this test");
}
};
synchronized (lock) {
log.info("Awaiting notification");
Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation(), true, 8).subscribe(subscriber);
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt());
assertThat(receivedItems.get(), equalTo(expectedItems));
}
@After
public void shutdown() {
getRecordsCache.shutdown();