refactor(worker): consolidate exception checking logic

This commit is contained in:
bencvdb 2021-07-06 16:40:46 -07:00
parent 981bacd512
commit a8551d5389
3 changed files with 11 additions and 24 deletions

View file

@ -625,7 +625,7 @@ class ShardSyncer {
for (KinesisClientLease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
if (lease.isCompete()) {
if (lease.isComplete()) {
LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey()
+ " as it is not present in Kinesis stream.");
leaseManager.deleteLease(lease);
@ -702,7 +702,7 @@ class ShardSyncer {
Set<String> shardIdsOfClosedShards = new HashSet<>();
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
for (KinesisClientLease lease : currentLeases) {
if (lease.isCompete()) {
if (lease.isComplete()) {
shardIdsOfClosedShards.add(lease.getLeaseKey());
leasesOfClosedShards.add(lease);
}

View file

@ -558,39 +558,26 @@ public class Worker implements Runnable {
}
private void handleTrimmedDataAccessException(Exception e) {
Optional<TrimmedDataAccessException> maybeTrimmedException = getTrimmedDataAccessException(e);
Optional<TrimmedDataAccessException> maybeTrimmedException = getCauseOfType(TrimmedDataAccessException.class, e);
if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get();
}
private Optional<TrimmedDataAccessException> getTrimmedDataAccessException(Throwable t) {
if (t.getClass().equals(TrimmedDataAccessException.class)) {
return Optional.of( (TrimmedDataAccessException) t);
}
if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) {
return Optional.of( (TrimmedDataAccessException) t.getCause());
} else if (t.getCause() == null) {
return Optional.empty();
}
return getTrimmedDataAccessException(t.getCause());
}
private void handleMissingIncompleteLeasesException(Exception e) {
Optional<MissingIncompleteLeasesException> maybeMissingLeaseException = getMissingIncompleteLeasesException(e);
Optional<MissingIncompleteLeasesException> maybeMissingLeaseException = getCauseOfType(MissingIncompleteLeasesException.class, e);
if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get();
}
private Optional<MissingIncompleteLeasesException> getMissingIncompleteLeasesException(Throwable t) {
if (t.getClass().equals(MissingIncompleteLeasesException.class)) {
return Optional.of( (MissingIncompleteLeasesException) t);
private <T extends Throwable> Optional<T> getCauseOfType(Class<T> clazz, Throwable t) {
if (t.getClass().equals(clazz)) {
return Optional.of( (T) t);
}
if (t.getCause().getClass().equals(MissingIncompleteLeasesException.class)) {
return Optional.of( (MissingIncompleteLeasesException) t.getCause());
if (t.getCause().getClass().equals(clazz)) {
return Optional.of( (T) t.getCause());
} else if (t.getCause() == null) {
return Optional.empty();
}
return getMissingIncompleteLeasesException(t.getCause());
return getCauseOfType(clazz, t.getCause());
}
private void initialize() {

View file

@ -71,7 +71,7 @@ public class KinesisClientLease extends Lease {
setParentShardIds(casted.parentShardIds);
}
public boolean isCompete() {
public boolean isComplete() {
return checkpoint.equals(ExtendedSequenceNumber.SHARD_END);
}