diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 7a96c396..59a9a505 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -625,7 +625,7 @@ class ShardSyncer { for (KinesisClientLease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) { - if (lease.isCompete()) { + if (lease.isComplete()) { LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() + " as it is not present in Kinesis stream."); leaseManager.deleteLease(lease); @@ -702,7 +702,7 @@ class ShardSyncer { Set shardIdsOfClosedShards = new HashSet<>(); List leasesOfClosedShards = new ArrayList<>(); for (KinesisClientLease lease : currentLeases) { - if (lease.isCompete()) { + if (lease.isComplete()) { shardIdsOfClosedShards.add(lease.getLeaseKey()); leasesOfClosedShards.add(lease); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 04b56a74..5361830a 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -558,39 +558,26 @@ public class Worker implements Runnable { } private void handleTrimmedDataAccessException(Exception e) { - Optional maybeTrimmedException = getTrimmedDataAccessException(e); + Optional maybeTrimmedException = getCauseOfType(TrimmedDataAccessException.class, e); if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get(); } - private Optional getTrimmedDataAccessException(Throwable t) { - if (t.getClass().equals(TrimmedDataAccessException.class)) { - return Optional.of( (TrimmedDataAccessException) t); - } - if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) { - return Optional.of( (TrimmedDataAccessException) t.getCause()); - } else if (t.getCause() == null) { - return Optional.empty(); - } - - return getTrimmedDataAccessException(t.getCause()); - } - private void handleMissingIncompleteLeasesException(Exception e) { - Optional maybeMissingLeaseException = getMissingIncompleteLeasesException(e); + Optional maybeMissingLeaseException = getCauseOfType(MissingIncompleteLeasesException.class, e); if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get(); } - private Optional getMissingIncompleteLeasesException(Throwable t) { - if (t.getClass().equals(MissingIncompleteLeasesException.class)) { - return Optional.of( (MissingIncompleteLeasesException) t); + private Optional getCauseOfType(Class clazz, Throwable t) { + if (t.getClass().equals(clazz)) { + return Optional.of( (T) t); } - if (t.getCause().getClass().equals(MissingIncompleteLeasesException.class)) { - return Optional.of( (MissingIncompleteLeasesException) t.getCause()); + if (t.getCause().getClass().equals(clazz)) { + return Optional.of( (T) t.getCause()); } else if (t.getCause() == null) { return Optional.empty(); } - return getMissingIncompleteLeasesException(t.getCause()); + return getCauseOfType(clazz, t.getCause()); } private void initialize() { diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index d4605e27..97b8e0f7 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -71,7 +71,7 @@ public class KinesisClientLease extends Lease { setParentShardIds(casted.parentShardIds); } - public boolean isCompete() { + public boolean isComplete() { return checkpoint.equals(ExtendedSequenceNumber.SHARD_END); }