Merge pull request #7 from fivetran/failure-on-garbage-incomplete-leases
fix(worker): fail on garbage incomplete leases
This commit is contained in:
commit
92ab3a610e
5 changed files with 59 additions and 11 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,3 +1,4 @@
|
|||
target/
|
||||
AwsCredentials.properties
|
||||
.idea
|
||||
amazon-kinesis-client.iml
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
 * Unchecked exception carrying the keys of leases that were found to be missing while still incomplete.
 *
 * <p>The exception message is {@code "missing leases: "} followed by the comma-separated lease keys.
 */
public class MissingIncompleteLeasesException extends RuntimeException {

    /** Unmodifiable snapshot of the lease keys supplied at construction time. */
    private final Set<String> leases;

    /**
     * @param leases keys of the affected leases; must not be {@code null}. The set is copied, so later
     *        changes to the argument are not reflected by this exception.
     * @throws NullPointerException if {@code leases} is {@code null}
     */
    public MissingIncompleteLeasesException(Set<String> leases) {
        super("missing leases: " + String.join(",", leases));
        // Defensive copy: the caller keeps ownership of its own (possibly mutable) set. LinkedHashSet
        // retains the iteration order used for the message above.
        this.leases = java.util.Collections.unmodifiableSet(new java.util.LinkedHashSet<>(leases));
    }

    /**
     * @return an unmodifiable view of the lease keys this exception was created with
     */
    public Set<String> getLeases() {
        return leases;
    }
}
|
||||
|
|
@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException;
|
||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
|
|
@ -610,10 +611,11 @@ class ShardSyncer {
|
|||
garbageLeases.add(lease);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!garbageLeases.isEmpty()) {
|
||||
Set<String> unfinishedAndMissing = new HashSet<>();
|
||||
LOG.info("Found " + garbageLeases.size()
|
||||
+ " candidate leases for cleanup. Refreshing list of"
|
||||
+ " candidate leases for cleanup. Refreshing list of"
|
||||
+ " Kinesis shards to pick up recent/latest shards");
|
||||
List<Shard> currentShardList = getShardList(kinesisProxy);
|
||||
Set<String> currentKinesisShardIds = new HashSet<>();
|
||||
|
|
@ -623,11 +625,17 @@ class ShardSyncer {
|
|||
|
||||
for (KinesisClientLease lease : garbageLeases) {
|
||||
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
|
||||
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
|
||||
+ " as it is not present in Kinesis stream.");
|
||||
leaseManager.deleteLease(lease);
|
||||
if (lease.isComplete()) {
|
||||
LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey()
|
||||
+ " as it is not present in Kinesis stream.");
|
||||
leaseManager.deleteLease(lease);
|
||||
} else {
|
||||
unfinishedAndMissing.add(lease.getLeaseKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!unfinishedAndMissing.isEmpty()) throw new MissingIncompleteLeasesException(unfinishedAndMissing);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -694,7 +702,7 @@ class ShardSyncer {
|
|||
Set<String> shardIdsOfClosedShards = new HashSet<>();
|
||||
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
|
||||
for (KinesisClientLease lease : currentLeases) {
|
||||
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||
if (lease.isComplete()) {
|
||||
shardIdsOfClosedShards.add(lease.getLeaseKey());
|
||||
leasesOfClosedShards.add(lease);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import java.util.function.Consumer;
|
|||
|
||||
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
|
||||
import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException;
|
||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -485,6 +486,10 @@ public class Worker implements Runnable {
|
|||
try {
|
||||
initialize();
|
||||
LOG.info("Initialization complete. Starting worker loop.");
|
||||
} catch (MissingIncompleteLeasesException e1){
|
||||
LOG.error("Unable to initialize because some incomplete leases were missing. Shutting down.", e1);
|
||||
shutdown();
|
||||
throw e1;
|
||||
} catch (RuntimeException e1) {
|
||||
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1);
|
||||
shutdown();
|
||||
|
|
@ -528,6 +533,7 @@ public class Worker implements Runnable {
|
|||
throw e;
|
||||
} catch (Exception e) {
|
||||
handleTrimmedDataAccessException(e);
|
||||
handleMissingIncompleteLeasesException(e);
|
||||
|
||||
if (causedByStreamRecordProcessingError(e))
|
||||
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
|
||||
|
|
@ -556,15 +562,26 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
private void handleTrimmedDataAccessException(Exception e) {
|
||||
Optional<TrimmedDataAccessException> maybeTrimmedException = getTrimmedDataAccessException(e);
|
||||
Optional<TrimmedDataAccessException> maybeTrimmedException = getCauseOfType(TrimmedDataAccessException.class, e);
|
||||
if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get();
|
||||
}
|
||||
|
||||
private Optional<TrimmedDataAccessException> getTrimmedDataAccessException(Throwable t) {
|
||||
if (t.getCause() == null) return Optional.empty();
|
||||
if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause());
|
||||
private void handleMissingIncompleteLeasesException(Exception e) {
|
||||
Optional<MissingIncompleteLeasesException> maybeMissingLeaseException = getCauseOfType(MissingIncompleteLeasesException.class, e);
|
||||
if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get();
|
||||
}
|
||||
|
||||
return getTrimmedDataAccessException(t.getCause());
|
||||
private <T extends Throwable> Optional<T> getCauseOfType(Class<T> clazz, Throwable t) {
|
||||
if (t.getClass().equals(clazz)) {
|
||||
return Optional.of( (T) t);
|
||||
}
|
||||
if (t.getCause().getClass().equals(clazz)) {
|
||||
return Optional.of( (T) t.getCause());
|
||||
} else if (t.getCause() == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return getCauseOfType(clazz, t.getCause());
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
|
|
@ -600,11 +617,13 @@ public class Worker implements Runnable {
|
|||
isDone = true;
|
||||
} else {
|
||||
lastException = result.getException();
|
||||
handleMissingIncompleteLeasesException(lastException);
|
||||
}
|
||||
} catch (LeasingException e) {
|
||||
LOG.error("Caught exception when initializing LeaseCoordinator", e);
|
||||
lastException = e;
|
||||
} catch (Exception e) {
|
||||
handleMissingIncompleteLeasesException(e);
|
||||
lastException = e;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -71,6 +71,10 @@ public class KinesisClientLease extends Lease {
|
|||
setParentShardIds(casted.parentShardIds);
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
return checkpoint.equals(ExtendedSequenceNumber.SHARD_END);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after
|
||||
* the old worker's last checkpoint.
|
||||
|
|
|
|||
Loading…
Reference in a new issue