Only making leader to do the stream sync

This commit is contained in:
Ashwin Giridharan 2020-06-22 14:30:28 -07:00
parent db2c22e046
commit 0246e1e852
2 changed files with 9 additions and 3 deletions

View file

@ -423,7 +423,9 @@ public class Scheduler implements Runnable {
cleanupShardConsumers(assignedShards); cleanupShardConsumers(assignedShards);
// check for new streams and sync with the scheduler state // check for new streams and sync with the scheduler state
if (isLeader()) {
checkAndSyncStreamShardsAndLeases(); checkAndSyncStreamShardsAndLeases();
}
logExecutorState(); logExecutorState();
slog.info("Sleeping ..."); slog.info("Sleeping ...");
@ -440,6 +442,10 @@ public class Scheduler implements Runnable {
slog.resetInfoLogging(); slog.resetInfoLogging();
} }
private boolean isLeader() {
return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier());
}
/** /**
* Note: This method has package level access solely for testing purposes. * Note: This method has package level access solely for testing purposes.

View file

@ -113,8 +113,8 @@ public class ShutdownTask implements ConsumerTask {
try { try {
try { try {
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", log.debug("Invoking shutdown() for shard {} with child shards {} , concurrencyToken {}. Shutdown reason: {}",
leaseKeyProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason); leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
if (reason == ShutdownReason.SHARD_END) { if (reason == ShutdownReason.SHARD_END) {