From c8a1797b7eed474cb7fe91f9622a0a04b60841ca Mon Sep 17 00:00:00 2001 From: bencvdb Date: Thu, 1 Jul 2021 09:57:34 -0700 Subject: [PATCH 1/5] fix(worker): fail on TrimmedDataAccessException (101797) --- .../kinesis/clientlibrary/lib/worker/Worker.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index ee9a2f21..f4e2d0fc 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -526,6 +527,8 @@ public class Worker implements Runnable { } catch (AmazonDynamoDBException e) { throw e; } catch (Exception e) { + handleTrimmedDataAccessException(e); + if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) @@ -552,6 +555,18 @@ public class Worker implements Runnable { return causedByStreamRecordProcessingError(t.getCause()); } + private void handleTrimmedDataAccessException(Exception e) { + Optional maybeTrimmedException = getTrimmedDataAccessException(e); + if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get(); + } + + private Optional getTrimmedDataAccessException(Throwable t) { + if (t.getCause() == null) return Optional.empty(); + if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause()); + + return getTrimmedDataAccessException(t.getCause()); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; From 981bacd512c6e86b1420a70e6c992746fed80039 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 6 Jul 2021 10:24:32 -0700 Subject: [PATCH 2/5] fix(worker): fail on incomplete garbage lease --- .gitignore | 1 + .../MissingIncompleteLeasesException.java | 16 +++++++++ .../clientlibrary/lib/worker/ShardSyncer.java | 20 +++++++---- .../clientlibrary/lib/worker/Worker.java | 33 +++++++++++++++++-- .../leases/impl/KinesisClientLease.java | 4 +++ 5 files changed, 66 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/exceptions/MissingIncompleteLeasesException.java diff --git a/.gitignore b/.gitignore index 863e68d5..b765c8ea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target/ AwsCredentials.properties .idea +amazon-kinesis-client.iml \ No newline at end of file diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/exceptions/MissingIncompleteLeasesException.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/exceptions/MissingIncompleteLeasesException.java new file mode 100644 index 00000000..3af368ef --- /dev/null +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/exceptions/MissingIncompleteLeasesException.java @@ -0,0 +1,16 @@ +package com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions; + +import java.util.Set; + +public class MissingIncompleteLeasesException extends RuntimeException { + private final Set leases; + + public MissingIncompleteLeasesException(Set leases) { + super("missing leases: " + String.join(",", leases)); + this.leases = leases; + } + + public Set getLeases() { + return leases; + } +} diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 02563196..7a96c396 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.lang3.StringUtils; +import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException; import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -610,10 +611,11 @@ class ShardSyncer { garbageLeases.add(lease); } } - + if (!garbageLeases.isEmpty()) { + Set unfinishedAndMissing = new HashSet<>(); LOG.info("Found " + garbageLeases.size() - + " candidate leases for cleanup. Refreshing list of" + + " candidate leases for cleanup. Refreshing list of" + " Kinesis shards to pick up recent/latest shards"); List currentShardList = getShardList(kinesisProxy); Set currentKinesisShardIds = new HashSet<>(); @@ -623,11 +625,17 @@ class ShardSyncer { for (KinesisClientLease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) { - LOG.info("Deleting lease for shard " + lease.getLeaseKey() - + " as it is not present in Kinesis stream."); - leaseManager.deleteLease(lease); + if (lease.isCompete()) { + LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() + + " as it is not present in Kinesis stream."); + leaseManager.deleteLease(lease); + } else { + unfinishedAndMissing.add(lease.getLeaseKey()); + } } } + + if (!unfinishedAndMissing.isEmpty()) throw new MissingIncompleteLeasesException(unfinishedAndMissing); } } @@ -694,7 +702,7 @@ class ShardSyncer { Set shardIdsOfClosedShards = new HashSet<>(); List leasesOfClosedShards = new ArrayList<>(); for (KinesisClientLease lease : currentLeases) { - if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + if (lease.isCompete()) { shardIdsOfClosedShards.add(lease.getLeaseKey()); leasesOfClosedShards.add(lease); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f4e2d0fc..04b56a74 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,6 +32,7 @@ import java.util.function.Consumer; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException; +import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -528,6 +529,7 @@ public class Worker implements Runnable { throw e; } catch (Exception e) { handleTrimmedDataAccessException(e); + handleMissingIncompleteLeasesException(e); if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); @@ -561,12 +563,36 @@ public class Worker implements Runnable { } private Optional getTrimmedDataAccessException(Throwable t) { - if (t.getCause() == null) return Optional.empty(); - if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause()); + if (t.getClass().equals(TrimmedDataAccessException.class)) { + return Optional.of( (TrimmedDataAccessException) t); + } + if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) { + return Optional.of( (TrimmedDataAccessException) t.getCause()); + } else if (t.getCause() == null) { + return Optional.empty(); + } return getTrimmedDataAccessException(t.getCause()); } + private void handleMissingIncompleteLeasesException(Exception e) { + Optional maybeMissingLeaseException = getMissingIncompleteLeasesException(e); + if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get(); + } + + private Optional getMissingIncompleteLeasesException(Throwable t) { + if (t.getClass().equals(MissingIncompleteLeasesException.class)) { + return Optional.of( (MissingIncompleteLeasesException) t); + } + if (t.getCause().getClass().equals(MissingIncompleteLeasesException.class)) { + return Optional.of( (MissingIncompleteLeasesException) t.getCause()); + } else if (t.getCause() == null) { + return Optional.empty(); + } + + return getMissingIncompleteLeasesException(t.getCause()); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; @@ -600,7 +626,10 @@ public class Worker implements Runnable { isDone = true; } else { lastException = result.getException(); + handleMissingIncompleteLeasesException(lastException); } + } catch (MissingIncompleteLeasesException e) { + throw e; } catch (LeasingException e) { LOG.error("Caught exception when initializing LeaseCoordinator", e); lastException = e; diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index 2afa2e0f..d4605e27 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -71,6 +71,10 @@ public class KinesisClientLease extends Lease { setParentShardIds(casted.parentShardIds); } + public boolean isCompete() { + return checkpoint.equals(ExtendedSequenceNumber.SHARD_END); + } + /** * @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after * the old worker's last checkpoint. From a8551d538992f66aeaf0063f42271dc7d2e6ee94 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 6 Jul 2021 16:40:46 -0700 Subject: [PATCH 3/5] refactor(worker): consolidate exception checking logic --- .../clientlibrary/lib/worker/ShardSyncer.java | 4 +-- .../clientlibrary/lib/worker/Worker.java | 29 +++++-------------- .../leases/impl/KinesisClientLease.java | 2 +- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 7a96c396..59a9a505 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -625,7 +625,7 @@ class ShardSyncer { for (KinesisClientLease lease : garbageLeases) { if (isCandidateForCleanup(lease, currentKinesisShardIds)) { - if (lease.isCompete()) { + if (lease.isComplete()) { LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey() + " as it is not present in Kinesis stream."); leaseManager.deleteLease(lease); @@ -702,7 +702,7 @@ class ShardSyncer { Set shardIdsOfClosedShards = new HashSet<>(); List leasesOfClosedShards = new ArrayList<>(); for (KinesisClientLease lease : currentLeases) { - if (lease.isCompete()) { + if (lease.isComplete()) { shardIdsOfClosedShards.add(lease.getLeaseKey()); leasesOfClosedShards.add(lease); } diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 04b56a74..5361830a 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -558,39 +558,26 @@ public class Worker implements Runnable { } private void handleTrimmedDataAccessException(Exception e) { - Optional maybeTrimmedException = getTrimmedDataAccessException(e); + Optional maybeTrimmedException = getCauseOfType(TrimmedDataAccessException.class, e); if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get(); } - private Optional getTrimmedDataAccessException(Throwable t) { - if (t.getClass().equals(TrimmedDataAccessException.class)) { - return Optional.of( (TrimmedDataAccessException) t); - } - if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) { - return Optional.of( (TrimmedDataAccessException) t.getCause()); - } else if (t.getCause() == null) { - return Optional.empty(); - } - - return getTrimmedDataAccessException(t.getCause()); - } - private void handleMissingIncompleteLeasesException(Exception e) { - Optional maybeMissingLeaseException = getMissingIncompleteLeasesException(e); + Optional maybeMissingLeaseException = getCauseOfType(MissingIncompleteLeasesException.class, e); if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get(); } - private Optional getMissingIncompleteLeasesException(Throwable t) { - if (t.getClass().equals(MissingIncompleteLeasesException.class)) { - return Optional.of( (MissingIncompleteLeasesException) t); + private Optional getCauseOfType(Class clazz, Throwable t) { + if (t.getClass().equals(clazz)) { + return Optional.of( (T) t); } - if (t.getCause().getClass().equals(MissingIncompleteLeasesException.class)) { - return Optional.of( (MissingIncompleteLeasesException) t.getCause()); + if (t.getCause().getClass().equals(clazz)) { + return Optional.of( (T) t.getCause()); } else if (t.getCause() == null) { return Optional.empty(); } - return getMissingIncompleteLeasesException(t.getCause()); + return getCauseOfType(clazz, t.getCause()); } private void initialize() { diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index d4605e27..97b8e0f7 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -71,7 +71,7 @@ public class KinesisClientLease extends Lease { setParentShardIds(casted.parentShardIds); } - public boolean isCompete() { + public boolean isComplete() { return checkpoint.equals(ExtendedSequenceNumber.SHARD_END); } From f2dd16257da4a613055fda09e930a00f38e46162 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 6 Jul 2021 16:47:07 -0700 Subject: [PATCH 4/5] fix(worker): actually throw the error --- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 5361830a..70b232b7 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -486,6 +486,10 @@ public class Worker implements Runnable { try { initialize(); LOG.info("Initialization complete. Starting worker loop."); + } catch (MissingIncompleteLeasesException e1){ + LOG.error("Unable to initialize because some incomplete leases were missing. Shutting down.", e1); + shutdown(); + throw e1; } catch (RuntimeException e1) { LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1); shutdown(); From 52199a24ee081eaa1110d4b85187b4774edec597 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Wed, 7 Jul 2021 08:29:42 -0700 Subject: [PATCH 5/5] refactor(worker): broaden exception handling --- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 70b232b7..c72148e0 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -619,12 +619,11 @@ public class Worker implements Runnable { lastException = result.getException(); handleMissingIncompleteLeasesException(lastException); } - } catch (MissingIncompleteLeasesException e) { - throw e; } catch (LeasingException e) { LOG.error("Caught exception when initializing LeaseCoordinator", e); lastException = e; } catch (Exception e) { + handleMissingIncompleteLeasesException(e); lastException = e; }