Move throwable exception handling up a level to prevent daemon thread death
This commit is contained in:
parent
d07192c3e9
commit
0064d1e5fc
3 changed files with 48 additions and 8 deletions
|
|
@ -414,11 +414,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
makeRetrievalAttempt();
|
||||
} catch(PositionResetException pre) {
|
||||
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
|
||||
} catch (Throwable e) {
|
||||
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
|
||||
" Please search for the exception/error online to check what is going on. If the " +
|
||||
"issue persists or is a recurring problem, feel free to open an issue on, " +
|
||||
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
|
||||
} finally {
|
||||
resetLock.readLock().unlock();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
callShutdownOnStrategy();
|
||||
}
|
||||
|
|
@ -469,11 +472,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
|
||||
} catch (SdkException e) {
|
||||
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
|
||||
} catch (Throwable e) {
|
||||
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
|
||||
" Please search for the exception/error online to check what is going on. If the " +
|
||||
"issue persists or is a recurring problem, feel free to open an issue on, " +
|
||||
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
|
||||
} finally {
|
||||
MetricsUtil.endScope(scope);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,10 +24,13 @@ import static org.mockito.Matchers.any;
|
|||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied;
|
||||
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
|
@ -38,6 +41,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
|
@ -49,7 +53,6 @@ import org.mockito.runners.MockitoJUnitRunner;
|
|||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import software.amazon.awssdk.core.SdkBytes;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
|
@ -224,6 +227,25 @@ public class PrefetchRecordsPublisherIntegrationTest {
|
|||
verify(dataFetcher).restartIterator();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredIteratorExceptionWithInnerRestartIteratorException() {
|
||||
when(dataFetcher.getRecords())
|
||||
.thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build())
|
||||
.thenCallRealMethod()
|
||||
.thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build())
|
||||
.thenCallRealMethod();
|
||||
|
||||
doThrow(IllegalStateException.class).when(dataFetcher).restartIterator();
|
||||
|
||||
getRecordsCache.start(extendedSequenceNumber, initialPosition);
|
||||
|
||||
final boolean conditionSatisfied = blockUntilConditionSatisfied(() ->
|
||||
getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 5000);
|
||||
Assert.assertTrue(conditionSatisfied);
|
||||
// Asserts the exception was only thrown once for restartIterator
|
||||
verify(dataFetcher, times(2)).restartIterator();
|
||||
}
|
||||
|
||||
private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
|
||||
return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -448,6 +448,26 @@ public class PrefetchRecordsPublisherTest {
|
|||
verify(dataFetcher).restartIterator();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredIteratorExceptionWithIllegalStateException() {
|
||||
// This test validates that the daemon thread doesn't die when ExpiredIteratorException occurs with an
|
||||
// IllegalStateException.
|
||||
|
||||
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL))
|
||||
.thenThrow(ExpiredIteratorException.builder().build())
|
||||
.thenReturn(getRecordsResponse)
|
||||
.thenThrow(ExpiredIteratorException.builder().build())
|
||||
.thenReturn(getRecordsResponse);
|
||||
|
||||
doThrow(new IllegalStateException()).when(dataFetcher).restartIterator();
|
||||
|
||||
getRecordsCache.start(sequenceNumber, initialPosition);
|
||||
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
|
||||
|
||||
// verify restartIterator was called
|
||||
verify(dataFetcher, times(2)).restartIterator();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryableRetrievalExceptionContinues() {
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue