From 3adc1c180c7cce52e160b6bedafa8e99c8f55c2f Mon Sep 17 00:00:00 2001 From: Renju Radhakrishnan Date: Fri, 2 Apr 2021 10:18:27 -0700 Subject: [PATCH] Add acquiredLock flag to prevent unlocking when no lock is held --- ...ministicShuffleShardSyncLeaderDecider.java | 45 ++++++++++++----- .../amazon/kinesis/coordinator/Scheduler.java | 4 +- ...sticShuffleShardSyncLeaderDeciderTest.java | 50 ++++++++++++++----- 3 files changed, 73 insertions(+), 26 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java index 720103c6..b06dba39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java @@ -14,14 +14,6 @@ */ package software.amazon.kinesis.coordinator; -import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.utils.CollectionUtils; -import software.amazon.kinesis.leases.Lease; -import software.amazon.kinesis.leases.LeaseRefresher; -import software.amazon.kinesis.leases.exceptions.DependencyException; -import software.amazon.kinesis.leases.exceptions.InvalidStateException; -import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; - import java.time.Instant; import java.util.Collections; import java.util.HashSet; @@ -34,6 +26,13 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; /** * An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId. @@ -55,7 +54,7 @@ class DeterministicShuffleShardSyncLeaderDecider private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000; private static final int AWAIT_TERMINATION_MILLIS = 5000; - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final ReadWriteLock readWriteLock; private final LeaseRefresher leaseRefresher; private final int numPeriodicShardSyncWorkers; @@ -68,11 +67,29 @@ class DeterministicShuffleShardSyncLeaderDecider * @param leaderElectionThreadPool Thread-pool to be used for leaderElection. * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs. */ - DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, ScheduledExecutorService leaderElectionThreadPool, - int numPeriodicShardSyncWorkers) { + DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, + ScheduledExecutorService leaderElectionThreadPool, + int numPeriodicShardSyncWorkers) { + this(leaseRefresher, + leaderElectionThreadPool, + numPeriodicShardSyncWorkers, + new ReentrantReadWriteLock()); + } + + /** + * @param leaseRefresher LeaseManager instance used to fetch leases. + * @param leaderElectionThreadPool Thread-pool to be used for leaderElection. + * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs. + * @param readWriteLock Mechanism to lock for reading and writing of critical components + */ + DeterministicShuffleShardSyncLeaderDecider(LeaseRefresher leaseRefresher, + ScheduledExecutorService leaderElectionThreadPool, + int numPeriodicShardSyncWorkers, + ReadWriteLock readWriteLock) { this.leaseRefresher = leaseRefresher; this.leaderElectionThreadPool = leaderElectionThreadPool; this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers; + this.readWriteLock = readWriteLock; } /* @@ -80,6 +97,7 @@ class DeterministicShuffleShardSyncLeaderDecider * as leaders (workers that will perform shard sync). */ private void electLeaders() { + boolean acquiredLock = false; try { log.debug("Started leader election at: " + Instant.now()); List leases = leaseRefresher.listLeases(); @@ -91,6 +109,7 @@ class DeterministicShuffleShardSyncLeaderDecider // In case value is currently being read, we wait for reading to complete before updating the variable. // This is to prevent any ConcurrentModificationException exceptions. readWriteLock.writeLock().lock(); + acquiredLock = true; leaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers)); log.info("Elected leaders: " + String.join(", ", leaders)); log.debug("Completed leader election at: " + Instant.now()); @@ -99,7 +118,9 @@ class DeterministicShuffleShardSyncLeaderDecider } catch (Throwable t) { log.error("Unknown exception during leader election.", t); } finally { - readWriteLock.writeLock().unlock(); + if (acquiredLock) { + readWriteLock.writeLock().unlock(); + } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index d7523549..aa0a5568 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -40,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -279,7 +280,8 @@ public class Scheduler implements Runnable { .createWorkerStateChangeListener(); } this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, - Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + Executors.newSingleThreadScheduledExecutor(), + PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); // this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java index b300f355..dff2a8cb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -14,15 +14,6 @@ */ package software.amazon.kinesis.coordinator; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.kinesis.leases.Lease; -import software.amazon.kinesis.leases.LeaseRefresher; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; - import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -30,11 +21,23 @@ import java.util.List; import java.util.Random; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; - +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.booleanThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED; @@ -52,12 +55,21 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { @Mock private ScheduledExecutorService scheduledExecutorService; + @Mock + private ReadWriteLock readWriteLock; + private int numShardSyncWorkers; @Before public void setup() { numShardSyncWorkers = 1; - leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers); + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, + scheduledExecutorService, + numShardSyncWorkers, + readWriteLock); + + when(readWriteLock.readLock()).thenReturn(mock(ReentrantReadWriteLock.ReadLock.class)); + when(readWriteLock.writeLock()).thenReturn(mock(ReentrantReadWriteLock.WriteLock.class)); } @Test @@ -73,6 +85,15 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { assertTrue("IsLeader should return true if no leases are returned", isLeader); } + @Test + public void testLeaderElectionDoesNotUseLocksOnListLeasesException() throws Exception { + when(leaseRefresher.listLeases()).thenThrow(new DependencyException("error", new Throwable())); + leaderDecider.isLeader(WORKER_ID); + verify(leaseRefresher, times(1)).listLeases(); + verify(readWriteLock.writeLock(), times(0)).lock(); + verify(readWriteLock.writeLock(), times(0)).unlock(); + } + @Test public void testleaderElectionWithEmptyOwnerLeases() throws Exception { List leases = getLeases(5, true, true, true); @@ -100,7 +121,10 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { @Test public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() { this.numShardSyncWorkers = 5; // More than number of unique lease owners - leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers); + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, + scheduledExecutorService, + numShardSyncWorkers, + readWriteLock); List leases = getLeases(3, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */); Set expectedLeaders = getExpectedLeaders(leases); // All lease owners should be present in expected leaders set, and they should all be leaders.