diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 13a52355..21890663 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -177,7 +177,7 @@ class KinesisShardSyncer implements ShardSyncer { List shards; if(CollectionUtils.isNullOrEmpty(latestShards)) { - shards = getCompleteShardList(kinesisProxy); + shards = getShardListAtInitialPosition(kinesisProxy, initialPosition); } else { shards = latestShards; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java index d129944f..87237e0b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -58,14 +58,31 @@ class PeriodicShardSyncManager { public synchronized TaskResult start() { if (!isRunning) { + final Runnable periodicShardSyncer = () -> { + try { + runShardSync(); + } catch (Throwable t) { + LOG.error("Error running shard sync.", t); + } + }; + shardSyncThreadPool - .scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + .scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); isRunning = true; } return new TaskResult(null); } + /** + * Runs ShardSync once, without scheduling further periodic ShardSyncs. + * @return TaskResult from shard sync + */ + public synchronized TaskResult syncShardsOnce() { + LOG.info("Syncing shards once from worker " + workerId); + return metricsEmittingShardSyncTask.call(); + } + public void stop() { if (isRunning) { LOG.info(String.format("Shutting down leader decider on worker %s", workerId)); @@ -77,15 +94,12 @@ class PeriodicShardSyncManager { } private void runShardSync() { - try { - if (leaderDecider.isLeader(workerId)) { - LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); - metricsEmittingShardSyncTask.call(); - } else { - LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); - } - } catch (Throwable t) { - LOG.error("Error during runShardSync.", t); + if (leaderDecider.isLeader(workerId)) { + LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); + metricsEmittingShardSyncTask.call(); + } else { + LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); } + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java index 8077efcc..18fc4a08 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java @@ -42,6 +42,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { @Override public TaskResult onWorkerInitialization() { + // TODO: Start leaderElectedPeriodicShardSyncManager in background LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); return new TaskResult(null); } @@ -65,6 +66,7 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { @Override public void onWorkerShutDown() { + // TODO: Shut down leaderElectedPeriodicShardSyncManager LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 9eabcffe..5f1b4fac 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -29,11 +29,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer; @@ -88,9 +92,14 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); + // Default configs for periodic shard sync private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. + static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; + static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); private static final LeaseSelector DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector(); @@ -147,6 +156,7 @@ public class Worker implements Runnable { // Periodic Shard Sync related fields private LeaderDecider leaderDecider; private ShardSyncStrategy shardSyncStrategy; + private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; /** * Constructor. @@ -406,7 +416,7 @@ public class Worker implements Runnable { config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */); + DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null, null); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -467,7 +477,7 @@ public class Worker implements Runnable { shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, - DEFAULT_LEASE_CLEANUP_VALIDATOR, null); + DEFAULT_LEASE_CLEANUP_VALIDATOR, null, null); } /** @@ -507,6 +517,10 @@ public class Worker implements Runnable { * Max number of threads in the getRecords thread pool. * @param leaseCleanupValidator * leaseCleanupValidator instance used to validate leases + * @param leaderDecider + * leaderDecider instance used elect shard sync leaders + * @param periodicShardSyncManager + * manages periodic shard sync tasks */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @@ -517,13 +531,13 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) { + LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), - leaderDecider); + leaderDecider, periodicShardSyncManager); } Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, @@ -533,7 +547,8 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) { + WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, + PeriodicShardSyncManager periodicShardSyncManager) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -558,15 +573,17 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); - this.leaderDecider = leaderDecider; this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType()); LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString())); + this.leaderDecider = leaderDecider != null ? leaderDecider : createLeaderDecider(); + this.leaderElectedPeriodicShardSyncManager = periodicShardSyncManager != null ? periodicShardSyncManager + : createPeriodicShardSyncManager(); } private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) { switch (strategyType) { case PERIODIC: - return createPeriodicShardSyncStrategy(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager()); + return createPeriodicShardSyncStrategy(); case SHARD_END: default: return createShardEndShardSyncStrategy(controlServer); @@ -673,30 +690,30 @@ public class Worker implements Runnable { LOG.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - TaskResult result = null; - if (!skipShardSyncAtWorkerInitializationIfLeasesExist - || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { - LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, - config.shouldIgnoreUnexpectedChildShards(), 0L, shardSyncer, null); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); - } else { - LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); + // Perform initial lease sync if configs allow it, with jitter. + if (shouldInitiateLeaseSync()) { + LOG.info(config.getWorkerIdentifier() + " worker is beginning initial lease sync."); + TaskResult result = leaderElectedPeriodicShardSyncManager.syncShardsOnce(); + if (result.getException() != null) { + throw result.getException(); + } } - if (result == null || result.getException() == null) { - if (!leaseCoordinator.isRunning()) { - LOG.info("Starting LeaseCoordinator"); - leaseCoordinator.start(); - } else { - LOG.info("LeaseCoordinator is already running. No need to start it."); - } - shardSyncStrategy.onWorkerInitialization(); - isDone = true; + // If we reach this point, then we either skipped the lease sync or did not have any exception for the + // shard sync in the previous attempt. + if (!leaseCoordinator.isRunning()) { + LOG.info("Starting LeaseCoordinator"); + leaseCoordinator.start(); } else { - lastException = result.getException(); + LOG.info("LeaseCoordinator is already running. No need to start it."); } + + // All shard sync strategies' initialization handlers should begin a periodic shard sync. For + // PeriodicShardSync strategy, this is the main shard sync loop. For ShardEndShardSync and other + // shard sync strategies, this serves as an auditor background process. + shardSyncStrategy.onWorkerInitialization(); + isDone = true; + } catch (LeasingException e) { LOG.error("Caught exception when initializing LeaseCoordinator", e); lastException = e; @@ -706,6 +723,7 @@ public class Worker implements Runnable { try { Thread.sleep(parentShardPollIntervalMillis); + leaderElectedPeriodicShardSyncManager.stop(); } catch (InterruptedException e) { LOG.debug("Sleep interrupted while initializing worker."); } @@ -717,6 +735,32 @@ public class Worker implements Runnable { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); } + @VisibleForTesting + boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, InvalidStateException, + ProvisionedThroughputException { + + final ILeaseManager leaseManager = leaseCoordinator.getLeaseManager(); + if (skipShardSyncAtWorkerInitializationIfLeasesExist && !leaseManager.isLeaseTableEmpty()) { + LOG.info("Skipping shard sync because getSkipShardSyncAtWorkerInitializationIfLeasesExist config is set " + + "to TRUE and lease table is not empty."); + return false; + } + + final long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, + MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + final long waitUntil = System.currentTimeMillis() + waitTime; + + boolean shouldInitiateLeaseSync = true; + while (System.currentTimeMillis() < waitUntil && (shouldInitiateLeaseSync = leaseManager.isLeaseTableEmpty())) { + // Check every 3 seconds if lease table is still empty, to minimize contention between all workers + // bootstrapping from empty lease table at the same time. + LOG.info("Lease table is still empty. Checking again in " + LEASE_TABLE_CHECK_FREQUENCY_MILLIS + " ms."); + Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); + } + + return shouldInitiateLeaseSync; + } + /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * @@ -1163,20 +1207,33 @@ public class Worker implements Runnable { } } - private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager) { - return new PeriodicShardSyncStrategy( - new PeriodicShardSyncManager(config.getWorkerIdentifier(), leaderDecider, - new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(), - config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer, null), metricsFactory)); + private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy() { + return new PeriodicShardSyncStrategy(createPeriodicShardSyncManager()); } private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { return new ShardEndShardSyncStrategy(shardSyncTaskManager); } + private LeaderDecider createLeaderDecider() { + return new DeterministicShuffleShardSyncLeaderDecider(leaseCoordinator.getLeaseManager(), + Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + } + + private PeriodicShardSyncManager createPeriodicShardSyncManager() { + return new PeriodicShardSyncManager(config.getWorkerIdentifier(), + leaderDecider, + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + config.getInitialPositionInStreamExtended(), + config.shouldCleanupLeasesUponShardCompletion(), + config.shouldIgnoreUnexpectedChildShards(), + SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, + shardSyncer, + null), + metricsFactory); + } + /** * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. @@ -1241,6 +1298,8 @@ public class Worker implements Runnable { private ILeaseRenewer leaseRenewer; @Setter @Accessors(fluent = true) private ShardSyncer shardSyncer; + @Setter @Accessors(fluent = true) + private PeriodicShardSyncManager periodicShardSyncManager; @VisibleForTesting @@ -1379,7 +1438,7 @@ public class Worker implements Runnable { } // We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them (DEFAULT). - if (leaseRenewer == null){ + if (leaseRenewer == null) { ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads()); leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool); } @@ -1389,6 +1448,20 @@ public class Worker implements Runnable { Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } + if (periodicShardSyncManager == null) { + periodicShardSyncManager = new PeriodicShardSyncManager(config.getWorkerIdentifier(), + leaderDecider, + new ShardSyncTask(kinesisProxy, + leaseManager, + config.getInitialPositionInStreamExtended(), + config.shouldCleanupLeasesUponShardCompletion(), + config.shouldIgnoreUnexpectedChildShards(), + SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, + shardSyncer, + null), + metricsFactory); + } + return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1419,7 +1492,10 @@ public class Worker implements Runnable { shardPrioritization, config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - workerStateChangeListener, shardSyncer, leaderDecider); + workerStateChangeListener, + shardSyncer, + leaderDecider, + periodicShardSyncManager); } > R createClient(final T builder, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 83dcd4af..48d71f6d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -18,7 +18,6 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -30,7 +29,6 @@ import java.util.stream.Stream; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; -import com.amazonaws.services.kinesis.leases.impl.Lease; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilterType; import org.apache.commons.logging.Log; @@ -60,7 +58,7 @@ import com.amazonaws.services.kinesis.model.Shard; import junit.framework.Assert; -import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -1816,10 +1814,13 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath())); + // Make sure ListShardsWithFilter is called in all public shard sync methods shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, false); + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, + cleanupLeasesOfCompletedShards, false, null); - verify(kinesisProxy, atLeastOnce()).getShardListWithFilter(shardFilter); + verify(kinesisProxy, atLeast(2)).getShardListWithFilter(shardFilter); verify(kinesisProxy, never()).getShardList(); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a2faf607..b4b2a597 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -72,6 +72,7 @@ import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Condition; @@ -158,6 +159,7 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; + private static final String WORKER_ID = "workerId"; private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; @@ -194,7 +196,7 @@ public class WorkerTest { @Before public void setup() { - config = spy(new KinesisClientLibConfiguration("app", null, null, null)); + config = spy(new KinesisClientLibConfiguration("app", null, null, WORKER_ID)); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); } @@ -244,7 +246,7 @@ public class WorkerTest { @Test public final void testGetStageName() { final String stageName = "testStageName"; - config = new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); Worker worker = new Worker(v1RecordProcessorFactory, config); Assert.assertEquals(stageName, worker.getApplicationName()); } @@ -253,7 +255,7 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; - config = new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -372,7 +374,7 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; - config = new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -429,12 +431,12 @@ public class WorkerTest { } @Test - public final void testInitializationFailureWithRetries() { + public final void testInitializationFailureWithRetries() throws Exception { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); - config = new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); int count = 0; - when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); + when(proxy.getShardListWithFilter(any())).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; long idleTimeInMilliseconds = 1L; StreamConfig streamConfig = @@ -443,6 +445,7 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseManager.isLeaseTableEmpty()).thenReturn(true); ExecutorService execService = Executors.newSingleThreadExecutor(); long shardPollInterval = 0L; Worker worker = @@ -465,6 +468,79 @@ public class WorkerTest { Assert.assertTrue(count > 0); } + @Test + public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception { + final String stageName = "testInitializationWorker"; + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseManager.isLeaseTableEmpty()).thenReturn(true); + + final int maxRecords = 2; + final long idleTimeInMilliseconds = 1L; + final StreamConfig streamConfig = new StreamConfig(proxy, maxRecords, idleTimeInMilliseconds, + callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + final long shardPollInterval = 0L; + final Worker worker = + new Worker(stageName, + v2RecordProcessorFactory, + config, + streamConfig, INITIAL_POSITION_TRIM_HORIZON, + shardPollInterval, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + Executors.newSingleThreadExecutor(), + nullMetricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + shardPrioritization); + + final long startTime = System.currentTimeMillis(); + worker.shouldInitiateLeaseSync(); + final long endTime = System.currentTimeMillis(); + + assertTrue(endTime - startTime > Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + assertTrue(endTime - startTime < Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS + Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS); + } + + @Test + public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception { + final String stageName = "testInitializationWorker"; + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + when(leaseManager.isLeaseTableEmpty()).thenReturn(false); + + final int maxRecords = 2; + final long idleTimeInMilliseconds = 1L; + final StreamConfig streamConfig = new StreamConfig(proxy, maxRecords, idleTimeInMilliseconds, + callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + final long shardPollInterval = 0L; + final Worker worker = + new Worker(stageName, + v2RecordProcessorFactory, + config, + streamConfig, INITIAL_POSITION_TRIM_HORIZON, + shardPollInterval, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + Executors.newSingleThreadExecutor(), + nullMetricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + shardPrioritization); + + final long startTime = System.currentTimeMillis(); + worker.shouldInitiateLeaseSync(); + final long endTime = System.currentTimeMillis(); + + assertTrue(endTime - startTime < Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + } + /** * Runs worker with threadPoolSize == numShards * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}. @@ -576,6 +652,7 @@ public class WorkerTest { final ExecutorService executorService = mock(ThreadPoolExecutor.class); final CWMetricsFactory cwMetricsFactory = mock(CWMetricsFactory.class); + when(cwMetricsFactory.createMetrics()).thenReturn(mock(IMetricsScope.class)); // Make sure that worker thread is run before invoking shutdown. final CountDownLatch workerStarted = new CountDownLatch(1); doAnswer(new Answer() { @@ -1708,7 +1785,7 @@ public class WorkerTest { public void testBuilderSetRegionAndEndpointToClient() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); final String endpoint = "TestEndpoint"; - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) .withRegionName(Regions.US_WEST_2.getName()) .withKinesisEndpoint(endpoint) .withDynamoDBEndpoint(endpoint); @@ -1736,7 +1813,7 @@ public class WorkerTest { public void testBuilderSetRegionToClient() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); String region = Regions.US_WEST_2.getName(); - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) .withRegionName(region); Worker.Builder builder = new Worker.Builder(); @@ -1763,7 +1840,7 @@ public class WorkerTest { @Test public void testBuilderGenerateClients() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null); + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID); Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)); ArgumentCaptor builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class); @@ -1789,7 +1866,7 @@ public class WorkerTest { public void testBuilderGenerateClientsWithRegion() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); String region = Regions.US_WEST_2.getName(); - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) .withRegionName(region); ArgumentCaptor builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class); @@ -1809,7 +1886,7 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); String region = Regions.US_WEST_2.getName(); String endpointUrl = "TestEndpoint"; - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl); Worker.Builder builder = spy(new Worker.Builder());