From 73e75e2a08981a642099185e1acb9d13f42dcbb0 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Wed, 27 Oct 2021 20:43:32 -0700 Subject: [PATCH 1/6] typo fix in comments --- .../java/software/amazon/kinesis/processor/Checkpointer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java index 2ffadc06..bb5c2029 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/Checkpointer.java @@ -60,7 +60,7 @@ public interface Checkpointer { /** - * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpointValue will be passed to the new + * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint will be passed to the new * ShardRecordProcessor's initialize() method. * * @param leaseKey Checkpoint is specified for this shard. From 4cd395da98f1c77a81c86eb96a2e0f775afc9674 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Thu, 18 Nov 2021 10:48:23 -0800 Subject: [PATCH 2/6] first commit for app-level mills_behind_latest metric --- .../amazon/kinesis/lifecycle/ProcessTask.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index b3ba8a7d..bd3b9721 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -45,6 +45,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; @KinesisClientInternalApi public class ProcessTask implements ConsumerTask { private static final String PROCESS_TASK_OPERATION = "ProcessTask"; + private static final String APPLICATION_LEVEL_METRICS = "ApplicationLevelMetrics"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; @@ -112,20 +113,23 @@ public class ProcessTask implements ConsumerTask { */ @Override public TaskResult call() { - final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); - MetricsUtil.addShardId(scope, shardInfo.shardId()); + .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addShardId(scope_shard, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; try { - scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); - scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); + scope_shard.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope_shard.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); Exception exception = null; try { if (processRecordsInput.millisBehindLatest() != null) { - scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + scope_shard.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); + scope_app.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); } @@ -142,11 +146,11 @@ public class ProcessTask implements ConsumerTask { } if (!records.isEmpty()) { - scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope_shard.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); } recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( - scope, records, recordProcessorCheckpointer.lastCheckpointValue(), + scope_shard, records, recordProcessorCheckpointer.lastCheckpointValue(), recordProcessorCheckpointer.largestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { @@ -165,8 +169,9 @@ public class ProcessTask implements ConsumerTask { } return new TaskResult(exception); } finally { - MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); - MetricsUtil.endScope(scope); + MetricsUtil.addSuccessAndLatency(scope_shard, success, startTimeMillis, MetricsLevel.SUMMARY); + MetricsUtil.endScope(scope_shard); + MetricsUtil.endScope(scope_app); } } From 92e02017f0d09261e8c162bd08992a6af15b47d4 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Sun, 21 Nov 2021 22:10:16 -0800 Subject: [PATCH 3/6] temporary test file --- .../amazon/kinesis/ApplicationTest.java | 272 ++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java new file mode 100644 index 00000000..69957939 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java @@ -0,0 +1,272 @@ +package software.amazon.kinesis; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.common.KinesisClientUtil; +import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.exceptions.InvalidStateException; +import software.amazon.kinesis.exceptions.ShutdownException; +import software.amazon.kinesis.lifecycle.events.InitializationInput; +import software.amazon.kinesis.lifecycle.events.LeaseLostInput; +import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.lifecycle.events.ShardEndedInput; +import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; + +import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.retrieval.polling.PollingConfig; + +public class ApplicationTest { + private static final Logger log = LoggerFactory.getLogger(ApplicationTest.class); + + /** + * Invoke the main method + * Verifies valid inputs and then starts running the app. + */ + public static void main(String... args) { + String streamName1 = "consumer-level-metrics-1"; + String streamName2 = "consumer-level-metrics-2"; + String region = "us-east-2"; + + new ApplicationTest(streamName1, region).run(); + new ApplicationTest(streamName2, region).run(); + } + + private final String streamName; + private final Region region; + private final KinesisAsyncClient kinesisClient; + + /** + * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis. + * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used + * indirectly by the KCL to handle the consumption of the data. + */ + private ApplicationTest(String streamName, String region) { + this.streamName = streamName; + this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2")); + this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); + } + + private void run() { + + /** + * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL + */ + ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); + ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + + /** + * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a + * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private + * class below. + */ + DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); + CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); + ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); + + /** + * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This + * instance is configured with defaults provided by the ConfigsBuilder. + */ + Scheduler scheduler = new Scheduler( + configsBuilder.checkpointConfig(), + configsBuilder.coordinatorConfig(), + configsBuilder.leaseManagementConfig(), + configsBuilder.lifecycleConfig(), + configsBuilder.metricsConfig(), + configsBuilder.processorConfig(), + configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) + ); + + /** + * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely + * until an exit is triggered. + */ + Thread schedulerThread = new Thread(scheduler); + schedulerThread.setDaemon(true); + schedulerThread.start(); + + /** + * Allows termination of app by pressing Enter. + */ + System.out.println("Press enter to shutdown"); + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); + try { + reader.readLine(); + } catch (IOException ioex) { + log.error("Caught exception while waiting for confirm. Shutting down.", ioex); + } + + /** + * Stops sending dummy data. + */ + log.info("Cancelling producer and shutting down executor."); + producerFuture.cancel(true); + producerExecutor.shutdownNow(); + + /** + * Stops consuming data. Finishes processing the current batch of data already received from Kinesis + * before shutting down. + */ + Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); + log.info("Waiting up to 20 seconds for shutdown to complete."); + try { + gracefulShutdownFuture.get(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.info("Interrupted while waiting for graceful shutdown. Continuing."); + } catch (ExecutionException e) { + log.error("Exception while executing graceful shutdown.", e); + } catch (TimeoutException e) { + log.error("Timeout while waiting for shutdown. Scheduler may not have exited."); + } + log.info("Completed, shutting down now."); + } + + /** + * Sends a single record of dummy data to Kinesis. + */ + private void publishRecord() { + PutRecordRequest request = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) + .build(); + try { + kinesisClient.putRecord(request).get(); + } catch (InterruptedException e) { + log.info("Interrupted, assuming shutdown."); + } catch (ExecutionException e) { + log.error("Exception while sending data to Kinesis. Will try again next cycle.", e); + } + } + + private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { + public ShardRecordProcessor shardRecordProcessor() { + return new SampleRecordProcessor(); + } + } + + /** + * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives. + * In this example all we do to 'process' is log info about the records. + */ + private static class SampleRecordProcessor implements ShardRecordProcessor { + + private static final String SHARD_ID_MDC_KEY = "ShardId"; + + private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); + + private String shardId; + + /** + * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via + * processRecords). In this example we do nothing except some logging. + * + * @param initializationInput Provides information related to initialization. + */ + public void initialize(InitializationInput initializationInput) { + shardId = initializationInput.shardId(); + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + /** + * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver + * data records to the application. In this example we simply log our records. + * + * @param processRecordsInput Provides the records to be processed as well as information and capabilities + * related to them (e.g. checkpointing). + */ + public void processRecords(ProcessRecordsInput processRecordsInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + log.info("Processing {} record(s)", processRecordsInput.records().size()); + processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())); + } catch (Throwable t) { + log.error("Caught throwable while processing records. Aborting."); + Runtime.getRuntime().halt(1); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + /** Called when the lease tied to this record processor has been lost. Once the lease has been lost, + * the record processor can no longer checkpoint. + * + * @param leaseLostInput Provides access to functions and data related to the loss of the lease. + */ + public void leaseLost(LeaseLostInput leaseLostInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + log.info("Lost lease, so terminating."); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + /** + * Called when all data on this shard has been processed. Checkpointing must occur in the method for record + * processing to be considered complete; an exception will be thrown otherwise. + * + * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard. + */ + public void shardEnded(ShardEndedInput shardEndedInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + log.info("Reached shard end checkpointing."); + shardEndedInput.checkpointer().checkpoint(); + } catch (ShutdownException | InvalidStateException e) { + log.error("Exception while checkpointing at shard end. Giving up.", e); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + /** + * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing + * Enter). Checkpoints and logs the data a final time. + * + * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint + * before the shutdown is completed. + */ + public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + log.info("Scheduler is shutting down, checkpointing."); + shutdownRequestedInput.checkpointer().checkpoint(); + } catch (ShutdownException | InvalidStateException e) { + log.error("Exception while checkpointing at requested shutdown. Giving up.", e); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + } +} From 2e0b9815cc2c64e4626ffc809ba6281e2810ba50 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Mon, 22 Nov 2021 10:56:43 -0800 Subject: [PATCH 4/6] change the app to have 2 worker --- .../amazon/kinesis/ApplicationTest.java | 71 +++++++++++++------ 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java index 69957939..32d228ef 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java @@ -3,6 +3,7 @@ package software.amazon.kinesis; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -51,12 +52,14 @@ public class ApplicationTest { String streamName1 = "consumer-level-metrics-1"; String streamName2 = "consumer-level-metrics-2"; String region = "us-east-2"; + String applicationName = "consumer-level-metrics-test"; - new ApplicationTest(streamName1, region).run(); - new ApplicationTest(streamName2, region).run(); + new ApplicationTest(applicationName, streamName1, streamName2, region).run(); } - private final String streamName; + private final String applicationName; + private final String streamName1; + private final String streamName2; private final Region region; private final KinesisAsyncClient kinesisClient; @@ -65,8 +68,10 @@ public class ApplicationTest { * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used * indirectly by the KCL to handle the consumption of the data. */ - private ApplicationTest(String streamName, String region) { - this.streamName = streamName; + private ApplicationTest(String applicationName, String streamName1, String streamName2, String region) { + this.applicationName = applicationName; + this.streamName1 = streamName1; + this.streamName2 = streamName2; this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2")); this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } @@ -86,29 +91,43 @@ public class ApplicationTest { */ DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); - ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); + ConfigsBuilder configsBuilder1 = new ConfigsBuilder(streamName1, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); + ConfigsBuilder configsBuilder2 = new ConfigsBuilder(streamName2, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); /** * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This * instance is configured with defaults provided by the ConfigsBuilder. */ - Scheduler scheduler = new Scheduler( - configsBuilder.checkpointConfig(), - configsBuilder.coordinatorConfig(), - configsBuilder.leaseManagementConfig(), - configsBuilder.lifecycleConfig(), - configsBuilder.metricsConfig(), - configsBuilder.processorConfig(), - configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) + Scheduler scheduler1 = new Scheduler( + configsBuilder1.checkpointConfig(), + configsBuilder1.coordinatorConfig(), + configsBuilder1.leaseManagementConfig(), + configsBuilder1.lifecycleConfig(), + configsBuilder1.metricsConfig(), + configsBuilder1.processorConfig(), + configsBuilder1.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName1, kinesisClient)) + ); + Scheduler scheduler2 = new Scheduler( + configsBuilder2.checkpointConfig(), + configsBuilder2.coordinatorConfig(), + configsBuilder2.leaseManagementConfig(), + configsBuilder2.lifecycleConfig(), + configsBuilder2.metricsConfig(), + configsBuilder2.processorConfig(), + configsBuilder2.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName2, kinesisClient)) ); /** * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely * until an exit is triggered. */ - Thread schedulerThread = new Thread(scheduler); - schedulerThread.setDaemon(true); - schedulerThread.start(); + Thread schedulerThread1 = new Thread(scheduler1); + schedulerThread1.setDaemon(true); + schedulerThread1.start(); + + Thread schedulerThread2 = new Thread(scheduler2); + schedulerThread2.setDaemon(true); + schedulerThread2.start(); /** * Allows termination of app by pressing Enter. @@ -132,10 +151,12 @@ public class ApplicationTest { * Stops consuming data. Finishes processing the current batch of data already received from Kinesis * before shutting down. */ - Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); + Future gracefulShutdownFuture1 = scheduler1.startGracefulShutdown(); + Future gracefulShutdownFuture2 = scheduler2.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); try { - gracefulShutdownFuture.get(20, TimeUnit.SECONDS); + gracefulShutdownFuture1.get(20, TimeUnit.SECONDS); + gracefulShutdownFuture2.get(20, TimeUnit.SECONDS); } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException e) { @@ -150,13 +171,19 @@ public class ApplicationTest { * Sends a single record of dummy data to Kinesis. */ private void publishRecord() { - PutRecordRequest request = PutRecordRequest.builder() + PutRecordRequest request1 = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) + .build(); + PutRecordRequest request2 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName2) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .build(); try { - kinesisClient.putRecord(request).get(); + kinesisClient.putRecord(request1).get(); + kinesisClient.putRecord(request2).get(); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { From c85b1f42b7fb0cb7bdf59e919a756277f44b65df Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Thu, 2 Dec 2021 10:41:05 -0800 Subject: [PATCH 5/6] change operation name for the new metric --- .../java/software/amazon/kinesis/lifecycle/ProcessTask.java | 4 ++-- .../test/java/software/amazon/kinesis/ApplicationTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index bd3b9721..b1f6121c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -45,7 +45,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; @KinesisClientInternalApi public class ProcessTask implements ConsumerTask { private static final String PROCESS_TASK_OPERATION = "ProcessTask"; - private static final String APPLICATION_LEVEL_METRICS = "ApplicationLevelMetrics"; + private static final String APPLICATION_TRACKER_OPERATION = "ApplicationTracker"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; @@ -113,7 +113,7 @@ public class ProcessTask implements ConsumerTask { */ @Override public TaskResult call() { - final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION); final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId))); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java index 32d228ef..e3616ff0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java @@ -52,7 +52,7 @@ public class ApplicationTest { String streamName1 = "consumer-level-metrics-1"; String streamName2 = "consumer-level-metrics-2"; String region = "us-east-2"; - String applicationName = "consumer-level-metrics-test"; + String applicationName = "consumer-level-metrics-test-new"; new ApplicationTest(applicationName, streamName1, streamName2, region).run(); } From e92f077817b6353cbf53ffb046e831811fe54a28 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Sun, 5 Dec 2021 13:29:38 -0800 Subject: [PATCH 6/6] testing script for creating millisbehinlatest --- .../amazon/kinesis/lifecycle/ProcessTask.java | 1 + .../kinesis/lifecycle/ShardConsumer.java | 9 ++--- .../retrieval/polling/KinesisDataFetcher.java | 5 ++- .../amazon/kinesis/ApplicationTest.java | 33 ++++++++++++++++--- 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index b1f6121c..ee4f42c3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle; import java.util.List; import java.util.ListIterator; +import java.util.concurrent.TimeUnit; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index b6e7c068..bf28f818 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -18,10 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.*; import java.util.function.Function; import org.reactivestreams.Subscription; @@ -331,7 +328,11 @@ public class ShardConsumer { taskIsRunning = true; TaskResult result; try { + TimeUnit.MILLISECONDS.sleep(20); result = task.call(); + } catch (InterruptedException e) { + e.printStackTrace(); + result = null; } finally { taskIsRunning = false; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 223ab367..a9efb480 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -19,6 +19,7 @@ import com.google.common.collect.Iterables; import java.time.Duration; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.AccessLevel; import lombok.Data; @@ -291,8 +292,10 @@ public class KinesisDataFetcher implements DataFetcher { @Override public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { + TimeUnit.SECONDS.sleep(20); final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); + System.out.println("millisbehindlatest is " + response.millisBehindLatest()); if (!isValidResult(response.nextShardIterator(), response.childShards())) { throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + ". nextShardIterator: " + response.nextShardIterator() @@ -304,7 +307,7 @@ public class KinesisDataFetcher implements DataFetcher { @Override public GetRecordsRequest getGetRecordsRequest(String nextIterator) { return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) - .limit(maxRecords).build(); + .limit(10).build(); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java index e3616ff0..f2ddabf9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java @@ -82,7 +82,7 @@ public class ApplicationTest { * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL */ ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); - ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 1, 1, TimeUnit.SECONDS); /** * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a @@ -173,17 +173,42 @@ public class ApplicationTest { private void publishRecord() { PutRecordRequest request1 = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName1) + .streamName(streamName2) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .build(); PutRecordRequest request2 = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName2) - .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request3 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request4 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request5 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request6 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) .build(); try { kinesisClient.putRecord(request1).get(); kinesisClient.putRecord(request2).get(); + kinesisClient.putRecord(request3).get(); + kinesisClient.putRecord(request4).get(); + kinesisClient.putRecord(request5).get(); + kinesisClient.putRecord(request6).get(); + System.out.println("600 records published yeah"); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) {