Support initializing the Worker on creation in order to validate dependencies at application startup.

A deployment that succeeds and puts hosts into service without knowing whether the KCL worker will work correctly risks an outage. This change adds an option that lets KCL applications fail fast during deployment if KCL workers are not initialized correctly.
This commit is contained in:
Thuan Duong-Ba 2020-01-16 18:06:54 -08:00
parent 02c2036d5d
commit cd9d7e319e
2 changed files with 169 additions and 10 deletions

View file

@ -148,6 +148,9 @@ public class Worker implements Runnable {
private LeaderDecider leaderDecider;
private ShardSyncStrategy shardSyncStrategy;
/**
 * True once {@code initialize()} has completed successfully; {@code initialize()} returns
 * immediately when this flag is already set. Package-private so tests can read it.
 */
@VisibleForTesting
boolean isInitialized;
/**
* Constructor.
*
@ -406,7 +409,8 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */);
DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */,
false);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@ -467,7 +471,7 @@ public class Worker implements Runnable {
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_LEASE_CLEANUP_VALIDATOR, null);
DEFAULT_LEASE_CLEANUP_VALIDATOR, null, false);
}
/**
@ -507,6 +511,9 @@ public class Worker implements Runnable {
* Max number of threads in the getRecords thread pool.
* @param leaseCleanupValidator
* leaseCleanupValidator instance used to validate leases
* @param initializeOnCreation
* initializeOnCreation used to initialize Worker on creation, validate dependencies and make sure that
* it would be ready to process records once created.
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@ -517,13 +524,30 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) {
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider,
boolean initializeOnCreation) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator),
leaderDecider);
leaderDecider, initializeOnCreation);
}
/**
 * Backward-compatible overload that does not take the {@code initializeOnCreation} flag.
 * Delegates to the overload that accepts the flag, passing {@code false}, so the Worker is not
 * initialized during construction.
 *
 * @deprecated NOTE(review): presumably superseded by the overload that takes
 *             {@code initializeOnCreation} - confirm and name the replacement here.
 */
@Deprecated
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion,
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) {
// Always pass false for initializeOnCreation to keep the pre-existing lazy-initialization behavior.
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, shardSyncer, leaderDecider, false);
}
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
@ -533,7 +557,8 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) {
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider,
boolean initializeOnCreation) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -561,6 +586,12 @@ public class Worker implements Runnable {
this.leaderDecider = leaderDecider;
this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType());
LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString()));
if (initializeOnCreation) {
this.initialize();
} else {
this.isInitialized = false;
}
}
private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) {
@ -663,11 +694,15 @@ public class Worker implements Runnable {
}
private void initialize() {
if (this.isInitialized) {
return;
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
for (int i = 0; (!this.isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
try {
LOG.info("Initialization attempt " + (i + 1));
LOG.info("Initializing LeaseCoordinator");
@ -693,7 +728,7 @@ public class Worker implements Runnable {
LOG.info("LeaseCoordinator is already running. No need to start it.");
}
shardSyncStrategy.onWorkerInitialization();
isDone = true;
this.isInitialized = true;
} else {
lastException = result.getException();
}
@ -711,7 +746,7 @@ public class Worker implements Runnable {
}
}
if (!isDone) {
if (!this.isInitialized) {
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
@ -1241,6 +1276,8 @@ public class Worker implements Runnable {
private ILeaseRenewer<KinesisClientLease> leaseRenewer;
@Setter @Accessors(fluent = true)
private ShardSyncer shardSyncer;
// Builder option: when true, the Worker constructor runs initialize() before returning so that
// dependency problems surface at creation time. Defaults to false.
@Setter @Accessors(fluent = true)
private boolean initializeOnCreation = false;
@VisibleForTesting
@ -1419,7 +1456,8 @@ public class Worker implements Runnable {
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener, shardSyncer, leaderDecider);
workerStateChangeListener, shardSyncer, leaderDecider,
initializeOnCreation);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -66,11 +66,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1822,6 +1824,125 @@ public class WorkerTest {
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
}
/**
 * Verifies that a Worker built without ever calling {@code initializeOnCreation(...)} on the
 * builder is not initialized when {@code build()} returns.
 */
@Test
public void testBuilderWithInitializeOnCreationNotSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
// The builder flag is deliberately left untouched to exercise its default value.
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.build();
// assertFalse states the intent directly and yields a readable failure message.
Assert.assertFalse("Worker must not be initialized when initializeOnCreation is not set",
worker.isInitialized);
}
/**
 * Verifies that a Worker built with {@code initializeOnCreation(false)} is not initialized when
 * {@code build()} returns.
 */
@Test
public void testBuilderWithInitializeOnCreationNotEnabled() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.initializeOnCreation(false)
.build();
// assertFalse states the intent directly and yields a readable failure message.
Assert.assertFalse("Worker must not be initialized when initializeOnCreation is false",
worker.isInitialized);
}
/**
 * Verifies that a Worker built with {@code initializeOnCreation(true)} reports itself as
 * initialized as soon as {@code build()} returns.
 */
@Test
public void testBuilderWithInitializeOnCreationEnabled() throws DependencyException {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
// Report the lease table as existing; presumably this is consulted during Worker
// initialization (NOTE(review): confirm against the lease coordinator) so it can succeed.
when(leaseManager.waitUntilLeaseTableExists(anyLong(), anyLong())).thenReturn(true);
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.initializeOnCreation(true)
.build();
Assert.assertTrue(worker.isInitialized);
}
/**
 * Verifies that building a Worker with {@code initializeOnCreation(true)} throws a
 * {@link RuntimeException} when waiting for the lease table fails.
 */
@Test(expected = RuntimeException.class)
public void testBuilderWithInitializeOnCreationEnabled_Exception() throws DependencyException {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
// Make the lease table wait fail so that initialization cannot succeed.
when(leaseManager.waitUntilLeaseTableExists(anyLong(), anyLong())).thenThrow(new RuntimeException(
"Failed to initialize the lease table."));
// NOTE(review): presumably keeps the pause between initialization retries short - confirm.
when(config.getParentShardPollIntervalMillis()).thenReturn(1L);
// build() is expected to throw, so its result is intentionally not captured.
new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.initializeOnCreation(true)
.build();
}
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,