fix(Worker): only save lease keys

This commit is contained in:
glarwood 2019-03-20 16:57:19 +00:00
parent 327f0722f5
commit 0f78ff0bae

View file

@ -31,10 +31,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -502,29 +504,29 @@ public class Worker implements Runnable {
} }
Instant start = Instant.now(); Instant start = Instant.now();
List<KinesisClientLease> initialLeases = getExistingLeases(); List<String> initialLeaseKeys = getExistingLeaseKeys();
while (!shouldShutdown()) { while (!shouldShutdown()) {
runProcessLoop(); runProcessLoop();
shutdownIfFivetranReady(start, initialLeases); shutdownIfFivetranReady(start, initialLeaseKeys);
} }
finalShutdown(); finalShutdown();
LOG.info("Worker loop is complete. Exiting from worker."); LOG.info("Worker loop is complete. Exiting from worker.");
} }
private List<KinesisClientLease> getExistingLeases() { private List<String> getExistingLeaseKeys() {
try { try {
return leaseCoordinator.getLeaseManager().listLeases(); return leaseCoordinator.getLeaseManager().listLeases().stream().map(Lease::getLeaseKey).collect(Collectors.toList());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
throw new RuntimeException("Unable to list leases to determine when to stop worker"); throw new RuntimeException("Unable to list leases to determine when to stop worker");
} }
} }
void shutdownIfFivetranReady(Instant start, List<KinesisClientLease> initialLeases) { void shutdownIfFivetranReady(Instant start, List<String> initialLeases) {
boolean initiateShutdown = false; boolean initiateShutdown = false;
List<KinesisClientLease> currentLeases = getExistingLeases(); List<String> currentLeases = getExistingLeaseKeys();
boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains); boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains);
if (exitAfterInitialLeasesComplete && initialLeasesComplete) { if (exitAfterInitialLeasesComplete && initialLeasesComplete) {