This commit is contained in:
Qilin Jin 2021-12-06 16:43:07 +00:00 committed by GitHub
commit 695946a2c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 350 additions and 16 deletions

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.concurrent.TimeUnit;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -45,6 +46,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
@KinesisClientInternalApi @KinesisClientInternalApi
public class ProcessTask implements ConsumerTask { public class ProcessTask implements ConsumerTask {
private static final String PROCESS_TASK_OPERATION = "ProcessTask"; private static final String PROCESS_TASK_OPERATION = "ProcessTask";
private static final String APPLICATION_TRACKER_OPERATION = "ApplicationTracker";
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
@ -112,20 +114,23 @@ public class ProcessTask implements ConsumerTask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt() shardInfo.streamIdentifierSerOpt()
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(scope, shardInfo.shardId()); MetricsUtil.addShardId(scope_shard, shardInfo.shardId());
long startTimeMillis = System.currentTimeMillis(); long startTimeMillis = System.currentTimeMillis();
boolean success = false; boolean success = false;
try { try {
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope_shard.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); scope_shard.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
Exception exception = null; Exception exception = null;
try { try {
if (processRecordsInput.millisBehindLatest() != null) { if (processRecordsInput.millisBehindLatest() != null) {
scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), scope_shard.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
scope_app.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
} }
@ -142,11 +147,11 @@ public class ProcessTask implements ConsumerTask {
} }
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); scope_shard.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
} }
recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
scope, records, recordProcessorCheckpointer.lastCheckpointValue(), scope_shard, records, recordProcessorCheckpointer.lastCheckpointValue(),
recordProcessorCheckpointer.largestPermittedCheckpointValue())); recordProcessorCheckpointer.largestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
@ -165,8 +170,9 @@ public class ProcessTask implements ConsumerTask {
} }
return new TaskResult(exception); return new TaskResult(exception);
} finally { } finally {
MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); MetricsUtil.addSuccessAndLatency(scope_shard, success, startTimeMillis, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope); MetricsUtil.endScope(scope_shard);
MetricsUtil.endScope(scope_app);
} }
} }

View file

@ -18,10 +18,7 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function; import java.util.function.Function;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
@ -331,7 +328,11 @@ public class ShardConsumer {
taskIsRunning = true; taskIsRunning = true;
TaskResult result; TaskResult result;
try { try {
TimeUnit.MILLISECONDS.sleep(20);
result = task.call(); result = task.call();
} catch (InterruptedException e) {
e.printStackTrace();
result = null;
} finally { } finally {
taskIsRunning = false; taskIsRunning = false;
} }

View file

@ -60,7 +60,7 @@ public interface Checkpointer {
/** /**
* Record intent to checkpoint for a shard. Upon failover, the pendingCheckpointValue will be passed to the new * Record intent to checkpoint for a shard. Upon failover, the pendingCheckpoint will be passed to the new
* ShardRecordProcessor's initialize() method. * ShardRecordProcessor's initialize() method.
* *
* @param leaseKey Checkpoint is specified for this shard. * @param leaseKey Checkpoint is specified for this shard.

View file

@ -19,6 +19,7 @@ import com.google.common.collect.Iterables;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Data; import lombok.Data;
@ -291,8 +292,10 @@ public class KinesisDataFetcher implements DataFetcher {
@Override @Override
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
TimeUnit.SECONDS.sleep(20);
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait); maxFutureWait);
System.out.println("millisbehindlatest is " + response.millisBehindLatest());
if (!isValidResult(response.nextShardIterator(), response.childShards())) { if (!isValidResult(response.nextShardIterator(), response.childShards())) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator() + ". nextShardIterator: " + response.nextShardIterator()
@ -304,7 +307,7 @@ public class KinesisDataFetcher implements DataFetcher {
@Override @Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) { public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
.limit(maxRecords).build(); .limit(10).build();
} }
@Override @Override

View file

@ -0,0 +1,324 @@
package software.amazon.kinesis;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
public class ApplicationTest {
    private static final Logger log = LoggerFactory.getLogger(ApplicationTest.class);

    /**
     * Entry point. Configures two hard-coded streams in us-east-2 and starts the app.
     */
    public static void main(String... args) {
        String streamName1 = "consumer-level-metrics-1";
        String streamName2 = "consumer-level-metrics-2";
        String region = "us-east-2";
        String applicationName = "consumer-level-metrics-test-new";
        new ApplicationTest(applicationName, streamName1, streamName2, region).run();
    }

    private final String applicationName;
    private final String streamName1;
    private final String streamName2;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private ApplicationTest(String applicationName, String streamName1, String streamName2, String region) {
        this.applicationName = applicationName;
        this.streamName1 = streamName1;
        this.streamName2 = streamName2;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    /**
     * Starts a dummy-data producer, runs two KCL Schedulers (one per stream) on daemon threads,
     * then blocks until Enter is pressed and performs a graceful shutdown of both.
     */
    private void run() {
        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 1, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder1 = new ConfigsBuilder(streamName1, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
        ConfigsBuilder configsBuilder2 = new ConfigsBuilder(streamName2, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler1 = new Scheduler(
                configsBuilder1.checkpointConfig(),
                configsBuilder1.coordinatorConfig(),
                configsBuilder1.leaseManagementConfig(),
                configsBuilder1.lifecycleConfig(),
                configsBuilder1.metricsConfig(),
                configsBuilder1.processorConfig(),
                configsBuilder1.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName1, kinesisClient))
        );
        Scheduler scheduler2 = new Scheduler(
                configsBuilder2.checkpointConfig(),
                configsBuilder2.coordinatorConfig(),
                configsBuilder2.leaseManagementConfig(),
                configsBuilder2.lifecycleConfig(),
                configsBuilder2.metricsConfig(),
                configsBuilder2.processorConfig(),
                configsBuilder2.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName2, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread1 = new Thread(scheduler1);
        schedulerThread1.setDaemon(true);
        schedulerThread1.start();
        Thread schedulerThread2 = new Thread(scheduler2);
        schedulerThread2.setDaemon(true);
        schedulerThread2.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture1 = scheduler1.startGracefulShutdown();
        Future<Boolean> gracefulShutdownFuture2 = scheduler2.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture1.get(20, TimeUnit.SECONDS);
            gracefulShutdownFuture2.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Publishes one batch of dummy data per invocation: a single 10-byte record to
     * {@code streamName2}, followed by five 1-byte records to {@code streamName1}
     * (six records total — matching the original six hand-rolled requests).
     */
    private void publishRecord() {
        try {
            // One larger (10-byte) record to the second stream.
            putRandomRecord(streamName2, 10);
            // Five small (1-byte) records to the first stream.
            for (int i = 0; i < 5; i++) {
                putRandomRecord(streamName1, 1);
            }
            log.info("Published 6 records (1 to {}, 5 to {}).", streamName2, streamName1);
        } catch (InterruptedException e) {
            // Re-interrupt so the scheduled executor can see the shutdown signal.
            Thread.currentThread().interrupt();
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    /**
     * Builds and synchronously sends one record of {@code dataBytes} random bytes
     * with a random alphabetic partition key to the given stream.
     */
    private void putRandomRecord(String streamName, int dataBytes) throws InterruptedException, ExecutionException {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(dataBytes)))
                .build();
        // Block on the async put so failures surface in this cycle's catch blocks.
        kinesisClient.putRecord(request).get();
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {
        private static final String SHARD_ID_MDC_KEY = "ShardId";
        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                // Pass the throwable so the stack trace is preserved before halting.
                log.error("Caught throwable while processing records. Aborting.", t);
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }
}