From f6dec3e5798a916fd7862205c58edfea87d8cd43 Mon Sep 17 00:00:00 2001 From: ashwing Date: Tue, 3 Sep 2019 09:20:34 -0700 Subject: [PATCH] Fix to prevent data loss and stuck shards in the event of failed records delivery in Polling readers (#603) * Fix to prevent data loss and stuck shards in the event of failed records delivery. * Review comment fixes * Access specifiers fix --- .../polling/PrefetchRecordsPublisher.java | 164 ++++++++++++++---- ...efetchRecordsPublisherIntegrationTest.java | 22 ++- .../polling/PrefetchRecordsPublisherTest.java | 144 +++++++++++++-- .../amazon/kinesis/utils/BlockingUtils.java | 38 ++++ 4 files changed, 309 insertions(+), 59 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index f5aaf051..4c513d6a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -18,14 +18,16 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -48,6 +50,7 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory; +import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; @@ -65,6 +68,12 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should * be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from * the record processor is blocked till records are retrieved from Kinesis. + * + * There are three threads namely publisher, demand-notifier and ack-notifier which will contend to drain the events + * to the Subscriber (ShardConsumer in KCL). The publisher/demand-notifier thread gains the control to drain only when + * there is no pending event in the prefetch queue waiting for the ack. Otherwise, it will be the ack-notifier thread + * which will drain an event on the receipt of an ack. 
+ * */ @Slf4j @KinesisClientInternalApi @@ -97,8 +106,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock(); private boolean wasReset = false; - private final Semaphore eventDeliveryLock = new Semaphore(1); - private Instant eventDeliveryLockAcquireTime = Instant.EPOCH; + private Instant lastEventDeliveryTime = Instant.EPOCH; + // This flag controls who should drain the next request in the prefetch queue. + // When set to false, the publisher and demand-notifier thread would have the control. + // When set to true, the event-notifier thread would have the control. + private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false); /** * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a @@ -151,13 +163,13 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended); if (!started) { - log.info("Starting prefetching thread."); + log.info("{} : Starting prefetching thread.", shardId); executorService.execute(defaultGetRecordsCacheDaemon); } started = true; } - RecordsRetrieved getNextResult() { + private void throwOnIllegalState() { if (executorService.isShutdown()) { throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests."); } @@ -165,16 +177,26 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { if (!started) { throw new IllegalStateException("Cache has not been initialized, make sure to call start."); } - PrefetchRecordsRetrieved result = null; - try { - result = getRecordsResultQueue.take().prepareForPublish(); + } + + private RecordsRetrieved peekNextResult() { + throwOnIllegalState(); + final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek(); + return result == null ? result : result.prepareForPublish(); + } + + @VisibleForTesting + RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() { + throwOnIllegalState(); + final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll(); + if (result != null) { prefetchCounters.removed(result.processRecordsInput); requestedResponses.decrementAndGet(); - - } catch (InterruptedException e) { - log.error("Interrupted while getting records from the cache", e); + } else { + log.info( + "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer" + + "was reset.", shardId); } - return result; } @@ -195,6 +217,12 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { resetLock.writeLock().lock(); try { getRecordsResultQueue.clear(); + + // Give the drain control to publisher/demand-notifier thread. + log.debug("{} : Publisher thread takes over the draining control. 
Queue Size : {}, Demand : {}", shardId, + getRecordsResultQueue.size(), requestedResponses.get()); + shouldDrainEventOnlyOnAck.set(false); + prefetchCounters.reset(); highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber(); @@ -213,7 +241,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @Override public void request(long n) { requestedResponses.addAndGet(n); - drainQueueForRequests(); + drainQueueForRequestsIfAllowed(); } @Override @@ -224,12 +252,35 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } @Override - public void notify(RecordsDeliveryAck ack) { - eventDeliveryLock.release(); + public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { + final RecordsRetrieved recordsToCheck = peekNextResult(); + // Verify if the ack matches the head of the queue and evict it. + if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier() + .equals(recordsDeliveryAck.batchUniqueIdentifier())) { + pollNextResultAndUpdatePrefetchCounters(); + // Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread. + if (getRecordsResultQueue.isEmpty()) { + log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", + shardId, getRecordsResultQueue.size(), requestedResponses.get()); + shouldDrainEventOnlyOnAck.set(false); + } else { + // Else attempt to drain the queue. + drainQueueForRequests(); + } + } else { + // Log and ignore any other ack received. As long as an ack is received for head of the queue + // we are good. Any stale or future ack received can be ignored, though the latter is not feasible + // to happen. + final BatchUniqueIdentifier peekedBatchUniqueIdentifier = + recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier(); + log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", + shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); + } // Take action based on the time spent by the event in queue. - takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log); + takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log); } + // Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue. private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException { wasReset = false; while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { @@ -248,11 +299,39 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { prefetchCounters.added(recordsRetrieved.processRecordsInput); } + /** + * Method that will be called by the 'publisher thread' and the 'demand notifying thread', + * to drain the events if the 'event notifying thread' do not have the control. + */ + private synchronized void drainQueueForRequestsIfAllowed() { + if (!shouldDrainEventOnlyOnAck.get()) { + drainQueueForRequests(); + } + } + + /** + * Method to drain the queue based on the demand and the events availability in the queue. 
+ */ private synchronized void drainQueueForRequests() { - while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) { - eventDeliveryLock.acquireUninterruptibly(); - eventDeliveryLockAcquireTime = Instant.now(); - subscriber.onNext(getNextResult()); + final RecordsRetrieved recordsToDeliver = peekNextResult(); + // If there is an event available to drain and if there is at least one demand, + // then schedule it for delivery + if (requestedResponses.get() > 0 && recordsToDeliver != null) { + lastEventDeliveryTime = Instant.now(); + subscriber.onNext(recordsToDeliver); + if (!shouldDrainEventOnlyOnAck.get()) { + log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, + getRecordsResultQueue.size(), requestedResponses.get()); + shouldDrainEventOnlyOnAck.set(true); + } + } else { + // Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier + // thread. + if (shouldDrainEventOnlyOnAck.get()) { + log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", + shardId, getRecordsResultQueue.size(), requestedResponses.get()); + shouldDrainEventOnlyOnAck.set(false); + } } } @@ -263,12 +342,26 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { final ProcessRecordsInput processRecordsInput; final String lastBatchSequenceNumber; final String shardIterator; + final BatchUniqueIdentifier batchUniqueIdentifier; PrefetchRecordsRetrieved prepareForPublish() { return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(), - lastBatchSequenceNumber, shardIterator); + lastBatchSequenceNumber, shardIterator, batchUniqueIdentifier); } + @Override + public BatchUniqueIdentifier batchUniqueIdentifier() { + return batchUniqueIdentifier; + } + + /** + * Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty. 
+ * @return BatchUniqueIdentifier + */ + public static BatchUniqueIdentifier generateBatchUniqueIdentifier() { + return new BatchUniqueIdentifier(UUID.randomUUID().toString(), + StringUtils.EMPTY); + } } private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) { @@ -291,7 +384,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { public void run() { while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { - log.warn("Prefetch thread was interrupted."); + log.warn("{} : Prefetch thread was interrupted.", shardId); break; } @@ -299,7 +392,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { makeRetrievalAttempt(); } catch(PositionResetException pre) { - log.debug("Position was reset while attempting to add item to queue."); + log.debug("{} : Position was reset while attempting to add item to queue.", shardId); } finally { resetLock.readLock().unlock(); } @@ -328,30 +421,31 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput); PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput, - highestSequenceNumber, getRecordsResult.nextShardIterator()); + highestSequenceNumber, getRecordsResult.nextShardIterator(), + PrefetchRecordsRetrieved.generateBatchUniqueIdentifier()); highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber; addArrivedRecordsInput(recordsRetrieved); - drainQueueForRequests(); + drainQueueForRequestsIfAllowed(); } catch (PositionResetException pse) { throw pse; } catch (RetryableRetrievalException rre) { - log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request."); + log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId); } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating shutdown was called on the cache."); + log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId); } catch (ExpiredIteratorException e) { - log.info("ShardId {}: records threw ExpiredIteratorException - restarting" + log.info("{} : records threw ExpiredIteratorException - restarting" + " after greatest seqNum passed to customer", shardId, e); scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); dataFetcher.restartIterator(); } catch (SdkException e) { - log.error("Exception thrown while fetching records from Kinesis", e); + log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); } catch (Throwable e) { - log.error("Unexpected exception was thrown. This could probably be an issue or a bug." + + log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + - "https://github.com/awslabs/amazon-kinesis-client.", e); + "https://github.com/awslabs/amazon-kinesis-client.", shardId, e); } finally { MetricsUtil.endScope(scope); } @@ -362,8 +456,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { prefetchCounters.waitForConsumer(); } catch (InterruptedException ie) { - log.info("Thread was interrupted while waiting for the consumer. " + - "Shutdown has probably been started"); + log.info("{} : Thread was interrupted while waiting for the consumer. 
" + + "Shutdown has probably been started", shardId); } } } @@ -410,14 +504,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { public synchronized void waitForConsumer() throws InterruptedException { if (!shouldGetNewRecords()) { - log.debug("Queue is full waiting for consumer for {} ms", idleMillisBetweenCalls); + log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls); this.wait(idleMillisBetweenCalls); } } public synchronized boolean shouldGetNewRecords() { if (log.isDebugEnabled()) { - log.debug("Current Prefetch Counter States: {}", this.toString()); + log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString()); } return size < maxRecordsCount && byteSize < maxByteSize; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index bf8dc8da..55e15b41 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -27,12 +27,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -123,13 +122,15 @@ public class PrefetchRecordsPublisherIntegrationTest { getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + .processRecordsInput(); assertTrue(processRecordsInput1.records().isEmpty()); assertEquals(processRecordsInput1.millisBehindLatest(), new Long(1000)); assertNotNull(processRecordsInput1.cacheEntryTime()); - ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + .processRecordsInput(); assertNotEquals(processRecordsInput1, processRecordsInput2); } @@ -141,8 +142,10 @@ public class PrefetchRecordsPublisherIntegrationTest { assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE); - ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult().processRecordsInput(); - ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + .processRecordsInput(); + ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + .processRecordsInput(); assertNotEquals(processRecordsInput1, 
processRecordsInput2); } @@ -181,9 +184,9 @@ public class PrefetchRecordsPublisherIntegrationTest { sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput p1 = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput p1 = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); - ProcessRecordsInput p2 = recordsPublisher2.getNextResult().processRecordsInput(); + ProcessRecordsInput p2 = recordsPublisher2.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); assertNotEquals(p1, p2); assertTrue(p1.records().isEmpty()); @@ -209,7 +212,8 @@ public class PrefetchRecordsPublisherIntegrationTest { getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + .processRecordsInput(); assertNotNull(processRecordsInput); assertTrue(processRecordsInput.records().isEmpty()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index e569921a..8c1aa743 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable; import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcessRecordsInput; import java.time.Duration; @@ -44,12 +45,15 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import io.reactivex.plugins.RxJavaPlugins; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; @@ -76,6 +80,7 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -143,7 +148,8 @@ public class PrefetchRecordsPublisherTest { .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); getRecordsCache.start(sequenceNumber, initialPosition); - ProcessRecordsInput result = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput result = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + 
.processRecordsInput(); assertEquals(expectedRecords, result.records()); @@ -183,7 +189,7 @@ public class PrefetchRecordsPublisherTest { // TODO: fix this verification // verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL); // assertEquals(spyQueue.size(), callRate); - assertTrue(callRate < MAX_SIZE); + assertTrue("Call Rate is "+callRate,callRate < MAX_SIZE); } @Test @@ -212,7 +218,7 @@ public class PrefetchRecordsPublisherTest { .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); getRecordsCache.start(sequenceNumber, initialPosition); - ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput processRecordsInput = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); verify(executorService).execute(any()); assertEquals(expectedRecords, processRecordsInput.records()); @@ -221,7 +227,7 @@ public class PrefetchRecordsPublisherTest { sleep(2000); - ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult().processRecordsInput(); + ProcessRecordsInput processRecordsInput2 = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); assertNotEquals(processRecordsInput, processRecordsInput2); assertEquals(expectedRecords, processRecordsInput2.records()); assertNotEquals(processRecordsInput2.timeSpentInCache(), Duration.ZERO); @@ -232,13 +238,13 @@ public class PrefetchRecordsPublisherTest { @Test(expected = IllegalStateException.class) public void testGetNextRecordsWithoutStarting() { verify(executorService, times(0)).execute(any()); - getRecordsCache.getNextResult(); + getRecordsCache.pollNextResultAndUpdatePrefetchCounters(); } @Test(expected = IllegalStateException.class) public void testCallAfterShutdown() { when(executorService.isShutdown()).thenReturn(true); - getRecordsCache.getNextResult(); + getRecordsCache.pollNextResultAndUpdatePrefetchCounters(); } @Test @@ -251,7 +257,7 @@ public class PrefetchRecordsPublisherTest { doNothing().when(dataFetcher).restartIterator(); - getRecordsCache.getNextResult(); + blockUntilRecordsAvailable(() -> getRecordsCache.pollNextResultAndUpdatePrefetchCounters(), 1000L); sleep(1000); @@ -266,11 +272,11 @@ public class PrefetchRecordsPublisherTest { getRecordsCache.start(sequenceNumber, initialPosition); - RecordsRetrieved records = getRecordsCache.getNextResult(); + RecordsRetrieved records = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); } - @Test(timeout = 1000L) + @Test(timeout = 20000L) public void testNoDeadlockOnFullQueue() { // // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448 @@ -284,6 +290,8 @@ public class PrefetchRecordsPublisherTest { .build(); when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response); + RxJavaPlugins.setErrorHandler(e -> e.printStackTrace()); + getRecordsCache.start(sequenceNumber, initialPosition); // @@ -296,7 +304,7 @@ public class PrefetchRecordsPublisherTest { log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); AtomicInteger receivedItems = new AtomicInteger(0); - final int expectedItems = MAX_SIZE * 3; + final int expectedItems = MAX_SIZE * 1000; Object lock = new Object(); @@ -351,6 +359,85 @@ public class PrefetchRecordsPublisherTest { assertThat(receivedItems.get(), equalTo(expectedItems)); } + @Test(timeout 
= 20000L) + public void testNoDeadlockOnFullQueueAndLossOfNotification() { + // + // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/602 + // + // This test is to verify that the data consumption is not stuck in the case of an failed event delivery + // to the subscriber. + GetRecordsResponse response = GetRecordsResponse.builder().records( + Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build()) + .build(); + when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response); + + getRecordsCache.start(sequenceNumber, initialPosition); + + // + // Wait for the queue to fill up, and the publisher to block on adding items to the queue. + // + log.info("Waiting for queue to fill up"); + while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) { + Thread.yield(); + } + + log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); + AtomicInteger receivedItems = new AtomicInteger(0); + final int expectedItems = MAX_SIZE * 100; + + Object lock = new Object(); + + Subscriber delegateSubscriber = new Subscriber() { + Subscription sub; + + @Override + public void onSubscribe(Subscription s) { + sub = s; + s.request(1); + } + + @Override + public void onNext(RecordsRetrieved recordsRetrieved) { + receivedItems.incrementAndGet(); + if (receivedItems.get() >= expectedItems) { + synchronized (lock) { + log.info("Notifying waiters"); + lock.notifyAll(); + } + sub.cancel(); + } else { + sub.request(1); + } + } + + @Override + public void onError(Throwable t) { + log.error("Caught error", t); + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + fail("onComplete not expected in this test"); + } + }; + + Subscriber subscriber = new LossyNotificationSubscriber(delegateSubscriber, getRecordsCache); + + synchronized (lock) { + log.info("Awaiting notification"); + Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), true, 8).subscribe(subscriber); + try { + lock.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt()); + assertThat(receivedItems.get(), equalTo(expectedItems)); + } + @Test public void testResetClearsRemainingData() { List responses = Stream.iterate(0, i -> i + 1).limit(10).map(i -> { @@ -372,14 +459,14 @@ public class PrefetchRecordsPublisherTest { getRecordsCache.start(sequenceNumber, initialPosition); - RecordsRetrieved lastProcessed = getRecordsCache.getNextResult(); - RecordsRetrieved expected = getRecordsCache.getNextResult(); + RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); + RecordsRetrieved expected = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); // // Skip some of the records the cache // - getRecordsCache.getNextResult(); - getRecordsCache.getNextResult(); + blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); + blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt()); @@ -388,7 +475,7 @@ public class PrefetchRecordsPublisherTest { } getRecordsCache.restartFrom(lastProcessed); - RecordsRetrieved postRestart = getRecordsCache.getNextResult(); + RecordsRetrieved postRestart = 
blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); assertThat(postRestart.processRecordsInput(), eqProcessRecordsInput(expected.processRecordsInput())); verify(dataFetcher).resetIterator(eq(responses.get(0).nextShardIterator()), @@ -432,6 +519,33 @@ public class PrefetchRecordsPublisherTest { } } + private static class LossyNotificationSubscriber extends ShardConsumerNotifyingSubscriber { + + private static final int LOSS_EVERY_NTH_RECORD = 100; + private static int recordCounter = 0; + private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1); + + public LossyNotificationSubscriber(Subscriber delegate, RecordsPublisher recordsPublisher) { + super(delegate, recordsPublisher); + } + + @Override + public void onNext(RecordsRetrieved recordsRetrieved) { + log.info("Subscriber received onNext"); + if (!(recordCounter % LOSS_EVERY_NTH_RECORD == LOSS_EVERY_NTH_RECORD - 1)) { + getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved)); + getDelegateSubscriber().onNext(recordsRetrieved); + } else { + log.info("Record Loss Triggered"); + consumerHealthChecker.schedule(() -> { + getRecordsPublisher().restartFrom(recordsRetrieved); + Flowable.fromPublisher(getRecordsPublisher()).subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), true, 8).subscribe(this); + }, 1000, TimeUnit.MILLISECONDS); + } + recordCounter++; + } + } @After public void shutdown() { getRecordsCache.shutdown(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java new file mode 100644 index 00000000..fa10557f --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.utils; + +import java.util.function.Supplier; + +public class BlockingUtils { + + public static Records blockUntilRecordsAvailable(Supplier recordsSupplier, long timeoutMillis) { + Records recordsRetrieved; + while((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + timeoutMillis -= 100; + } + if(recordsRetrieved != null) { + return recordsRetrieved; + } else { + throw new RuntimeException("No records found"); + } + } +}
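
For readers skimming the diff, the heart of the fix is that the old `Semaphore`-based delivery lock is replaced by an ack-gated draining protocol: a batch is *peeked* (not removed) from the prefetch queue, delivered to the subscriber, and only evicted once an acknowledgement carrying the matching `BatchUniqueIdentifier` arrives; until then the `shouldDrainEventOnlyOnAck` flag keeps the publisher and demand threads from draining further. The sketch below is a simplified, self-contained illustration of that peek → deliver → ack → evict cycle under stated assumptions — `AckGatedQueue`, `Batch`, and the `Consumer`-based subscriber are invented for the example and are not the actual KCL classes in this patch.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Simplified model of the ack-gated draining introduced in this patch.
// A batch stays at the head of the queue until the subscriber acknowledges it,
// so a lost notification can no longer strand the shard or drop the batch.
class AckGatedQueue<T> {
    static final class Batch<P> {
        final String batchId = UUID.randomUUID().toString(); // stand-in for BatchUniqueIdentifier
        final P payload;
        Batch(P payload) { this.payload = payload; }
    }

    private final LinkedBlockingQueue<Batch<T>> queue = new LinkedBlockingQueue<>(3);
    private final AtomicLong demand = new AtomicLong(0);
    // false: publisher/demand thread may drain; true: only a matching ack triggers the next drain.
    private final AtomicBoolean drainOnlyOnAck = new AtomicBoolean(false);
    private final Consumer<Batch<T>> subscriber;

    AckGatedQueue(Consumer<Batch<T>> subscriber) { this.subscriber = subscriber; }

    // Demand-notifier path: record demand, then drain only if no delivery is awaiting an ack.
    void request(long n) { demand.addAndGet(n); drainIfAllowed(); }

    // Publisher path: enqueue a freshly fetched batch, then drain only if allowed.
    boolean offer(T payload) { boolean ok = queue.offer(new Batch<>(payload)); drainIfAllowed(); return ok; }

    private synchronized void drainIfAllowed() {
        if (!drainOnlyOnAck.get()) { drain(); }
    }

    private synchronized void drain() {
        Batch<T> head = queue.peek();               // peek, do not remove
        if (head != null && demand.get() > 0) {
            drainOnlyOnAck.set(true);               // hand drain control to the ack path
            subscriber.accept(head);                // deliver; eviction waits for the ack
        } else {
            drainOnlyOnAck.set(false);              // nothing in flight; publisher drains next time
        }
    }

    // Ack-notifier path: called once the subscriber has actually received the batch.
    synchronized void ack(String batchId) {
        Batch<T> head = queue.peek();
        if (head != null && Objects.equals(head.batchId, batchId)) {
            queue.poll();                           // evict only after a matching ack
            demand.decrementAndGet();
            if (queue.isEmpty()) { drainOnlyOnAck.set(false); } else { drain(); }
        }
        // Stale or unexpected acks are simply ignored, mirroring notify() in the patch.
    }
}
```

The essential difference from the pre-patch approach is that eviction is deferred until a matching ack: if a notification is lost between `onNext` and the consumer, the batch is still at the head of the queue, and `restartFrom` can clear the queue and return drain control to the publisher thread instead of the permit being consumed forever.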
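From the consumer side, the contract implied by the patch is that every delivered batch must be acknowledged back to the publisher with that batch's own identifier before the next batch is drained (in KCL this is what a `ShardConsumerNotifyingSubscriber` does, as the `LossyNotificationSubscriber` test exercises). The sketch below is a hedged illustration of that handshake, assuming `RecordsDeliveryAck` exposes only `batchUniqueIdentifier()`; `AckingSubscriber` itself is hypothetical and not part of this patch.

```java
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;

// Hypothetical subscriber sketch: ack each batch so the publisher can evict it
// from the prefetch queue and drain the next one.
class AckingSubscriber implements Subscriber<RecordsRetrieved> {
    private final RecordsPublisher publisher;
    private Subscription subscription;

    AckingSubscriber(RecordsPublisher publisher) { this.publisher = publisher; }

    @Override public void onSubscribe(Subscription s) { subscription = s; s.request(1); }

    @Override public void onNext(RecordsRetrieved batch) {
        // Acknowledge with the batch's own identifier; a missing or mismatched ack
        // leaves the batch at the head of the prefetch queue until restartFrom recovers it.
        final BatchUniqueIdentifier id = batch.batchUniqueIdentifier();
        publisher.notify(new RecordsDeliveryAck() {   // assumes a single-method ack interface
            @Override public BatchUniqueIdentifier batchUniqueIdentifier() { return id; }
        });
        subscription.request(1);
    }

    @Override public void onError(Throwable t) { /* surface to the shard consumer */ }
    @Override public void onComplete() { }
}
```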