fix(worker): fail on incomplete garbage lease
This commit is contained in:
parent
c8a1797b7e
commit
981bacd512
5 changed files with 66 additions and 8 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,3 +1,4 @@
|
||||||
target/
|
target/
|
||||||
AwsCredentials.properties
|
AwsCredentials.properties
|
||||||
.idea
|
.idea
|
||||||
|
amazon-kinesis-client.iml
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
package com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions;
|
||||||
|
|
||||||
|
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
|
||||||
|
|
||||||
|
/**
 * Raised when leases that are not yet complete refer to shards that are no
 * longer present, so the affected lease keys can be reported to the caller.
 */
public class MissingIncompleteLeasesException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    /** Unmodifiable snapshot of the affected lease keys, taken at construction time. */
    private final Set<String> leases;

    /**
     * Creates the exception with a message listing every affected lease key.
     *
     * @param leases keys of the incomplete leases; must not be {@code null}
     * @throws NullPointerException if {@code leases} is {@code null}
     */
    public MissingIncompleteLeasesException(Set<String> leases) {
        super("missing leases: " + String.join(",", leases));
        // Copy defensively so later changes to the caller's set cannot alter what this
        // exception reports; LinkedHashSet keeps the caller's iteration order.
        this.leases = Collections.unmodifiableSet(new LinkedHashSet<>(leases));
    }

    /**
     * @return an unmodifiable view of the lease keys captured when the exception was created
     */
    public Set<String> getLeases() {
        return leases;
    }
}
|
||||||
|
|
@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException;
|
||||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||||
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||||
|
|
@ -610,10 +611,11 @@ class ShardSyncer {
|
||||||
garbageLeases.add(lease);
|
garbageLeases.add(lease);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!garbageLeases.isEmpty()) {
|
if (!garbageLeases.isEmpty()) {
|
||||||
|
Set<String> unfinishedAndMissing = new HashSet<>();
|
||||||
LOG.info("Found " + garbageLeases.size()
|
LOG.info("Found " + garbageLeases.size()
|
||||||
+ " candidate leases for cleanup. Refreshing list of"
|
+ " candidate leases for cleanup. Refreshing list of"
|
||||||
+ " Kinesis shards to pick up recent/latest shards");
|
+ " Kinesis shards to pick up recent/latest shards");
|
||||||
List<Shard> currentShardList = getShardList(kinesisProxy);
|
List<Shard> currentShardList = getShardList(kinesisProxy);
|
||||||
Set<String> currentKinesisShardIds = new HashSet<>();
|
Set<String> currentKinesisShardIds = new HashSet<>();
|
||||||
|
|
@ -623,11 +625,17 @@ class ShardSyncer {
|
||||||
|
|
||||||
for (KinesisClientLease lease : garbageLeases) {
|
for (KinesisClientLease lease : garbageLeases) {
|
||||||
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
|
if (isCandidateForCleanup(lease, currentKinesisShardIds)) {
|
||||||
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
|
if (lease.isCompete()) {
|
||||||
+ " as it is not present in Kinesis stream.");
|
LOG.info("Deleting lease for a complete shard " + lease.getLeaseKey()
|
||||||
leaseManager.deleteLease(lease);
|
+ " as it is not present in Kinesis stream.");
|
||||||
|
leaseManager.deleteLease(lease);
|
||||||
|
} else {
|
||||||
|
unfinishedAndMissing.add(lease.getLeaseKey());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!unfinishedAndMissing.isEmpty()) throw new MissingIncompleteLeasesException(unfinishedAndMissing);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -694,7 +702,7 @@ class ShardSyncer {
|
||||||
Set<String> shardIdsOfClosedShards = new HashSet<>();
|
Set<String> shardIdsOfClosedShards = new HashSet<>();
|
||||||
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
|
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
|
||||||
for (KinesisClientLease lease : currentLeases) {
|
for (KinesisClientLease lease : currentLeases) {
|
||||||
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
if (lease.isCompete()) {
|
||||||
shardIdsOfClosedShards.add(lease.getLeaseKey());
|
shardIdsOfClosedShards.add(lease.getLeaseKey());
|
||||||
leasesOfClosedShards.add(lease);
|
leasesOfClosedShards.add(lease);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ import java.util.function.Consumer;
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
|
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
|
||||||
import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException;
|
import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException;
|
||||||
|
import com.fivetran.external.com.amazonaws.services.kinesis.clientlibrary.exceptions.MissingIncompleteLeasesException;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -528,6 +529,7 @@ public class Worker implements Runnable {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
handleTrimmedDataAccessException(e);
|
handleTrimmedDataAccessException(e);
|
||||||
|
handleMissingIncompleteLeasesException(e);
|
||||||
|
|
||||||
if (causedByStreamRecordProcessingError(e))
|
if (causedByStreamRecordProcessingError(e))
|
||||||
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
|
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
|
||||||
|
|
@ -561,12 +563,36 @@ public class Worker implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<TrimmedDataAccessException> getTrimmedDataAccessException(Throwable t) {
|
private Optional<TrimmedDataAccessException> getTrimmedDataAccessException(Throwable t) {
|
||||||
if (t.getCause() == null) return Optional.empty();
|
if (t.getClass().equals(TrimmedDataAccessException.class)) {
|
||||||
if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause());
|
return Optional.of( (TrimmedDataAccessException) t);
|
||||||
|
}
|
||||||
|
if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) {
|
||||||
|
return Optional.of( (TrimmedDataAccessException) t.getCause());
|
||||||
|
} else if (t.getCause() == null) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
return getTrimmedDataAccessException(t.getCause());
|
return getTrimmedDataAccessException(t.getCause());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleMissingIncompleteLeasesException(Exception e) {
|
||||||
|
Optional<MissingIncompleteLeasesException> maybeMissingLeaseException = getMissingIncompleteLeasesException(e);
|
||||||
|
if (maybeMissingLeaseException.isPresent()) throw maybeMissingLeaseException.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<MissingIncompleteLeasesException> getMissingIncompleteLeasesException(Throwable t) {
|
||||||
|
if (t.getClass().equals(MissingIncompleteLeasesException.class)) {
|
||||||
|
return Optional.of( (MissingIncompleteLeasesException) t);
|
||||||
|
}
|
||||||
|
if (t.getCause().getClass().equals(MissingIncompleteLeasesException.class)) {
|
||||||
|
return Optional.of( (MissingIncompleteLeasesException) t.getCause());
|
||||||
|
} else if (t.getCause() == null) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
return getMissingIncompleteLeasesException(t.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
private void initialize() {
|
private void initialize() {
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
||||||
boolean isDone = false;
|
boolean isDone = false;
|
||||||
|
|
@ -600,7 +626,10 @@ public class Worker implements Runnable {
|
||||||
isDone = true;
|
isDone = true;
|
||||||
} else {
|
} else {
|
||||||
lastException = result.getException();
|
lastException = result.getException();
|
||||||
|
handleMissingIncompleteLeasesException(lastException);
|
||||||
}
|
}
|
||||||
|
} catch (MissingIncompleteLeasesException e) {
|
||||||
|
throw e;
|
||||||
} catch (LeasingException e) {
|
} catch (LeasingException e) {
|
||||||
LOG.error("Caught exception when initializing LeaseCoordinator", e);
|
LOG.error("Caught exception when initializing LeaseCoordinator", e);
|
||||||
lastException = e;
|
lastException = e;
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,10 @@ public class KinesisClientLease extends Lease {
|
||||||
setParentShardIds(casted.parentShardIds);
|
setParentShardIds(casted.parentShardIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isCompete() {
|
||||||
|
return checkpoint.equals(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after
|
* @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after
|
||||||
* the old worker's last checkpoint.
|
* the old worker's last checkpoint.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue