diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java index e7573aec..482555b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -124,6 +125,17 @@ public interface LeaseCoordinator { */ List getCurrentAssignments(); + /** + * Default implementation returns an empty list and concrete implementation is expected to return all leases + * for the application that are in the lease table. This enables application managing Kcl Scheduler to take care of + * horizontal scaling for example. + * + * @return all leases for the application that are in the lease table + */ + default List allLeases() { + return Collections.emptyList(); + } + /** * @param writeCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial * write capacity diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java index 9d00ff17..394375b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java @@ -14,6 +14,8 @@ */ package software.amazon.kinesis.leases; +import java.util.Collections; +import java.util.List; import java.util.Map; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -45,4 +47,13 @@ public interface LeaseTaker { */ String getWorkerIdentifier(); + /** + * Default implementation returns an empty list and concrete implementaion is expected to return all leases + * for the application that are in the lease table either by reading lease table or from an internal cache. + * + * @return all leases for the application that are in the lease table + */ + default List allLeases() { + return Collections.emptyList(); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 12ca3a01..20a66e9a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -278,6 +278,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { return leaseRenewer.getCurrentlyHeldLeases().values(); } + @Override + public List allLeases() { + return leaseTaker.allLeases(); + } + @Override public Lease getCurrentlyHeldLease(String leaseKey) { return leaseRenewer.getCurrentlyHeldLease(leaseKey); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 2d52fe2a..165ad01d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -530,4 +530,12 @@ public class DynamoDBLeaseTaker implements LeaseTaker { public String getWorkerIdentifier() { return workerIdentifier; } + + /** + * {@inheritDoc} + */ + @Override + public synchronized List allLeases() { + return new ArrayList<>(allLeases.values()); + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 051e8000..abadd4a8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -14,12 +14,15 @@ */ package software.amazon.kinesis.leases.dynamodb; +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -136,6 +139,28 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { assertEquals(lease, fromDynamo); } + /** + * Tests if getAllAssignments() returns all leases + */ + @Test + public void testGetAllAssignments() throws Exception { + TestHarnessBuilder builder = new TestHarnessBuilder(); + + Map addedLeases = builder.withLease("1", WORKER_ID) + .withLease("2", WORKER_ID) + .withLease("3", WORKER_ID) + .withLease("4", WORKER_ID) + .withLease("5", WORKER_ID) + .build(); + + // Run the taker + coordinator.runLeaseTaker(); + + List allLeases = coordinator.allLeases(); + assertThat(allLeases.size(), equalTo(addedLeases.size())); + assertThat(allLeases.containsAll(addedLeases.values()), equalTo(true)); + } + /** * Tests updateCheckpoint when the lease has changed out from under us. */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java index ba77d26d..871918b5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java @@ -14,11 +14,11 @@ */ package software.amazon.kinesis.leases.dynamodb; +import java.util.Collection; import java.util.Map; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import software.amazon.kinesis.leases.Lease; @@ -26,6 +26,11 @@ import software.amazon.kinesis.leases.LeaseIntegrationTest; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.metrics.NullMetricsFactory; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { private static final long LEASE_DURATION_MILLIS = 1000L; @@ -100,6 +105,32 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { builder.takeMutateAssert(taker, 2); } + /** + * Verify that when getAllLeases() is called, DynamoDBLeaseTaker + * - does not call listLeases() + * - returns cached result was built during takeLeases() operation to return result + */ + @Test + public void testGetAllLeases() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + Map addedLeases = builder.withLease("1", "bar") + .withLease("2", "bar") + .withLease("3", "baz") + .withLease("4", "baz") + .withLease("5", "foo") + .build(); + + // In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is built during takeLeases() operation + assertThat(taker.allLeases().size(), equalTo(0)); + + taker.takeLeases(); + + Collection allLeases = taker.allLeases(); + assertThat(allLeases.size(), equalTo(addedLeases.size())); + assertThat(addedLeases.values().containsAll(allLeases), equalTo(true)); + } + /** * Verify that LeaseTaker does not steal when it's only short 1 lease and the other worker is at target. Set up a * scenario where there are 4 leases held by two servers, and a third server with one lease. The third server should