Addressing comments about unit tests
This commit is contained in:
parent
bb495e4f60
commit
35be75c347
2 changed files with 18 additions and 7 deletions
|
|
@ -363,14 +363,14 @@ public class Scheduler implements Runnable {
|
|||
long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
|
||||
long waitUntil = System.currentTimeMillis() + waitTime;
|
||||
|
||||
boolean isLeaseTableEmpty = true;
|
||||
while (System.currentTimeMillis() < waitUntil && (isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty())) {
|
||||
boolean shouldInitiateLeaseSync = true;
|
||||
while (System.currentTimeMillis() < waitUntil && (shouldInitiateLeaseSync = leaseRefresher.isLeaseTableEmpty())) {
|
||||
// check every 3 seconds if lease table is still empty,
|
||||
// to minimize contention between all workers bootstrapping at the same time
|
||||
log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
|
||||
Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
|
||||
}
|
||||
return isLeaseTableEmpty;
|
||||
return shouldInitiateLeaseSync;
|
||||
}
|
||||
|
||||
private void waitUntilHashRangeCovered() throws InterruptedException {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.booleanThat;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
|
||||
|
||||
|
|
@ -72,10 +73,18 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
|
|||
assertTrue("IsLeader should return true if no leases are returned", isLeader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testleaderElectionWithEmptyOwnerLeases() throws Exception {
|
||||
List<Lease> leases = getLeases(5, true, true, true);
|
||||
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||
boolean isLeader = leaderDecider.isLeader(WORKER_ID);
|
||||
assertTrue("IsLeader should return false if leases have no owner", isLeader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testElectedLeadersAsPerExpectedShufflingOrder()
|
||||
throws Exception {
|
||||
List<Lease> leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */);
|
||||
List<Lease> leases = getLeases(5, false /*emptyLeaseOwner */,false /* duplicateLeaseOwner */, true /* activeLeases */);
|
||||
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||
Set<String> expectedLeaders = getExpectedLeaders(leases);
|
||||
for (String leader : expectedLeaders) {
|
||||
|
|
@ -92,7 +101,7 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
|
|||
public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() {
|
||||
this.numShardSyncWorkers = 5; // More than number of unique lease owners
|
||||
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
|
||||
List<Lease> leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */);
|
||||
List<Lease> leases = getLeases(3, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */);
|
||||
Set<String> expectedLeaders = getExpectedLeaders(leases);
|
||||
// All lease owners should be present in expected leaders set, and they should all be leaders.
|
||||
for (Lease lease : leases) {
|
||||
|
|
@ -101,14 +110,16 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
|
|||
}
|
||||
}
|
||||
|
||||
private List<Lease> getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) {
|
||||
private List<Lease> getLeases(int count, boolean emptyLeaseOwner, boolean duplicateLeaseOwner, boolean activeLeases) {
|
||||
List<Lease> leases = new ArrayList<>();
|
||||
for (int i = 0; i < count; i++) {
|
||||
Lease lease = new Lease();
|
||||
lease.leaseKey(LEASE_KEY + i);
|
||||
lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END);
|
||||
lease.leaseCounter(new Random().nextLong());
|
||||
lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i));
|
||||
if (!emptyLeaseOwner) {
|
||||
lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i));
|
||||
}
|
||||
leases.add(lease);
|
||||
}
|
||||
return leases;
|
||||
|
|
|
|||
Loading…
Reference in a new issue