change the app to have 2 worker

This commit is contained in:
Qilin Jin 2021-11-22 10:56:43 -08:00
parent 92e02017f0
commit 2e0b9815cc

View file

@ -3,6 +3,7 @@ package software.amazon.kinesis;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -51,12 +52,14 @@ public class ApplicationTest {
String streamName1 = "consumer-level-metrics-1"; String streamName1 = "consumer-level-metrics-1";
String streamName2 = "consumer-level-metrics-2"; String streamName2 = "consumer-level-metrics-2";
String region = "us-east-2"; String region = "us-east-2";
String applicationName = "consumer-level-metrics-test";
new ApplicationTest(streamName1, region).run(); new ApplicationTest(applicationName, streamName1, streamName2, region).run();
new ApplicationTest(streamName2, region).run();
} }
private final String streamName; private final String applicationName;
private final String streamName1;
private final String streamName2;
private final Region region; private final Region region;
private final KinesisAsyncClient kinesisClient; private final KinesisAsyncClient kinesisClient;
@ -65,8 +68,10 @@ public class ApplicationTest {
* This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
* indirectly by the KCL to handle the consumption of the data. * indirectly by the KCL to handle the consumption of the data.
*/ */
private ApplicationTest(String streamName, String region) { private ApplicationTest(String applicationName, String streamName1, String streamName2, String region) {
this.streamName = streamName; this.applicationName = applicationName;
this.streamName1 = streamName1;
this.streamName2 = streamName2;
this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2")); this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
} }
@ -86,29 +91,43 @@ public class ApplicationTest {
*/ */
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); ConfigsBuilder configsBuilder1 = new ConfigsBuilder(streamName1, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
ConfigsBuilder configsBuilder2 = new ConfigsBuilder(streamName2, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
/** /**
* The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
* instance is configured with defaults provided by the ConfigsBuilder. * instance is configured with defaults provided by the ConfigsBuilder.
*/ */
Scheduler scheduler = new Scheduler( Scheduler scheduler1 = new Scheduler(
configsBuilder.checkpointConfig(), configsBuilder1.checkpointConfig(),
configsBuilder.coordinatorConfig(), configsBuilder1.coordinatorConfig(),
configsBuilder.leaseManagementConfig(), configsBuilder1.leaseManagementConfig(),
configsBuilder.lifecycleConfig(), configsBuilder1.lifecycleConfig(),
configsBuilder.metricsConfig(), configsBuilder1.metricsConfig(),
configsBuilder.processorConfig(), configsBuilder1.processorConfig(),
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) configsBuilder1.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName1, kinesisClient))
);
Scheduler scheduler2 = new Scheduler(
configsBuilder2.checkpointConfig(),
configsBuilder2.coordinatorConfig(),
configsBuilder2.leaseManagementConfig(),
configsBuilder2.lifecycleConfig(),
configsBuilder2.metricsConfig(),
configsBuilder2.processorConfig(),
configsBuilder2.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName2, kinesisClient))
); );
/** /**
* Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
* until an exit is triggered. * until an exit is triggered.
*/ */
Thread schedulerThread = new Thread(scheduler); Thread schedulerThread1 = new Thread(scheduler1);
schedulerThread.setDaemon(true); schedulerThread1.setDaemon(true);
schedulerThread.start(); schedulerThread1.start();
Thread schedulerThread2 = new Thread(scheduler2);
schedulerThread2.setDaemon(true);
schedulerThread2.start();
/** /**
* Allows termination of app by pressing Enter. * Allows termination of app by pressing Enter.
@ -132,10 +151,12 @@ public class ApplicationTest {
* Stops consuming data. Finishes processing the current batch of data already received from Kinesis * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
* before shutting down. * before shutting down.
*/ */
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown(); Future<Boolean> gracefulShutdownFuture1 = scheduler1.startGracefulShutdown();
Future<Boolean> gracefulShutdownFuture2 = scheduler2.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete."); log.info("Waiting up to 20 seconds for shutdown to complete.");
try { try {
gracefulShutdownFuture.get(20, TimeUnit.SECONDS); gracefulShutdownFuture1.get(20, TimeUnit.SECONDS);
gracefulShutdownFuture2.get(20, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing."); log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -150,13 +171,19 @@ public class ApplicationTest {
* Sends a single record of dummy data to Kinesis. * Sends a single record of dummy data to Kinesis.
*/ */
private void publishRecord() { private void publishRecord() {
PutRecordRequest request = PutRecordRequest.builder() PutRecordRequest request1 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName) .streamName(streamName1)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
.build();
PutRecordRequest request2 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName2)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
.build(); .build();
try { try {
kinesisClient.putRecord(request).get(); kinesisClient.putRecord(request1).get();
kinesisClient.putRecord(request2).get();
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown."); log.info("Interrupted, assuming shutdown.");
} catch (ExecutionException e) { } catch (ExecutionException e) {