diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 2756c803..dd5f8c41 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -191,6 +191,11 @@ public class KinesisClientLibConfiguration { * The number of times the Proxy will retry listShards call when throttled. */ public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; + + /** + * The number of times a worker will try to initialize before exiting + */ + public static final int DEFAULT_WORKER_INITIALIZATION_RETRY_ATTEMPTS = 20; private String applicationName; private String tableName; @@ -254,6 +259,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS; + + @Getter + private int maxWorkerInitializationRetryAttempts = DEFAULT_WORKER_INITIALIZATION_RETRY_ATTEMPTS; /** * Constructor. @@ -1407,7 +1415,7 @@ public class KinesisClientLibConfiguration { /** * @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. - * @return + * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) { checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis); @@ -1418,11 +1426,22 @@ public class KinesisClientLibConfiguration { /** * @param maxListShardsRetryAttempts Max number of retries for listShards when throttled * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. - * @return + * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) { checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts); this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; return this; } + + /** + * @param maxWorkerInitializationRetryAttempts Max number of retries a worker will attempt to initialize before exiting + * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withMaxWorkerInitializationRetryAttempts(int maxWorkerInitializationRetryAttempts) { + checkIsValuePositive("maxWorkerInitializationRetryAttempts", maxWorkerInitializationRetryAttempts); + this.maxWorkerInitializationRetryAttempts = maxWorkerInitializationRetryAttempts; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 4a03b449..397bfb64 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -72,7 +72,6 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); - private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private WorkerLog wlog = new WorkerLog(); @@ -566,7 +565,7 @@ public class Worker implements Runnable { initialize(); LOG.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e1) { - LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1); + LOG.error("Unable to initialize after " + config.getMaxWorkerInitializationRetryAttempts() + " attempts. Shutting down.", e1); shutdown(); } @@ -619,7 +618,7 @@ public class Worker implements Runnable { boolean isDone = false; Exception lastException = null; - for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { + for (int i = 0; (!isDone) && (i < config.getMaxWorkerInitializationRetryAttempts()); i++) { try { LOG.info("Initialization attempt " + (i + 1)); LOG.info("Initializing LeaseCoordinator"); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index cccbcb30..bec90578 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -418,4 +418,25 @@ public class KinesisClientLibConfigurationTest { config = config.withIgnoreUnexpectedChildShards(true); assertTrue(config.shouldIgnoreUnexpectedChildShards()); } + + /** + * Test testDefaultWorkerInitRetry() default value of 20 + */ + @Test + public final void testDefaultWorkerInitRetry() { + final String stageName = "testStageName"; + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(stageName, null, null, null); + Assert.assertEquals(config.getMaxWorkerInitializationRetryAttempts(), 20); + } + + /** + * Test testDefaultWorkerInitRetry() custom value of 10 + */ + @Test + public final void testCustomWorkerInitRetry() { + final String stageName = "testStageName"; + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(stageName, null, null, null).withMaxWorkerInitializationRetryAttempts(10); + Assert.assertEquals(config.getMaxWorkerInitializationRetryAttempts(), 10); + } + }