Better handling of get checkpoint object in KinesisClientLibLeaseCoordinator (#558)
Improve the handling of get checkpoint object in KinesisClientLibLeaseCoordinator by no longer triggering a null pointer exception when lease is null.
This commit is contained in:
parent
73dc2aaa76
commit
3aefdfa28a
2 changed files with 16 additions and 3 deletions
|
|
@ -282,13 +282,16 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException {
|
public Checkpoint getCheckpointObject(String shardId) throws KinesisClientLibException {
|
||||||
|
String errorMessage = "Unable to fetch checkpoint for shardId " + shardId;
|
||||||
try {
|
try {
|
||||||
KinesisClientLease lease = leaseManager.getLease(shardId);
|
KinesisClientLease lease = leaseManager.getLease(shardId);
|
||||||
|
if (lease == null) {
|
||||||
|
throw new KinesisClientLibIOException(errorMessage);
|
||||||
|
}
|
||||||
return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
|
return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
|
||||||
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||||
String message = "Unable to fetch checkpoint for shardId " + shardId;
|
LOG.error(errorMessage, e);
|
||||||
LOG.error(message, e);
|
throw new KinesisClientLibIOException(errorMessage, e);
|
||||||
throw new KinesisClientLibIOException(message, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,10 +15,12 @@
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||||
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
|
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
|
||||||
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
|
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
|
||||||
|
|
@ -76,4 +78,12 @@ public class KinesisClientLibLeaseCoordinatorTest {
|
||||||
doReturn(false).when(mockLeaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
|
doReturn(false).when(mockLeaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
|
||||||
leaseCoordinator.initialize();
|
leaseCoordinator.initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = KinesisClientLibIOException.class)
|
||||||
|
public void testGetCheckpointObjectWithNoLease()
|
||||||
|
throws DependencyException, ProvisionedThroughputException, IllegalStateException, InvalidStateException,
|
||||||
|
KinesisClientLibException {
|
||||||
|
doReturn(null).when(mockLeaseManager).getLease(anyString());
|
||||||
|
leaseCoordinator.getCheckpointObject(SHARD_ID);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue