diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 85234f7f..395882d9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -363,14 +363,14 @@ public class Scheduler implements Runnable { long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitUntil = System.currentTimeMillis() + waitTime; - boolean isLeaseTableEmpty = true; - while (System.currentTimeMillis() < waitUntil && (isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty())) { + boolean shouldInitiateLeaseSync = true; + while (System.currentTimeMillis() < waitUntil && (shouldInitiateLeaseSync = leaseRefresher.isLeaseTableEmpty())) { // check every 3 seconds if lease table is still empty, // to minimize contention between all workers bootstrapping at the same time log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); } - return isLeaseTableEmpty; + return shouldInitiateLeaseSync; } private void waitUntilHashRangeCovered() throws InterruptedException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java index b6ff3a0d..2e228a8a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.booleanThat; import static org.mockito.Mockito.when; import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED; @@ -72,10 +73,18 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { assertTrue("IsLeader should return true if no leases are returned", isLeader); } + @Test + public void testleaderElectionWithEmptyOwnerLeases() throws Exception { + List leases = getLeases(5, true, true, true); + when(leaseRefresher.listLeases()).thenReturn(leases); + boolean isLeader = leaderDecider.isLeader(WORKER_ID); + assertTrue("IsLeader should return false if leases have no owner", isLeader); + } + @Test public void testElectedLeadersAsPerExpectedShufflingOrder() throws Exception { - List leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */); + List leases = getLeases(5, false /*emptyLeaseOwner */,false /* duplicateLeaseOwner */, true /* activeLeases */); when(leaseRefresher.listLeases()).thenReturn(leases); Set expectedLeaders = getExpectedLeaders(leases); for (String leader : expectedLeaders) { @@ -92,7 +101,7 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() { this.numShardSyncWorkers = 5; // More than number of unique lease owners leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers); - List leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */); + List leases = getLeases(3, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */); Set expectedLeaders = getExpectedLeaders(leases); // All lease owners should be present in expected leaders set, and they should all be leaders. for (Lease lease : leases) { @@ -101,14 +110,16 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { } } - private List getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) { + private List getLeases(int count, boolean emptyLeaseOwner, boolean duplicateLeaseOwner, boolean activeLeases) { List leases = new ArrayList<>(); for (int i = 0; i < count; i++) { Lease lease = new Lease(); lease.leaseKey(LEASE_KEY + i); lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END); lease.leaseCounter(new Random().nextLong()); - lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i)); + if (!emptyLeaseOwner) { + lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i)); + } leases.add(lease); } return leases;