From 653a25456cbbaa5053774b7f9bde1e418ddc2442 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 7 Nov 2019 13:37:45 -0800 Subject: [PATCH] Debug remote unit test failure --- .../amazon/kinesis/lifecycle/ShardConsumerSubscriber.java | 5 ++++- .../kinesis/lifecycle/ShardConsumerSubscriberTest.java | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 14e347a2..566ed385 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -80,6 +80,7 @@ class ShardConsumerSubscriber implements Subscriber { lastRequestTime = Instant.now(); if (lastAccepted != null) { recordsPublisher.restartFrom(lastAccepted); + log.warn("Last record was accepted! Record Publisher restarted from the last accepted record."); } Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); @@ -90,6 +91,7 @@ class ShardConsumerSubscriber implements Subscriber { Throwable result = restartIfFailed(); if (result == null) { restartIfRequestTimerExpired(maxTimeBetweenRequests); + log.warn("healthCheck result is null, called restartIfRequestTimerExpired"); } return result; } @@ -109,7 +111,7 @@ class ShardConsumerSubscriber implements Subscriber { String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardConsumer.shardInfo().shardId()); if (retrievalFailure instanceof RetryableRetrievalException) { - log.debug(logMessage, retrievalFailure.getCause()); + log.info(logMessage, retrievalFailure.getCause()); } else { log.warn(logMessage, retrievalFailure); } @@ -135,6 +137,7 @@ class ShardConsumerSubscriber implements Subscriber { // Start the subscription again which will update the lastRequestTime as well. startSubscriptions(); + log.info("No responses. Called previous subscription and called startSubscriptions."); } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index a4002ab9..d775107a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -325,6 +325,7 @@ public class ShardConsumerSubscriberTest { subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0); addUniqueItem(1); + log.info("recordsPublisher and subscriber is created successfully."); List received = new ArrayList<>(); doAnswer(a -> { ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class); @@ -346,6 +347,7 @@ public class ShardConsumerSubscriberTest { // subscription has not started correctly. verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); + log.info("Verified no records were sent back and subscription has not started correctly"); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); @@ -353,6 +355,7 @@ public class ShardConsumerSubscriberTest { // Doing the health check to allow the subscription to restart. assertThat(subscriber.healthCheck(1), nullValue()); + log.info("Health check was successful without exceptions"); // Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the // terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines. @@ -363,6 +366,7 @@ public class ShardConsumerSubscriberTest { // Verify that shardConsumer mock was called 100 times and all 100 input records are processed. verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); + log.info("Verified that handleInput was called 100 times"); // Verify that received records in the subscriber are equal to the ones sent by the record publisher. assertThat(received.size(), equalTo(recordsPublisher.responses.size()));