Merge pull request #801 from Renjuju/fix-indefinitely-blocking-leaderElectionThreadPool-bug-fix

Add acquiredLock flag to prevent unlocking when no lock is held
This commit is contained in:
ashwing 2021-04-02 10:57:37 -07:00 committed by GitHub
commit 668422ccbd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 26 deletions

View file

@ -14,14 +14,6 @@
*/
package software.amazon.kinesis.coordinator;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
@ -34,6 +26,13 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
/**
* An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId.
@ -55,7 +54,7 @@ class DeterministicShuffleShardSyncLeaderDecider
private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
private static final int AWAIT_TERMINATION_MILLIS = 5000;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReadWriteLock readWriteLock;
private final LeaseRefresher leaseRefresher;
private final int numPeriodicShardSyncWorkers;
@ -68,11 +67,29 @@ class DeterministicShuffleShardSyncLeaderDecider
* @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
* @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
*/
DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool,
DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher,
ScheduledExecutorService leaderElectionThreadPool,
int numPeriodicShardSyncWorkers) {
this(leaseRefresher,
leaderElectionThreadPool,
numPeriodicShardSyncWorkers,
new ReentrantReadWriteLock());
}
/**
* @param leaseRefresher LeaseManager instance used to fetch leases.
* @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
* @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
* @param readWriteLock Mechanism to lock for reading and writing of critical components
*/
DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher,
ScheduledExecutorService leaderElectionThreadPool,
int numPeriodicShardSyncWorkers,
ReadWriteLock readWriteLock) {
this.leaseRefresher = leaseRefresher;
this.leaderElectionThreadPool = leaderElectionThreadPool;
this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
this.readWriteLock = readWriteLock;
}
/*
@ -80,6 +97,7 @@ class DeterministicShuffleShardSyncLeaderDecider
* as leaders (workers that will perform shard sync).
*/
private void electLeaders() {
boolean acquiredLock = false;
try {
log.debug("Started leader election at: " + Instant.now());
List<Lease> leases = leaseRefresher.listLeases();
@ -91,6 +109,7 @@ class DeterministicShuffleShardSyncLeaderDecider
// In case value is currently being read, we wait for reading to complete before updating the variable.
// This is to prevent any ConcurrentModificationException exceptions.
readWriteLock.writeLock().lock();
acquiredLock = true;
leaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
log.info("Elected leaders: " + String.join(", ", leaders));
log.debug("Completed leader election at: " + Instant.now());
@ -99,9 +118,11 @@ class DeterministicShuffleShardSyncLeaderDecider
} catch (Throwable t) {
log.error("Unknown exception during leader election.", t);
} finally {
if (acquiredLock) {
readWriteLock.writeLock().unlock();
}
}
}
private boolean isWorkerLeaderForShardSync(String workerId) {
return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId);

View file

@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -279,7 +280,8 @@ public class Scheduler implements Runnable {
.createWorkerStateChangeListener();
}
this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
Executors.newSingleThreadScheduledExecutor(),
PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();

View file

@ -14,15 +14,6 @@
*/
package software.amazon.kinesis.coordinator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -30,11 +21,23 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.booleanThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
@ -52,12 +55,21 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
@Mock
private ScheduledExecutorService scheduledExecutorService;
@Mock
private ReadWriteLock readWriteLock;
private int numShardSyncWorkers;
@Before
public void setup() {
numShardSyncWorkers = 1;
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
scheduledExecutorService,
numShardSyncWorkers,
readWriteLock);
when(readWriteLock.readLock()).thenReturn(mock(ReentrantReadWriteLock.ReadLock.class));
when(readWriteLock.writeLock()).thenReturn(mock(ReentrantReadWriteLock.WriteLock.class));
}
@Test
@ -73,6 +85,15 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
assertTrue("IsLeader should return true if no leases are returned", isLeader);
}
@Test
public void testLeaderElectionDoesNotUseLocksOnListLeasesException() throws Exception {
when(leaseRefresher.listLeases()).thenThrow(new DependencyException("error", new Throwable()));
leaderDecider.isLeader(WORKER_ID);
verify(leaseRefresher, times(1)).listLeases();
verify(readWriteLock.writeLock(), times(0)).lock();
verify(readWriteLock.writeLock(), times(0)).unlock();
}
@Test
public void testleaderElectionWithEmptyOwnerLeases() throws Exception {
List<Lease> leases = getLeases(5, true, true, true);
@ -100,7 +121,10 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
@Test
public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() {
this.numShardSyncWorkers = 5; // More than number of unique lease owners
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
scheduledExecutorService,
numShardSyncWorkers,
readWriteLock);
List<Lease> leases = getLeases(3, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */);
Set<String> expectedLeaders = getExpectedLeaders(leases);
// All lease owners should be present in expected leaders set, and they should all be leaders.