From 6b474b73908151108436d66790645737fe79d7e7 Mon Sep 17 00:00:00 2001 From: Micah Jaffe <31011877+micah-jaffe@users.noreply.github.com> Date: Fri, 15 May 2020 14:02:44 -0700 Subject: [PATCH] Add configurable max initialization attempts (#39) * Add configurable max initialization attempts * Add maxInitializationAttempts to unit test setup --- .../worker/KinesisClientLibConfiguration.java | 18 ++++++++++++++++++ .../clientlibrary/lib/worker/Worker.java | 6 +++--- .../clientlibrary/lib/worker/WorkerTest.java | 4 +++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index ba01bc9e..86e7a496 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -200,6 +200,11 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; + /** + * The number of times the {@link Worker} will try to initialize before giving up. + */ + public static final int DEFAULT_MAX_INITIALIZATION_ATTEMPTS = 20; + @Getter private BillingMode billingMode; private String applicationName; @@ -266,6 +271,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS; + @Getter + private int maxInitializationAttempts = DEFAULT_MAX_INITIALIZATION_ATTEMPTS; + /** * Constructor. * @@ -1458,4 +1466,14 @@ public class KinesisClientLibConfiguration { this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; return this; } + + /** + * @param maxInitializationAttempts Max number of Worker initialization attempts before giving up + * @return + */ + public KinesisClientLibConfiguration withMaxInitializationAttempts(int maxInitializationAttempts) { + checkIsValuePositive("maxInitializationAttempts", maxInitializationAttempts); + this.maxInitializationAttempts = maxInitializationAttempts; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index eec3910d..a86771e3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -94,7 +94,6 @@ public class Worker implements Runnable { // Default configs for periodic shard sync private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; - private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; @@ -661,7 +660,8 @@ public class Worker implements Runnable { initialize(); LOG.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e1) { - LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1); + LOG.error("Unable to initialize after " + config.getMaxInitializationAttempts() + " attempts. " + + "Shutting down.", e1); shutdown(); } @@ -714,7 +714,7 @@ public class Worker implements Runnable { boolean isDone = false; Exception lastException = null; - for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { + for (int i = 0; (!isDone) && (i < config.getMaxInitializationAttempts()); i++) { try { LOG.info("Initialization attempt " + (i + 1)); LOG.info("Initializing LeaseCoordinator"); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 08ad6efd..f6bfdd73 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -197,6 +197,7 @@ public class WorkerTest { @Before public void setup() { config = spy(new KinesisClientLibConfiguration("app", null, null, WORKER_ID)); + config.withMaxInitializationAttempts(1); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); } @@ -435,6 +436,8 @@ public class WorkerTest { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID); + config.withMaxInitializationAttempts(2); + int count = 0; when(proxy.getShardListWithFilter(any())).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -612,7 +615,6 @@ public class WorkerTest { @Test public final void testWorkerShutsDownOwnedResources() throws Exception { - final long failoverTimeMillis = 20L; // Make sure that worker thread is run before invoking shutdown.