diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 4e90853b..90b6c4de 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,10 +31,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Collectors; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.Lease; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -502,29 +504,29 @@ public class Worker implements Runnable { } Instant start = Instant.now(); - List initialLeases = getExistingLeases(); + List initialLeaseKeys = getExistingLeaseKeys(); while (!shouldShutdown()) { runProcessLoop(); - shutdownIfFivetranReady(start, initialLeases); + shutdownIfFivetranReady(start, initialLeaseKeys); } finalShutdown(); LOG.info("Worker loop is complete. Exiting from worker."); } - private List getExistingLeases() { + private List getExistingLeaseKeys() { try { - return leaseCoordinator.getLeaseManager().listLeases(); + return leaseCoordinator.getLeaseManager().listLeases().stream().map(Lease::getLeaseKey).collect(Collectors.toList()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { throw new RuntimeException("Unable to list leases to determine when to stop worker"); } } - void shutdownIfFivetranReady(Instant start, List initialLeases) { + void shutdownIfFivetranReady(Instant start, List initialLeases) { boolean initiateShutdown = false; - List currentLeases = getExistingLeases(); + List currentLeases = getExistingLeaseKeys(); boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains); if (exitAfterInitialLeasesComplete && initialLeasesComplete) {