diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 9eabcffe..276b5570 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -148,6 +148,9 @@ public class Worker implements Runnable { private LeaderDecider leaderDecider; private ShardSyncStrategy shardSyncStrategy; + @VisibleForTesting + boolean isInitialized; + /** * Constructor. * @@ -406,7 +409,8 @@ public class Worker implements Runnable { config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */); + DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */, + false); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -467,7 +471,7 @@ public class Worker implements Runnable { shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, - DEFAULT_LEASE_CLEANUP_VALIDATOR, null); + DEFAULT_LEASE_CLEANUP_VALIDATOR, null, false); } /** @@ -507,6 +511,9 @@ public class Worker implements Runnable { * Max number of threads in the getRecords thread pool. * @param leaseCleanupValidator * leaseCleanupValidator instance used to validate leases + * @param initializeOnCreation + * initializeOnCreation used to initialize Worker on creation, validate dependencies and make sure that + * it would be ready to process records once created. */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @@ -517,13 +524,30 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) { + LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, + boolean initializeOnCreation) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), - leaderDecider); + leaderDecider, initializeOnCreation); + } + + @Deprecated + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, + ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, + WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) { + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, + parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, + leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, + maxGetRecordsThreadPool, workerStateChangeListener, shardSyncer, leaderDecider, false); } Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, @@ -533,7 +557,8 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) { + WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, + boolean initializeOnCreation) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -561,6 +586,12 @@ public class Worker implements Runnable { this.leaderDecider = leaderDecider; this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType()); LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString())); + + if (initializeOnCreation) { + this.initialize(); + } else { + this.isInitialized = false; + } } private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) { @@ -663,11 +694,15 @@ public class Worker implements Runnable { } private void initialize() { + if (this.isInitialized) { + return; + } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); - boolean isDone = false; + Exception lastException = null; - for (int i = 0; (!isDone) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { + for (int i = 0; (!this.isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { try { LOG.info("Initialization attempt " + (i + 1)); LOG.info("Initializing LeaseCoordinator"); @@ -693,7 +728,7 @@ public class Worker implements Runnable { LOG.info("LeaseCoordinator is already running. No need to start it."); } shardSyncStrategy.onWorkerInitialization(); - isDone = true; + this.isInitialized = true; } else { lastException = result.getException(); } @@ -711,7 +746,7 @@ public class Worker implements Runnable { } } - if (!isDone) { + if (!this.isInitialized) { throw new RuntimeException(lastException); } workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); @@ -1241,6 +1276,8 @@ public class Worker implements Runnable { private ILeaseRenewer leaseRenewer; @Setter @Accessors(fluent = true) private ShardSyncer shardSyncer; + @Setter @Accessors(fluent = true) + private boolean initializeOnCreation = false; @VisibleForTesting @@ -1419,7 +1456,8 @@ public class Worker implements Runnable { shardPrioritization, config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - workerStateChangeListener, shardSyncer, leaderDecider); + workerStateChangeListener, shardSyncer, leaderDecider, + initializeOnCreation); } > R createClient(final T builder, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a2faf607..fb76274d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -66,11 +66,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.LeaseManager; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1822,6 +1824,125 @@ public class WorkerTest { any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); } + @Test + public void testBuilderWithInitializeOnCreationNotSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class); + AmazonKinesis kinesisClient = mock(AmazonKinesis.class); + AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class); + AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class); + ILeaseTaker leaseTaker = mock(ILeaseTaker.class); + LeaderDecider leaderDecider = mock(LeaderDecider.class); + Worker worker = new Worker.Builder() + .execService(executorService) + .recordProcessorFactory(recordProcessorFactory) + .kinesisClient(kinesisClient) + .dynamoDBClient(dynamoDBClient) + .cloudWatchClient(amazonCloudWatchClient) + .metricsFactory(nullMetricsFactory) + .leaseManager(leaseManager) + .shardPrioritization(shardPrioritization) + .config(config) + .kinesisProxy(kinesisProxy) + .shardSyncer(shardSyncer) + .leaseTaker(leaseTaker) + .leaderDecider(leaderDecider) + .build(); + Assert.assertTrue(!worker.isInitialized); + } + + @Test + public void testBuilderWithInitializeOnCreationNotEnabled() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class); + AmazonKinesis kinesisClient = mock(AmazonKinesis.class); + AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class); + AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class); + ILeaseTaker leaseTaker = mock(ILeaseTaker.class); + LeaderDecider leaderDecider = mock(LeaderDecider.class); + Worker worker = new Worker.Builder() + .execService(executorService) + .recordProcessorFactory(recordProcessorFactory) + .kinesisClient(kinesisClient) + .dynamoDBClient(dynamoDBClient) + .cloudWatchClient(amazonCloudWatchClient) + .metricsFactory(nullMetricsFactory) + .leaseManager(leaseManager) + .shardPrioritization(shardPrioritization) + .config(config) + .kinesisProxy(kinesisProxy) + .shardSyncer(shardSyncer) + .leaseTaker(leaseTaker) + .leaderDecider(leaderDecider) + .initializeOnCreation(false) + .build(); + Assert.assertTrue(!worker.isInitialized); + } + + @Test + public void testBuilderWithInitializeOnCreationEnabled() throws DependencyException { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class); + AmazonKinesis kinesisClient = mock(AmazonKinesis.class); + AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class); + AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class); + ILeaseTaker leaseTaker = mock(ILeaseTaker.class); + LeaderDecider leaderDecider = mock(LeaderDecider.class); + + when(leaseManager.waitUntilLeaseTableExists(anyLong(), anyLong())).thenReturn(true); + + Worker worker = new Worker.Builder() + .execService(executorService) + .recordProcessorFactory(recordProcessorFactory) + .kinesisClient(kinesisClient) + .dynamoDBClient(dynamoDBClient) + .cloudWatchClient(amazonCloudWatchClient) + .metricsFactory(nullMetricsFactory) + .leaseManager(leaseManager) + .shardPrioritization(shardPrioritization) + .config(config) + .kinesisProxy(kinesisProxy) + .shardSyncer(shardSyncer) + .leaseTaker(leaseTaker) + .leaderDecider(leaderDecider) + .initializeOnCreation(true) + .build(); + + Assert.assertTrue(worker.isInitialized); + } + + @Test(expected = RuntimeException.class) + public void testBuilderWithInitializeOnCreationEnabled_Exception() throws DependencyException { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class); + AmazonKinesis kinesisClient = mock(AmazonKinesis.class); + AmazonDynamoDB dynamoDBClient = mock(AmazonDynamoDB.class); + AmazonCloudWatch amazonCloudWatchClient = mock(AmazonCloudWatch.class); + ILeaseTaker leaseTaker = mock(ILeaseTaker.class); + LeaderDecider leaderDecider = mock(LeaderDecider.class); + + when(leaseManager.waitUntilLeaseTableExists(anyLong(), anyLong())).thenThrow(new RuntimeException( + "Failed to initialize the lease table.")); + when(config.getParentShardPollIntervalMillis()).thenReturn(1L); + + Worker worker = new Worker.Builder() + .execService(executorService) + .recordProcessorFactory(recordProcessorFactory) + .kinesisClient(kinesisClient) + .dynamoDBClient(dynamoDBClient) + .cloudWatchClient(amazonCloudWatchClient) + .metricsFactory(nullMetricsFactory) + .leaseManager(leaseManager) + .shardPrioritization(shardPrioritization) + .config(config) + .kinesisProxy(kinesisProxy) + .shardSyncer(shardSyncer) + .leaseTaker(leaseTaker) + .leaderDecider(leaderDecider) + .initializeOnCreation(true) + .build(); + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,