temporary test file
This commit is contained in:
parent
4cd395da98
commit
92e02017f0
1 changed files with 272 additions and 0 deletions
|
|
@ -0,0 +1,272 @@
|
||||||
|
package software.amazon.kinesis;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.slf4j.MDC;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.core.SdkBytes;
|
||||||
|
import software.amazon.awssdk.regions.Region;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
|
||||||
|
import software.amazon.kinesis.common.ConfigsBuilder;
|
||||||
|
import software.amazon.kinesis.common.KinesisClientUtil;
|
||||||
|
import software.amazon.kinesis.coordinator.Scheduler;
|
||||||
|
import software.amazon.kinesis.exceptions.InvalidStateException;
|
||||||
|
import software.amazon.kinesis.exceptions.ShutdownException;
|
||||||
|
import software.amazon.kinesis.lifecycle.events.InitializationInput;
|
||||||
|
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
|
||||||
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
|
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
|
||||||
|
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
||||||
|
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
||||||
|
import software.amazon.kinesis.retrieval.polling.PollingConfig;
|
||||||
|
|
||||||
|
public class ApplicationTest {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ApplicationTest.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke the main method
|
||||||
|
* Verifies valid inputs and then starts running the app.
|
||||||
|
*/
|
||||||
|
public static void main(String... args) {
|
||||||
|
String streamName1 = "consumer-level-metrics-1";
|
||||||
|
String streamName2 = "consumer-level-metrics-2";
|
||||||
|
String region = "us-east-2";
|
||||||
|
|
||||||
|
new ApplicationTest(streamName1, region).run();
|
||||||
|
new ApplicationTest(streamName2, region).run();
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String streamName;
|
||||||
|
private final Region region;
|
||||||
|
private final KinesisAsyncClient kinesisClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
|
||||||
|
* This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
|
||||||
|
* indirectly by the KCL to handle the consumption of the data.
|
||||||
|
*/
|
||||||
|
private ApplicationTest(String streamName, String region) {
|
||||||
|
this.streamName = streamName;
|
||||||
|
this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
|
||||||
|
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void run() {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
|
||||||
|
*/
|
||||||
|
ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
|
||||||
|
* ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
|
||||||
|
* class below.
|
||||||
|
*/
|
||||||
|
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
|
||||||
|
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
|
||||||
|
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
|
||||||
|
* instance is configured with defaults provided by the ConfigsBuilder.
|
||||||
|
*/
|
||||||
|
Scheduler scheduler = new Scheduler(
|
||||||
|
configsBuilder.checkpointConfig(),
|
||||||
|
configsBuilder.coordinatorConfig(),
|
||||||
|
configsBuilder.leaseManagementConfig(),
|
||||||
|
configsBuilder.lifecycleConfig(),
|
||||||
|
configsBuilder.metricsConfig(),
|
||||||
|
configsBuilder.processorConfig(),
|
||||||
|
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
|
||||||
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
|
||||||
|
* until an exit is triggered.
|
||||||
|
*/
|
||||||
|
Thread schedulerThread = new Thread(scheduler);
|
||||||
|
schedulerThread.setDaemon(true);
|
||||||
|
schedulerThread.start();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows termination of app by pressing Enter.
|
||||||
|
*/
|
||||||
|
System.out.println("Press enter to shutdown");
|
||||||
|
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
|
||||||
|
try {
|
||||||
|
reader.readLine();
|
||||||
|
} catch (IOException ioex) {
|
||||||
|
log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops sending dummy data.
|
||||||
|
*/
|
||||||
|
log.info("Cancelling producer and shutting down executor.");
|
||||||
|
producerFuture.cancel(true);
|
||||||
|
producerExecutor.shutdownNow();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops consuming data. Finishes processing the current batch of data already received from Kinesis
|
||||||
|
* before shutting down.
|
||||||
|
*/
|
||||||
|
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
|
||||||
|
log.info("Waiting up to 20 seconds for shutdown to complete.");
|
||||||
|
try {
|
||||||
|
gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
log.error("Exception while executing graceful shutdown.", e);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
|
||||||
|
}
|
||||||
|
log.info("Completed, shutting down now.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a single record of dummy data to Kinesis.
|
||||||
|
*/
|
||||||
|
private void publishRecord() {
|
||||||
|
PutRecordRequest request = PutRecordRequest.builder()
|
||||||
|
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
|
||||||
|
.streamName(streamName)
|
||||||
|
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
kinesisClient.putRecord(request).get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info("Interrupted, assuming shutdown.");
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
|
||||||
|
public ShardRecordProcessor shardRecordProcessor() {
|
||||||
|
return new SampleRecordProcessor();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
|
||||||
|
* In this example all we do to 'process' is log info about the records.
|
||||||
|
*/
|
||||||
|
private static class SampleRecordProcessor implements ShardRecordProcessor {
|
||||||
|
|
||||||
|
private static final String SHARD_ID_MDC_KEY = "ShardId";
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
|
||||||
|
|
||||||
|
private String shardId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
|
||||||
|
* processRecords). In this example we do nothing except some logging.
|
||||||
|
*
|
||||||
|
* @param initializationInput Provides information related to initialization.
|
||||||
|
*/
|
||||||
|
public void initialize(InitializationInput initializationInput) {
|
||||||
|
shardId = initializationInput.shardId();
|
||||||
|
MDC.put(SHARD_ID_MDC_KEY, shardId);
|
||||||
|
try {
|
||||||
|
log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
|
||||||
|
} finally {
|
||||||
|
MDC.remove(SHARD_ID_MDC_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
|
||||||
|
* data records to the application. In this example we simply log our records.
|
||||||
|
*
|
||||||
|
* @param processRecordsInput Provides the records to be processed as well as information and capabilities
|
||||||
|
* related to them (e.g. checkpointing).
|
||||||
|
*/
|
||||||
|
public void processRecords(ProcessRecordsInput processRecordsInput) {
|
||||||
|
MDC.put(SHARD_ID_MDC_KEY, shardId);
|
||||||
|
try {
|
||||||
|
log.info("Processing {} record(s)", processRecordsInput.records().size());
|
||||||
|
processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.error("Caught throwable while processing records. Aborting.");
|
||||||
|
Runtime.getRuntime().halt(1);
|
||||||
|
} finally {
|
||||||
|
MDC.remove(SHARD_ID_MDC_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
|
||||||
|
* the record processor can no longer checkpoint.
|
||||||
|
*
|
||||||
|
* @param leaseLostInput Provides access to functions and data related to the loss of the lease.
|
||||||
|
*/
|
||||||
|
public void leaseLost(LeaseLostInput leaseLostInput) {
|
||||||
|
MDC.put(SHARD_ID_MDC_KEY, shardId);
|
||||||
|
try {
|
||||||
|
log.info("Lost lease, so terminating.");
|
||||||
|
} finally {
|
||||||
|
MDC.remove(SHARD_ID_MDC_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when all data on this shard has been processed. Checkpointing must occur in the method for record
|
||||||
|
* processing to be considered complete; an exception will be thrown otherwise.
|
||||||
|
*
|
||||||
|
* @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
|
||||||
|
*/
|
||||||
|
public void shardEnded(ShardEndedInput shardEndedInput) {
|
||||||
|
MDC.put(SHARD_ID_MDC_KEY, shardId);
|
||||||
|
try {
|
||||||
|
log.info("Reached shard end checkpointing.");
|
||||||
|
shardEndedInput.checkpointer().checkpoint();
|
||||||
|
} catch (ShutdownException | InvalidStateException e) {
|
||||||
|
log.error("Exception while checkpointing at shard end. Giving up.", e);
|
||||||
|
} finally {
|
||||||
|
MDC.remove(SHARD_ID_MDC_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
|
||||||
|
* Enter). Checkpoints and logs the data a final time.
|
||||||
|
*
|
||||||
|
* @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
|
||||||
|
* before the shutdown is completed.
|
||||||
|
*/
|
||||||
|
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
|
||||||
|
MDC.put(SHARD_ID_MDC_KEY, shardId);
|
||||||
|
try {
|
||||||
|
log.info("Scheduler is shutting down, checkpointing.");
|
||||||
|
shutdownRequestedInput.checkpointer().checkpoint();
|
||||||
|
} catch (ShutdownException | InvalidStateException e) {
|
||||||
|
log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
|
||||||
|
} finally {
|
||||||
|
MDC.remove(SHARD_ID_MDC_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue