Minor refactor, addressing feedback
This commit is contained in:
parent
62e13ff3a1
commit
8511475868
3 changed files with 188 additions and 34 deletions
|
|
@ -18,6 +18,7 @@ import lombok.EqualsAndHashCode;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
|
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
||||||
import software.amazon.kinesis.leases.ShardSyncTask;
|
import software.amazon.kinesis.leases.ShardSyncTask;
|
||||||
import software.amazon.kinesis.lifecycle.ConsumerTask;
|
import software.amazon.kinesis.lifecycle.ConsumerTask;
|
||||||
import software.amazon.kinesis.lifecycle.TaskResult;
|
import software.amazon.kinesis.lifecycle.TaskResult;
|
||||||
|
|
@ -36,8 +37,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
@Slf4j
|
@Slf4j
|
||||||
class PeriodicShardSyncManager {
|
class PeriodicShardSyncManager {
|
||||||
private static final long INITIAL_DELAY = 0;
|
private static final long INITIAL_DELAY = 60 * 1000L;
|
||||||
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000;
|
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L;
|
||||||
|
|
||||||
private final String workerId;
|
private final String workerId;
|
||||||
private final LeaderDecider leaderDecider;
|
private final LeaderDecider leaderDecider;
|
||||||
|
|
@ -61,14 +62,34 @@ class PeriodicShardSyncManager {
|
||||||
|
|
||||||
public synchronized TaskResult start() {
|
public synchronized TaskResult start() {
|
||||||
if (!isRunning) {
|
if (!isRunning) {
|
||||||
shardSyncThreadPool
|
final Runnable periodicShardSyncer = () -> {
|
||||||
.scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
|
try {
|
||||||
TimeUnit.MILLISECONDS);
|
runShardSync();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.error("Error during runShardSync.", t);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
isRunning = true;
|
isRunning = true;
|
||||||
|
|
||||||
}
|
}
|
||||||
return new TaskResult(null);
|
return new TaskResult(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized TaskResult syncShardsOnce() {
|
||||||
|
|
||||||
|
Exception lastException = null;
|
||||||
|
try {
|
||||||
|
if (!isRunning) {
|
||||||
|
runShardSync();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
lastException = e;
|
||||||
|
}
|
||||||
|
return new TaskResult(lastException);
|
||||||
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
if (isRunning) {
|
if (isRunning) {
|
||||||
log.info(String.format("Shutting down leader decider on worker %s", workerId));
|
log.info(String.format("Shutting down leader decider on worker %s", workerId));
|
||||||
|
|
@ -80,15 +101,23 @@ class PeriodicShardSyncManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runShardSync() {
|
private void runShardSync() {
|
||||||
try {
|
if (leaderDecider.isLeader(workerId)) {
|
||||||
if (leaderDecider.isLeader(workerId)) {
|
log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
|
||||||
log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
|
final TaskResult taskResult = metricsEmittingShardSyncTask.call();
|
||||||
metricsEmittingShardSyncTask.call();
|
if (taskResult != null && taskResult.getException() != null) {
|
||||||
} else {
|
throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException());
|
||||||
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
|
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} else {
|
||||||
log.error("Error during runShardSync.", t);
|
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the entire hash range is covered
|
||||||
|
* @return true if covered, false otherwise
|
||||||
|
*/
|
||||||
|
public boolean hashRangeCovered() {
|
||||||
|
// TODO: Implement method
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification;
|
||||||
import software.amazon.kinesis.lifecycle.ShutdownReason;
|
import software.amazon.kinesis.lifecycle.ShutdownReason;
|
||||||
import software.amazon.kinesis.lifecycle.TaskResult;
|
import software.amazon.kinesis.lifecycle.TaskResult;
|
||||||
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
|
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
|
||||||
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
|
|
||||||
import software.amazon.kinesis.metrics.MetricsConfig;
|
import software.amazon.kinesis.metrics.MetricsConfig;
|
||||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.processor.Checkpointer;
|
import software.amazon.kinesis.processor.Checkpointer;
|
||||||
|
|
@ -68,6 +67,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -102,7 +102,7 @@ public class Scheduler implements Runnable {
|
||||||
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
private final LeaseCoordinator leaseCoordinator;
|
private final LeaseCoordinator leaseCoordinator;
|
||||||
private final ShardSyncTaskManager shardSyncTaskManager;
|
private final ShardSyncTaskManager shardSyncTaskManager;
|
||||||
private final PeriodicShardSyncManager periodicShardSyncManager;
|
private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
|
||||||
private final ShardPrioritization shardPrioritization;
|
private final ShardPrioritization shardPrioritization;
|
||||||
private final boolean cleanupLeasesUponShardCompletion;
|
private final boolean cleanupLeasesUponShardCompletion;
|
||||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
@ -121,7 +121,7 @@ public class Scheduler implements Runnable {
|
||||||
private final AggregatorUtil aggregatorUtil;
|
private final AggregatorUtil aggregatorUtil;
|
||||||
private final HierarchicalShardSyncer hierarchicalShardSyncer;
|
private final HierarchicalShardSyncer hierarchicalShardSyncer;
|
||||||
private final long schedulerInitializationBackoffTimeMillis;
|
private final long schedulerInitializationBackoffTimeMillis;
|
||||||
private LeaderDecider leaderDecider;
|
private final LeaderDecider leaderDecider;
|
||||||
|
|
||||||
// Holds consumers for shards the worker is currently tracking. Key is shard
|
// Holds consumers for shards the worker is currently tracking. Key is shard
|
||||||
// info, value is ShardConsumer.
|
// info, value is ShardConsumer.
|
||||||
|
|
@ -212,10 +212,8 @@ public class Scheduler implements Runnable {
|
||||||
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
|
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
|
||||||
.createWorkerStateChangeListener();
|
.createWorkerStateChangeListener();
|
||||||
}
|
}
|
||||||
if (leaderDecider == null) {
|
this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
|
||||||
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
|
|
||||||
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
|
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
|
||||||
}
|
|
||||||
this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
|
this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
|
||||||
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
|
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
|
||||||
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
|
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
|
||||||
|
|
@ -232,7 +230,7 @@ public class Scheduler implements Runnable {
|
||||||
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
|
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
|
||||||
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
|
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
|
||||||
metricsFactory);
|
metricsFactory);
|
||||||
this.periodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
|
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
|
||||||
leaderDecider, shardSyncTask, metricsFactory);
|
leaderDecider, shardSyncTask, metricsFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -247,12 +245,7 @@ public class Scheduler implements Runnable {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
initialize();
|
initialize();
|
||||||
log.info("Initialization complete. Scheduling periodicShardSync..");
|
log.info("Initialization complete. Starting worker loop.");
|
||||||
|
|
||||||
periodicShardSyncManager.start();
|
|
||||||
|
|
||||||
log.info("Scheduled periodicShardSync tasks. Starting worker loop.");
|
|
||||||
|
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
|
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
|
||||||
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
|
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
|
||||||
|
|
@ -282,11 +275,14 @@ public class Scheduler implements Runnable {
|
||||||
|
|
||||||
TaskResult result = null;
|
TaskResult result = null;
|
||||||
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
|
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
|
||||||
|
for (int j = 0; j < 10 && leaseRefresher.isLeaseTableEmpty(); j++) {
|
||||||
|
// check every 1-5 seconds if lease table is still empty,
|
||||||
|
// to minimize contention between all workers bootstrapping at the same time
|
||||||
|
long waitTime = ThreadLocalRandom.current().nextLong(1000L, 5000L);
|
||||||
|
Thread.sleep(waitTime);
|
||||||
|
}
|
||||||
log.info("Syncing Kinesis shard info");
|
log.info("Syncing Kinesis shard info");
|
||||||
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
|
result = leaderElectedPeriodicShardSyncManager.syncShardsOnce();
|
||||||
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
|
|
||||||
metricsFactory);
|
|
||||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
|
||||||
} else {
|
} else {
|
||||||
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
|
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
|
||||||
}
|
}
|
||||||
|
|
@ -298,6 +294,9 @@ public class Scheduler implements Runnable {
|
||||||
} else {
|
} else {
|
||||||
log.info("LeaseCoordinator is already running. No need to start it.");
|
log.info("LeaseCoordinator is already running. No need to start it.");
|
||||||
}
|
}
|
||||||
|
log.info("Scheduling periodicShardSync)");
|
||||||
|
// leaderElectedPeriodicShardSyncManager.start();
|
||||||
|
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
|
||||||
isDone = true;
|
isDone = true;
|
||||||
} else {
|
} else {
|
||||||
lastException = result.getException();
|
lastException = result.getException();
|
||||||
|
|
@ -309,9 +308,10 @@ public class Scheduler implements Runnable {
|
||||||
lastException = e;
|
lastException = e;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isDone) {
|
if (!isDone || !leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(schedulerInitializationBackoffTimeMillis);
|
Thread.sleep(schedulerInitializationBackoffTimeMillis);
|
||||||
|
leaderElectedPeriodicShardSyncManager.stop();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.debug("Sleep interrupted while initializing worker.");
|
log.debug("Sleep interrupted while initializing worker.");
|
||||||
}
|
}
|
||||||
|
|
@ -532,7 +532,7 @@ public class Scheduler implements Runnable {
|
||||||
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
||||||
// Worker.run().
|
// Worker.run().
|
||||||
leaseCoordinator.stop();
|
leaseCoordinator.stop();
|
||||||
periodicShardSyncManager.stop();
|
leaderElectedPeriodicShardSyncManager.stop();
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -630,12 +630,12 @@ public class Scheduler implements Runnable {
|
||||||
hierarchicalShardSyncer,
|
hierarchicalShardSyncer,
|
||||||
metricsFactory);
|
metricsFactory);
|
||||||
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
|
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
|
||||||
argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
|
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
|
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
|
||||||
*
|
* <p>
|
||||||
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
|
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
|
||||||
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
|
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
|
||||||
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
|
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,125 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.coordinator;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||||
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
public class DeterministicShuffleShardSyncLeaderDeciderTest {
|
||||||
|
private static final String LEASE_KEY = "lease_key";
|
||||||
|
private static final String LEASE_OWNER = "lease_owner";
|
||||||
|
private static final String WORKER_ID = "worker-id";
|
||||||
|
|
||||||
|
private DeterministicShuffleShardSyncLeaderDecider leaderDecider;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private LeaseRefresher leaseRefresher;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private ScheduledExecutorService scheduledExecutorService;
|
||||||
|
|
||||||
|
private int numShardSyncWorkers;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
numShardSyncWorkers = 1;
|
||||||
|
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaderElectionWithNullLeases() {
|
||||||
|
boolean isLeader = leaderDecider.isLeader(WORKER_ID);
|
||||||
|
assertTrue("IsLeader should return true if leaders is null", isLeader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaderElectionWithEmptyLeases() throws Exception {
|
||||||
|
when(leaseRefresher.listLeases()).thenReturn(new ArrayList<>());
|
||||||
|
boolean isLeader = leaderDecider.isLeader(WORKER_ID);
|
||||||
|
assertTrue("IsLeader should return true if no leases are returned", isLeader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testElectedLeadersAsPerExpectedShufflingOrder()
|
||||||
|
throws Exception {
|
||||||
|
List<Lease> leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */);
|
||||||
|
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||||
|
Set<String> expectedLeaders = getExpectedLeaders(leases);
|
||||||
|
for (String leader : expectedLeaders) {
|
||||||
|
assertTrue(leaderDecider.isLeader(leader));
|
||||||
|
}
|
||||||
|
for (Lease lease : leases) {
|
||||||
|
if (!expectedLeaders.contains(lease.leaseOwner())) {
|
||||||
|
assertFalse(leaderDecider.isLeader(lease.leaseOwner()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() {
|
||||||
|
this.numShardSyncWorkers = 5; // More than number of unique lease owners
|
||||||
|
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
|
||||||
|
List<Lease> leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */);
|
||||||
|
Set<String> expectedLeaders = getExpectedLeaders(leases);
|
||||||
|
// All lease owners should be present in expected leaders set, and they should all be leaders.
|
||||||
|
for (Lease lease : leases) {
|
||||||
|
assertTrue(leaderDecider.isLeader(lease.leaseOwner()));
|
||||||
|
assertTrue(expectedLeaders.contains(lease.leaseOwner()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Lease> getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) {
|
||||||
|
List<Lease> leases = new ArrayList<>();
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Lease lease = new Lease();
|
||||||
|
lease.leaseKey(LEASE_KEY + i);
|
||||||
|
lease.checkpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END);
|
||||||
|
lease.leaseCounter(new Random().nextLong());
|
||||||
|
lease.leaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i));
|
||||||
|
leases.add(lease);
|
||||||
|
}
|
||||||
|
return leases;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> getExpectedLeaders(List<Lease> leases) {
|
||||||
|
List<String> uniqueHosts = leases.stream().filter(lease -> lease.leaseOwner() != null)
|
||||||
|
.map(Lease::leaseOwner).distinct().sorted().collect(Collectors.toList());
|
||||||
|
|
||||||
|
Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
|
||||||
|
int numWorkers = Math.min(uniqueHosts.size(), this.numShardSyncWorkers);
|
||||||
|
return new HashSet<>(uniqueHosts.subList(0, numWorkers));
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue