diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 3134fef7..7b10694f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -18,6 +18,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; @@ -36,8 +37,8 @@ import java.util.concurrent.TimeUnit; @EqualsAndHashCode @Slf4j class PeriodicShardSyncManager { - private static final long INITIAL_DELAY = 0; - private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000; + private static final long INITIAL_DELAY = 60 * 1000L; + private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; private final String workerId; private final LeaderDecider leaderDecider; @@ -61,14 +62,34 @@ class PeriodicShardSyncManager { public synchronized TaskResult start() { if (!isRunning) { - shardSyncThreadPool - .scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, - TimeUnit.MILLISECONDS); + final Runnable periodicShardSyncer = () -> { + try { + runShardSync(); + } catch (Throwable t) { + log.error("Error during runShardSync.", t); + } + }; + shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); isRunning = true; + } return new TaskResult(null); } + public synchronized TaskResult syncShardsOnce() { + + Exception lastException = null; + try { + if (!isRunning) { + runShardSync(); + } + } catch (Exception e) { + lastException = e; + } + return new TaskResult(lastException); + } + public void stop() { if (isRunning) { log.info(String.format("Shutting down leader decider on worker %s", workerId)); @@ -80,15 +101,23 @@ class PeriodicShardSyncManager { } private void runShardSync() { - try { - if (leaderDecider.isLeader(workerId)) { - log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - metricsEmittingShardSyncTask.call(); - } else { - log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + if (leaderDecider.isLeader(workerId)) { + log.info(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); + final TaskResult taskResult = metricsEmittingShardSyncTask.call(); + if (taskResult != null && taskResult.getException() != null) { + throw new KinesisClientLibIOException("Failed to sync shards", taskResult.getException()); } - } catch (Throwable t) { - log.error("Error during runShardSync.", t); + } else { + log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); } } + + /** + * Checks if the entire hash range is covered + * @return true if covered, false otherwise + */ + public boolean hashRangeCovered() { + // TODO: Implement method + return true; + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 7f92c384..87b92162 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -46,7 +46,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -68,6 +67,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** @@ -102,7 +102,7 @@ public class Scheduler implements Runnable { // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; - private final PeriodicShardSyncManager periodicShardSyncManager; + private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -121,7 +121,7 @@ public class Scheduler implements Runnable { private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; private final long schedulerInitializationBackoffTimeMillis; - private LeaderDecider leaderDecider; + private final LeaderDecider leaderDecider; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -212,10 +212,8 @@ public class Scheduler implements Runnable { this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() .createWorkerStateChangeListener(); } - if (leaderDecider == null) { - leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, + this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); - } this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); @@ -232,7 +230,7 @@ public class Scheduler implements Runnable { ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, metricsFactory); - this.periodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), leaderDecider, shardSyncTask, metricsFactory); } @@ -247,12 +245,7 @@ public class Scheduler implements Runnable { try { initialize(); - log.info("Initialization complete. Scheduling periodicShardSync.."); - - periodicShardSyncManager.start(); - - log.info("Scheduled periodicShardSync tasks. Starting worker loop."); - + log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e); workerStateChangeListener.onAllInitializationAttemptsFailed(e); @@ -282,11 +275,14 @@ public class Scheduler implements Runnable { TaskResult result = null; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { + for (int j = 0; j < 10 && leaseRefresher.isLeaseTableEmpty(); j++) { + // check every 1-5 seconds if lease table is still empty, + // to minimize contention between all workers bootstrapping at the same time + long waitTime = ThreadLocalRandom.current().nextLong(1000L, 5000L); + Thread.sleep(waitTime); + } log.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + result = leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); } @@ -298,6 +294,9 @@ public class Scheduler implements Runnable { } else { log.info("LeaseCoordinator is already running. No need to start it."); } + log.info("Scheduling periodicShardSync)"); + // leaderElectedPeriodicShardSyncManager.start(); + // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged isDone = true; } else { lastException = result.getException(); @@ -309,9 +308,10 @@ public class Scheduler implements Runnable { lastException = e; } - if (!isDone) { + if (!isDone || !leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { try { Thread.sleep(schedulerInitializationBackoffTimeMillis); + leaderElectedPeriodicShardSyncManager.stop(); } catch (InterruptedException e) { log.debug("Sleep interrupted while initializing worker."); } @@ -532,7 +532,7 @@ public class Scheduler implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); - periodicShardSyncManager.stop(); + leaderElectedPeriodicShardSyncManager.stop(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } } @@ -630,12 +630,12 @@ public class Scheduler implements Runnable { hierarchicalShardSyncer, metricsFactory); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), - argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); + argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. - * + *
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java
new file mode 100644
index 00000000..b6ff3a0d
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.kinesis.coordinator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.LeaseRefresher;
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+import static software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DeterministicShuffleShardSyncLeaderDeciderTest {
+ private static final String LEASE_KEY = "lease_key";
+ private static final String LEASE_OWNER = "lease_owner";
+ private static final String WORKER_ID = "worker-id";
+
+ private DeterministicShuffleShardSyncLeaderDecider leaderDecider;
+
+ @Mock
+ private LeaseRefresher leaseRefresher;
+
+ @Mock
+ private ScheduledExecutorService scheduledExecutorService;
+
+ private int numShardSyncWorkers;
+
+ @Before
+ public void setup() {
+ numShardSyncWorkers = 1;
+ leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, scheduledExecutorService, numShardSyncWorkers);
+ }
+
+ @Test
+ public void testLeaderElectionWithNullLeases() {
+ boolean isLeader = leaderDecider.isLeader(WORKER_ID);
+ assertTrue("IsLeader should return true if leaders is null", isLeader);
+ }
+
+ @Test
+ public void testLeaderElectionWithEmptyLeases() throws Exception {
+ when(leaseRefresher.listLeases()).thenReturn(new ArrayList<>());
+ boolean isLeader = leaderDecider.isLeader(WORKER_ID);
+ assertTrue("IsLeader should return true if no leases are returned", isLeader);
+ }
+
+ @Test
+ public void testElectedLeadersAsPerExpectedShufflingOrder()
+ throws Exception {
+ List