This commit is contained in:
thuandb 2020-05-15 14:20:54 +01:00 committed by GitHub
commit 2a9205d03f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 175 additions and 13 deletions

View file

@ -148,6 +148,9 @@ public class Worker implements Runnable {
private LeaderDecider leaderDecider;
private ShardSyncStrategy shardSyncStrategy;
@VisibleForTesting
boolean isInitialized;
/**
* Constructor.
*
@ -406,7 +409,8 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */);
DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */,
false);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@ -467,7 +471,7 @@ public class Worker implements Runnable {
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_LEASE_CLEANUP_VALIDATOR, null);
DEFAULT_LEASE_CLEANUP_VALIDATOR, null, false);
}
/**
@ -507,6 +511,9 @@ public class Worker implements Runnable {
* Max number of threads in the getRecords thread pool.
* @param leaseCleanupValidator
* leaseCleanupValidator instance used to validate leases
* @param initializeOnCreation
* initializeOnCreation used to initialize Worker on creation, validate dependencies and make sure that
* it would be ready to process records once created.
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@ -517,13 +524,30 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) {
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider,
boolean initializeOnCreation) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator),
leaderDecider);
leaderDecider, initializeOnCreation);
}
@Deprecated
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion,
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, shardSyncer, leaderDecider, false);
}
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
@ -533,7 +557,8 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) {
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider,
boolean initializeOnCreation) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -561,6 +586,12 @@ public class Worker implements Runnable {
this.leaderDecider = leaderDecider;
this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType());
LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString()));
if (initializeOnCreation) {
this.initialize();
} else {
this.isInitialized = false;
}
}
private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) {
@ -612,10 +643,9 @@ public class Worker implements Runnable {
try {
initialize();
LOG.info("Initialization complete. Starting worker loop.");
LOG.info("Starting worker loop.");
} catch (RuntimeException e1) {
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1);
shutdown();
LOG.error("Could not start worker loop as worker is failed to initialize.");
}
while (!shouldShutdown()) {
@ -663,11 +693,15 @@ public class Worker implements Runnable {
}
private void initialize() {
if (isInitialized) {
return;
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
for (int i = 0; (!isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
try {
LOG.info("Initialization attempt " + (i + 1));
LOG.info("Initializing LeaseCoordinator");
@ -693,7 +727,7 @@ public class Worker implements Runnable {
LOG.info("LeaseCoordinator is already running. No need to start it.");
}
shardSyncStrategy.onWorkerInitialization();
isDone = true;
isInitialized = true;
} else {
lastException = result.getException();
}
@ -711,10 +745,14 @@ public class Worker implements Runnable {
}
}
if (!isDone) {
if (!isInitialized) {
shutdown();
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.",
lastException);
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
LOG.info("Initialization complete.");
}
/**
@ -1241,6 +1279,8 @@ public class Worker implements Runnable {
private ILeaseRenewer<KinesisClientLease> leaseRenewer;
@Setter @Accessors(fluent = true)
private ShardSyncer shardSyncer;
@Setter @Accessors(fluent = true)
private boolean initializeOnCreation = false;
@VisibleForTesting
@ -1419,7 +1459,8 @@ public class Worker implements Runnable {
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener, shardSyncer, leaderDecider);
workerStateChangeListener, shardSyncer, leaderDecider,
initializeOnCreation);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -66,11 +66,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1822,6 +1824,125 @@ public class WorkerTest {
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
}
@Test
public void testBuilderWithInitializeOnCreationNotSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.build();
Assert.assertTrue(!worker.isInitialized);
}
@Test
public void testBuilderWithInitializeOnCreationNotEnabled() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.initializeOnCreation(false)
.build();
Assert.assertTrue(!worker.isInitialized);
}
@Test
public void testBuilderWithInitializeOnCreationEnabled() throws DependencyException {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
when(leaseManager.waitUntilLeaseTableExists(anyLong(), anyLong())).thenReturn(true);
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.initializeOnCreation(true)
.build();
Assert.assertTrue(worker.isInitialized);
}
@Test(expected = RuntimeException.class)
public void testBuilderWithInitializeOnCreationEnabled_Exception() throws DependencyException {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
AmazonKinesis kinesisClient = mock(AmazonKinesis.class);
AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class);
AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class);
ILeaseTaker<KinesisClientLease> leaseTaker = mock(ILeaseTaker.class);
LeaderDecider leaderDecider = mock(LeaderDecider.class);
when(leaseManager.waitUntilLeaseTableExists(anyLong(), anyLong())).thenThrow(new RuntimeException(
"Failed to initialize the lease table."));
when(config.getParentShardPollIntervalMillis()).thenReturn(1L);
Worker worker = new Worker.Builder()
.execService(executorService)
.recordProcessorFactory(recordProcessorFactory)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(amazonCloudWatchClient)
.metricsFactory(nullMetricsFactory)
.leaseManager(leaseManager)
.shardPrioritization(shardPrioritization)
.config(config)
.kinesisProxy(kinesisProxy)
.shardSyncer(shardSyncer)
.leaseTaker(leaseTaker)
.leaderDecider(leaderDecider)
.initializeOnCreation(true)
.build();
}
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,