Merge pull request #775 from Renjuju/fix-daemon-thread-exception-handling

Move throwable exception handling up a level to prevent daemon thread death
This commit is contained in:
ashwing 2021-01-22 13:27:19 -08:00 committed by GitHub
commit e03285508e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 8 deletions

View file

@ -414,11 +414,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
} catch (Throwable e) {
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
} finally {
resetLock.readLock().unlock();
}
}
callShutdownOnStrategy();
}
@ -469,11 +472,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
} catch (Throwable e) {
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
} finally {
MetricsUtil.endScope(scope);
}

View file

@ -24,10 +24,13 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable;
import java.util.ArrayList;
@ -38,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -49,7 +53,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import lombok.extern.slf4j.Slf4j;
@ -224,6 +227,25 @@ public class PrefetchRecordsPublisherIntegrationTest {
verify(dataFetcher).restartIterator();
}
@Test
public void testExpiredIteratorExceptionWithInnerRestartIteratorException() {
when(dataFetcher.getRecords())
.thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build())
.thenCallRealMethod()
.thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build())
.thenCallRealMethod();
doThrow(IllegalStateException.class).when(dataFetcher).restartIterator();
getRecordsCache.start(extendedSequenceNumber, initialPosition);
final boolean conditionSatisfied = blockUntilConditionSatisfied(() ->
getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 5000);
Assert.assertTrue(conditionSatisfied);
// Asserts the exception was only thrown once for restartIterator
verify(dataFetcher, times(2)).restartIterator();
}
private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
}

View file

@ -448,6 +448,26 @@ public class PrefetchRecordsPublisherTest {
verify(dataFetcher).restartIterator();
}
@Test
public void testExpiredIteratorExceptionWithIllegalStateException() {
// This test validates that the daemon thread doesn't die when ExpiredIteratorException occurs with an
// IllegalStateException.
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL))
.thenThrow(ExpiredIteratorException.builder().build())
.thenReturn(getRecordsResponse)
.thenThrow(ExpiredIteratorException.builder().build())
.thenReturn(getRecordsResponse);
doThrow(new IllegalStateException()).when(dataFetcher).restartIterator();
getRecordsCache.start(sequenceNumber, initialPosition);
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
// verify restartIterator was called
verify(dataFetcher, times(2)).restartIterator();
}
@Test
public void testRetryableRetrievalExceptionContinues() {