import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/**
 * Unchecked exception carrying the keys of leases that were found to be
 * missing while still incomplete.
 *
 * <p>The exception message is {@code "missing leases: "} followed by the
 * lease keys joined with commas, in the iteration order of the supplied set.
 */
public class MissingIncompleteLeasesException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    /** Unmodifiable snapshot of the lease keys supplied at construction time. */
    private final Set<String> leases;

    /**
     * @param leases keys of the affected leases; must not be {@code null}
     * @throws NullPointerException if {@code leases} is {@code null}
     */
    public MissingIncompleteLeasesException(Set<String> leases) {
        super("missing leases: " + String.join(",", Objects.requireNonNull(leases, "leases")));
        // Copy so that later changes to the caller's set cannot alter what this
        // exception reports; LinkedHashSet keeps the order used in the message.
        this.leases = Collections.unmodifiableSet(new LinkedHashSet<>(leases));
    }

    /**
     * @return an unmodifiable view of the lease keys captured when the
     *         exception was created
     */
    public Set<String> getLeases() {
        return leases;
    }
}
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -610,10 +611,11 @@ class ShardSyncer { garbageLeases.add(lease); } } - + if (!garbageLeases.isEmpty()) { + Set unfinishedAndMissing = new HashSet<>(); LOG.info("Found " + garbageLeases.size() - + " candidate leases for cleanup. Refreshing list of" + + " candidate leases for cleanup. Refreshing list of" + " Kinesis shards to pick up recent/latest shards"); List currentShardList = getShardList(kinesisProxy); Set currentKinesisShardIds = new HashSet<>(); @@ -623,11 +625,17 @@ class ShardSyncer { for (KinesisClientLease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) { - LOG.info("Deleting lease for shard " + lease.getLeaseKey() - + " as it is not present in Kinesis stream."); - leaseManager.deleteLease(lease); + if (lease.isCompete()) { + LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() + + " as it is not present in Kinesis stream."); + leaseManager.deleteLease(lease); + } else { + unfinishedAndMissing.add(lease.getLeaseKey()); + } } } + + if (!unfinishedAndMissing.isEmpty()) throw new MissingIncompleteLeasesException(unfinishedAndMissing); } } @@ -694,7 +702,7 @@ class ShardSyncer { Set shardIdsOfClosedShards = new HashSet<>(); List leasesOfClosedShards = new ArrayList<>(); for (KinesisClientLease lease : currentLeases) { - if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + if (lease.isCompete()) { shardIdsOfClosedShards.add(lease.getLeaseKey()); leasesOfClosedShards.add(lease); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f4e2d0fc..04b56a74 100644 --- 
a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,6 +32,7 @@ import java.util.function.Consumer; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException; +import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -528,6 +529,7 @@ public class Worker implements Runnable { throw e; } catch (Exception e) { handleTrimmedDataAccessException(e); + handleMissingIncompleteLeasesException(e); if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); @@ -561,12 +563,36 @@ public class Worker implements Runnable { } private Optional getTrimmedDataAccessException(Throwable t) { - if (t.getCause() == null) return Optional.empty(); - if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause()); + if (t.getClass().equals(TrimmedDataAccessException.class)) { + return Optional.of( (TrimmedDataAccessException) t); + } + if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) { + return Optional.of( (TrimmedDataAccessException) t.getCause()); + } else if (t.getCause() == null) { + return Optional.empty(); + } return getTrimmedDataAccessException(t.getCause()); } + private void handleMissingIncompleteLeasesException(Exception e) { + Optional maybeMissingLeaseException = getMissingIncompleteLeasesException(e); + if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get(); + } + + private Optional getMissingIncompleteLeasesException(Throwable t) { + if 
(t.getClass().equals(MissingIncompleteLeasesException.class)) { + return Optional.of( (MissingIncompleteLeasesException) t); + } + if (t.getCause().getClass().equals(MissingIncompleteLeasesException.class)) { + return Optional.of( (MissingIncompleteLeasesException) t.getCause()); + } else if (t.getCause() == null) { + return Optional.empty(); + } + + return getMissingIncompleteLeasesException(t.getCause()); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; @@ -600,7 +626,10 @@ public class Worker implements Runnable { isDone = true; } else { lastException = result.getException(); + handleMissingIncompleteLeasesException(lastException); } + } catch (MissingIncompleteLeasesException e) { + throw e; } catch (LeasingException e) { LOG.error("Caught exception when initializing LeaseCoordinator", e); lastException = e; diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index 2afa2e0f..d4605e27 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -71,6 +71,10 @@ public class KinesisClientLease extends Lease { setParentShardIds(casted.parentShardIds); } + public boolean isCompete() { + return checkpoint.equals(ExtendedSequenceNumber.SHARD_END); + } + /** * @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after * the old worker's last checkpoint.