Added an API on LeaseCoordinator and LeaseTaker to get all leases for… (#428)

* Added an API on LeaseCoordinator and LeaseTaker to get all leases for the application
This commit is contained in:
shask-amazon 2018-10-09 07:56:13 -07:00 committed by Justin Pfifer
parent e972617bfc
commit 31ab0af901
6 changed files with 93 additions and 1 deletions

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@ -124,6 +125,17 @@ public interface LeaseCoordinator {
*/
List<ShardInfo> getCurrentAssignments();
/**
* Default implementation returns an empty list and concrete implementation is expected to return all leases
* for the application that are in the lease table. This enables application managing Kcl Scheduler to take care of
* horizontal scaling for example.
*
* @return all leases for the application that are in the lease table
*/
default List<Lease> allLeases() {
return Collections.emptyList();
}
/**
* @param writeCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial
* write capacity

View file

@ -14,6 +14,8 @@
*/
package software.amazon.kinesis.leases;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -45,4 +47,13 @@ public interface LeaseTaker {
*/
String getWorkerIdentifier();
/**
* Default implementation returns an empty list and concrete implementaion is expected to return all leases
* for the application that are in the lease table either by reading lease table or from an internal cache.
*
* @return all leases for the application that are in the lease table
*/
default List<Lease> allLeases() {
return Collections.emptyList();
}
}

View file

@ -278,6 +278,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
return leaseRenewer.getCurrentlyHeldLeases().values();
}
@Override
public List<Lease> allLeases() {
return leaseTaker.allLeases();
}
@Override
public Lease getCurrentlyHeldLease(String leaseKey) {
return leaseRenewer.getCurrentlyHeldLease(leaseKey);

View file

@ -530,4 +530,12 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
public String getWorkerIdentifier() {
return workerIdentifier;
}
/**
* {@inheritDoc}
*/
@Override
public synchronized List<Lease> allLeases() {
return new ArrayList<>(allLeases.values());
}
}

View file

@ -14,12 +14,15 @@
*/
package software.amazon.kinesis.leases.dynamodb;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -136,6 +139,28 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
assertEquals(lease, fromDynamo);
}
/**
* Tests if getAllAssignments() returns all leases
*/
@Test
public void testGetAllAssignments() throws Exception {
TestHarnessBuilder builder = new TestHarnessBuilder();
Map<String, Lease> addedLeases = builder.withLease("1", WORKER_ID)
.withLease("2", WORKER_ID)
.withLease("3", WORKER_ID)
.withLease("4", WORKER_ID)
.withLease("5", WORKER_ID)
.build();
// Run the taker
coordinator.runLeaseTaker();
List<Lease> allLeases = coordinator.allLeases();
assertThat(allLeases.size(), equalTo(addedLeases.size()));
assertThat(allLeases.containsAll(addedLeases.values()), equalTo(true));
}
/**
* Tests updateCheckpoint when the lease has changed out from under us.
*/

View file

@ -14,11 +14,11 @@
*/
package software.amazon.kinesis.leases.dynamodb;
import java.util.Collection;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import software.amazon.kinesis.leases.Lease;
@ -26,6 +26,11 @@ import software.amazon.kinesis.leases.LeaseIntegrationTest;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
private static final long LEASE_DURATION_MILLIS = 1000L;
@ -100,6 +105,32 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
builder.takeMutateAssert(taker, 2);
}
/**
* Verify that when getAllLeases() is called, DynamoDBLeaseTaker
* - does not call listLeases()
* - returns cached result was built during takeLeases() operation to return result
*/
@Test
public void testGetAllLeases() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
Map<String, Lease> addedLeases = builder.withLease("1", "bar")
.withLease("2", "bar")
.withLease("3", "baz")
.withLease("4", "baz")
.withLease("5", "foo")
.build();
// In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is built during takeLeases() operation
assertThat(taker.allLeases().size(), equalTo(0));
taker.takeLeases();
Collection<Lease> allLeases = taker.allLeases();
assertThat(allLeases.size(), equalTo(addedLeases.size()));
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
}
/**
* Verify that LeaseTaker does not steal when it's only short 1 lease and the other worker is at target. Set up a
* scenario where there are 4 leases held by two servers, and a third server with one lease. The third server should