diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java index 69957939..32d228ef 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java @@ -3,6 +3,7 @@ package software.amazon.kinesis; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -51,12 +52,14 @@ public class ApplicationTest { String streamName1 = "consumer-level-metrics-1"; String streamName2 = "consumer-level-metrics-2"; String region = "us-east-2"; + String applicationName = "consumer-level-metrics-test"; - new ApplicationTest(streamName1, region).run(); - new ApplicationTest(streamName2, region).run(); + new ApplicationTest(applicationName, streamName1, streamName2, region).run(); } - private final String streamName; + private final String applicationName; + private final String streamName1; + private final String streamName2; private final Region region; private final KinesisAsyncClient kinesisClient; @@ -65,8 +68,10 @@ public class ApplicationTest { * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used * indirectly by the KCL to handle the consumption of the data. */ - private ApplicationTest(String streamName, String region) { - this.streamName = streamName; + private ApplicationTest(String applicationName, String streamName1, String streamName2, String region) { + this.applicationName = applicationName; + this.streamName1 = streamName1; + this.streamName2 = streamName2; this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2")); this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } @@ -86,29 +91,43 @@ public class ApplicationTest { */ DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); - ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); + ConfigsBuilder configsBuilder1 = new ConfigsBuilder(streamName1, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); + ConfigsBuilder configsBuilder2 = new ConfigsBuilder(streamName2, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); /** * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This * instance is configured with defaults provided by the ConfigsBuilder. */ - Scheduler scheduler = new Scheduler( - configsBuilder.checkpointConfig(), - configsBuilder.coordinatorConfig(), - configsBuilder.leaseManagementConfig(), - configsBuilder.lifecycleConfig(), - configsBuilder.metricsConfig(), - configsBuilder.processorConfig(), - configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) + Scheduler scheduler1 = new Scheduler( + configsBuilder1.checkpointConfig(), + configsBuilder1.coordinatorConfig(), + configsBuilder1.leaseManagementConfig(), + configsBuilder1.lifecycleConfig(), + configsBuilder1.metricsConfig(), + configsBuilder1.processorConfig(), + configsBuilder1.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName1, kinesisClient)) + ); + Scheduler scheduler2 = new Scheduler( + configsBuilder2.checkpointConfig(), + configsBuilder2.coordinatorConfig(), + configsBuilder2.leaseManagementConfig(), + configsBuilder2.lifecycleConfig(), + configsBuilder2.metricsConfig(), + configsBuilder2.processorConfig(), + configsBuilder2.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName2, kinesisClient)) ); /** * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely * until an exit is triggered. */ - Thread schedulerThread = new Thread(scheduler); - schedulerThread.setDaemon(true); - schedulerThread.start(); + Thread schedulerThread1 = new Thread(scheduler1); + schedulerThread1.setDaemon(true); + schedulerThread1.start(); + + Thread schedulerThread2 = new Thread(scheduler2); + schedulerThread2.setDaemon(true); + schedulerThread2.start(); /** * Allows termination of app by pressing Enter. @@ -132,10 +151,12 @@ public class ApplicationTest { * Stops consuming data. Finishes processing the current batch of data already received from Kinesis * before shutting down. */ - Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); + Future gracefulShutdownFuture1 = scheduler1.startGracefulShutdown(); + Future gracefulShutdownFuture2 = scheduler2.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); try { - gracefulShutdownFuture.get(20, TimeUnit.SECONDS); + gracefulShutdownFuture1.get(20, TimeUnit.SECONDS); + gracefulShutdownFuture2.get(20, TimeUnit.SECONDS); } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException e) { @@ -150,13 +171,19 @@ public class ApplicationTest { * Sends a single record of dummy data to Kinesis. */ private void publishRecord() { - PutRecordRequest request = PutRecordRequest.builder() + PutRecordRequest request1 = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) + .build(); + PutRecordRequest request2 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName2) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .build(); try { - kinesisClient.putRecord(request).get(); + kinesisClient.putRecord(request1).get(); + kinesisClient.putRecord(request2).get(); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) {