issue #42 - Make worker init retry attempts configurable

This commit is contained in:
Chris Collins 2018-03-15 15:04:29 -03:00
parent a53473d536
commit 6426dbf419
3 changed files with 44 additions and 5 deletions

View file

@ -192,6 +192,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50; public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50;
/**
* The number of times a worker will try to initialize before exiting
*/
public static final int DEFAULT_WORKER_INITIALIZATION_RETRY_ATTEMPTS = 20;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -255,6 +260,9 @@ public class KinesisClientLibConfiguration {
@Getter @Getter
private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS; private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS;
@Getter
private int maxWorkerInitializationRetryAttempts = DEFAULT_WORKER_INITIALIZATION_RETRY_ATTEMPTS;
/** /**
* Constructor. * Constructor.
* *
@ -1407,7 +1415,7 @@ public class KinesisClientLibConfiguration {
/** /**
* @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled * @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return * @return KinesisClientLibConfiguration
*/ */
public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) { public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) {
checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis); checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis);
@ -1418,11 +1426,22 @@ public class KinesisClientLibConfiguration {
/** /**
* @param maxListShardsRetryAttempts Max number of retries for listShards when throttled * @param maxListShardsRetryAttempts Max number of retries for listShards when throttled
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. * in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return * @return KinesisClientLibConfiguration
*/ */
public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) { public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) {
checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts); checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts);
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
return this; return this;
} }
/**
* @param maxWorkerInitializationRetryAttempts Max number of retries a worker will attempt to initialize before exiting
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withMaxWorkerInitializationRetryAttempts(int maxWorkerInitializationRetryAttempts) {
checkIsValuePositive("maxWorkerInitializationRetryAttempts", maxWorkerInitializationRetryAttempts);
this.maxWorkerInitializationRetryAttempts = maxWorkerInitializationRetryAttempts;
return this;
}
} }

View file

@ -72,7 +72,6 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class); private static final Log LOG = LogFactory.getLog(Worker.class);
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private WorkerLog wlog = new WorkerLog(); private WorkerLog wlog = new WorkerLog();
@ -566,7 +565,7 @@ public class Worker implements Runnable {
initialize(); initialize();
LOG.info("Initialization complete. Starting worker loop."); LOG.info("Initialization complete. Starting worker loop.");
} catch (RuntimeException e1) { } catch (RuntimeException e1) {
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1); LOG.error("Unable to initialize after " + config.getMaxWorkerInitializationRetryAttempts() + " attempts. Shutting down.", e1);
shutdown(); shutdown();
} }
@ -619,7 +618,7 @@ public class Worker implements Runnable {
boolean isDone = false; boolean isDone = false;
Exception lastException = null; Exception lastException = null;
for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { for (int i = 0; (!isDone) && (i < config.getMaxWorkerInitializationRetryAttempts()); i++) {
try { try {
LOG.info("Initialization attempt " + (i + 1)); LOG.info("Initialization attempt " + (i + 1));
LOG.info("Initializing LeaseCoordinator"); LOG.info("Initializing LeaseCoordinator");

View file

@ -418,4 +418,25 @@ public class KinesisClientLibConfigurationTest {
config = config.withIgnoreUnexpectedChildShards(true); config = config.withIgnoreUnexpectedChildShards(true);
assertTrue(config.shouldIgnoreUnexpectedChildShards()); assertTrue(config.shouldIgnoreUnexpectedChildShards());
} }
/**
* Test testDefaultWorkerInitRetry() default value of 20
*/
@Test
public final void testDefaultWorkerInitRetry() {
final String stageName = "testStageName";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(stageName, null, null, null);
Assert.assertEquals(config.getMaxWorkerInitializationRetryAttempts(), 20);
}
/**
* Test testDefaultWorkerInitRetry() custom value of 10
*/
@Test
public final void testCustomWorkerInitRetry() {
final String stageName = "testStageName";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(stageName, null, null, null).withMaxWorkerInitializationRetryAttempts(10);
Assert.assertEquals(config.getMaxWorkerInitializationRetryAttempts(), 10);
}
} }