Add configurable max initialization attempts (#39)

* Add configurable max initialization attempts

* Add maxInitializationAttempts to unit test setup
This commit is contained in:
Micah Jaffe 2020-05-15 14:02:44 -07:00 committed by GitHub
parent fdad20aff4
commit 6b474b7390
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 4 deletions

View file

@ -200,6 +200,11 @@ public class KinesisClientLibConfiguration {
*/
public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50;
/**
* The number of times the {@link Worker} will try to initialize before giving up.
*/
public static final int DEFAULT_MAX_INITIALIZATION_ATTEMPTS = 20;
@Getter
private BillingMode billingMode;
private String applicationName;
@ -266,6 +271,9 @@ public class KinesisClientLibConfiguration {
@Getter
private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS;
@Getter
private int maxInitializationAttempts = DEFAULT_MAX_INITIALIZATION_ATTEMPTS;
/**
* Constructor.
*
@ -1458,4 +1466,14 @@ public class KinesisClientLibConfiguration {
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
return this;
}
/**
* @param maxInitializationAttempts Max number of Worker initialization attempts before giving up
* @return
*/
public KinesisClientLibConfiguration withMaxInitializationAttempts(int maxInitializationAttempts) {
checkIsValuePositive("maxInitializationAttempts", maxInitializationAttempts);
this.maxInitializationAttempts = maxInitializationAttempts;
return this;
}
}

View file

@ -94,7 +94,6 @@ public class Worker implements Runnable {
// Default configs for periodic shard sync
private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL.
static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
@ -661,7 +660,8 @@ public class Worker implements Runnable {
initialize();
LOG.info("Initialization complete. Starting worker loop.");
} catch (RuntimeException e1) {
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1);
LOG.error("Unable to initialize after " + config.getMaxInitializationAttempts() + " attempts. " +
"Shutting down.", e1);
shutdown();
}
@ -714,7 +714,7 @@ public class Worker implements Runnable {
boolean isDone = false;
Exception lastException = null;
for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
for (int i = 0; (!isDone) && (i < config.getMaxInitializationAttempts()); i++) {
try {
LOG.info("Initialization attempt " + (i + 1));
LOG.info("Initializing LeaseCoordinator");

View file

@ -197,6 +197,7 @@ public class WorkerTest {
@Before
public void setup() {
config = spy(new KinesisClientLibConfiguration("app", null, null, WORKER_ID));
config.withMaxInitializationAttempts(1);
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
}
@ -435,6 +436,8 @@ public class WorkerTest {
String stageName = "testInitializationWorker";
IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null);
config = new KinesisClientLibConfiguration(stageName, null, null, WORKER_ID);
config.withMaxInitializationAttempts(2);
int count = 0;
when(proxy.getShardListWithFilter(any())).thenThrow(new RuntimeException(Integer.toString(count++)));
int maxRecords = 2;
@ -612,7 +615,6 @@ public class WorkerTest {
@Test
public final void testWorkerShutsDownOwnedResources() throws Exception {
final long failoverTimeMillis = 20L;
// Make sure that worker thread is run before invoking shutdown.