Fixing exception type

This commit is contained in:
Ashwin Giridharan 2021-01-26 21:40:22 -08:00
parent 530fd477b1
commit 5142ce9bc5
3 changed files with 50 additions and 6 deletions

View file

@ -14,7 +14,7 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -83,7 +83,7 @@ class InitializeTask implements ITask {
Checkpoint initialCheckpointObject; Checkpoint initialCheckpointObject;
try { try {
initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId());
} catch (KinesisClientLibException e) { } catch (KinesisClientLibNonRetryableException e) {
LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e); LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e);
final TaskResult result = new TaskResult(e); final TaskResult result = new TaskResult(e);
result.leaseNotFound(); result.leaseNotFound();

View file

@ -283,7 +283,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
try { try {
KinesisClientLease lease = leaseManager.getLease(shardId); KinesisClientLease lease = leaseManager.getLease(shardId);
if (lease == null) { if (lease == null) {
throw new KinesisClientLibIOException(errorMessage); // This is a KinesisClientLibNonRetryableException
throw new com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException(errorMessage);
} }
return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint()); return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {

View file

@ -54,8 +54,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description; import org.hamcrest.Description;
@ -64,7 +62,6 @@ import org.hamcrest.TypeSafeMatcher;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
@ -246,6 +243,52 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
} }
@Test
public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = new KinesisClientLibLeaseCoordinator(leaseManager, "", 0, 0);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config,
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public final void testRecordProcessorThrowable() throws Exception { public final void testRecordProcessorThrowable() throws Exception {