Updates based on 2nd round of comments

This commit is contained in:
Meher Mankikar 2023-06-16 11:41:32 -07:00
parent 8dcbb75031
commit 74a74c3a00
13 changed files with 228 additions and 314 deletions

View file

@ -38,9 +38,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
## Running Integration Tests
To run integration tests to test any changes to KCL, you can use this command: `mvn -Dit.test=*IntegrationTest verify`.
This will look for default AWS credentials in your local `.aws/credentials`. If you want to override these
credentials, you can provide the name of an IAM user as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="<IAM_USER>" verify`.
To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="<IAM_USER>" verify`.
## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

View file

@ -1,108 +0,0 @@
package software.amazon.kinesis.config;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
/**
* Basic config for a release canary (streaming) with default settings
*/
@Slf4j
public class BasicReleaseCanaryConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "";
}
@Override
public int getShardCount() {
return 10;
}
@Override
public String getApplicationName() {
return "";
}
@Override
public String getEndpoint() {
return "";
}
@Override
public Region getRegion() {
return Region.US_WEST_2;
}
/**
* This will get the credentials that are provided in the maven command
* when running integration tests if any are provided through -Dcredentials=iamUser
* Otherwise, iamUser will be null and the test will search for default credentials
* in the test environment.
*/
@Override
public String getProfile() {
String iamUser = System.getProperty("credentials");
return iamUser;
}
@Override
public InitialPositionInStream getInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut(false)
.batchSize(1)
.recordSizeKB(60)
.callPeriodMills(100)
.build();
}
@Override
public ReshardConfig getReshardConfig() {
return null;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes(5);
Instant instant = d.atZone(ZoneId.systemDefault()).toInstant();
Date startStreamTime = Date.from(instant);
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp(startStreamTime);
/**
* Default is a streaming consumer
*/
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
return config;
}
}

View file

@ -2,14 +2,15 @@ package software.amazon.kinesis.config;
import lombok.Value;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;
import lombok.Builder;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
@ -31,133 +32,156 @@ import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Optional;
public interface KCLAppConfig {
/**
* Default configuration for a producer or consumer used in integration tests.
* Producer: puts records of size 60 KB at an interval of 100 ms
* Consumer: streaming configuration (vs polling) that starts processing records put on the shard 5 minutes before
* the start of the test
*/
public abstract class KCLAppConfig {
String getStreamName();
/**
* Name used for test stream and DDB table
*/
public abstract String getStreamName();
default String getStreamArn() {
public String getStreamArn() {
return null;
}
int getShardCount();
public int getShardCount() { return 4; }
String getApplicationName();
String getEndpoint();
Region getRegion();
public String getEndpoint() { return ""; }
public Region getRegion() { return Region.US_WEST_2; }
/**
* "default" profile, should match with profiles listed in "cat ~/.aws/config"
*/
String getProfile();
public String getProfile() {
String iamUser = System.getProperty("credentials");
return iamUser;
}
InitialPositionInStream getInitialPosition();
public InitialPositionInStream getInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
Protocol getConsumerProtocol();
public Protocol getConsumerProtocol() {
return Protocol.HTTP1_1;
}
Protocol getProducerProtocol();
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
ProducerConfig getProducerConfig();
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut(false)
.batchSize(1)
.recordSizeKB(60)
.callPeriodMills(100)
.build();
}
ReshardConfig getReshardConfig();
public ReshardConfig getReshardConfig() {
return null;
}
default KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException {
public KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException {
return buildAsyncKinesisClient(getConsumerProtocol());
}
default KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException {
public KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException {
return buildAsyncKinesisClient(getProducerProtocol());
}
default KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
public KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
return buildAsyncKinesisClient(Optional.ofNullable(protocol));
}
default KinesisAsyncClient buildAsyncKinesisClient(Optional<Protocol> protocol) throws URISyntaxException, IOException {
private AwsCredentialsProvider getCredentialsProvider() {
return (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
}
/**
* Setup H2 client config.
*/
public KinesisAsyncClient buildAsyncKinesisClient(Optional<Protocol> protocol) throws URISyntaxException, IOException {
// Setup H2 client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
.maxConcurrency(Integer.MAX_VALUE);
/**
* If not present, defaults to HTTP1_1
*/
// If not present, defaults to HTTP1_1
if (protocol.isPresent()) {
builder.protocol(protocol.get());
}
final SdkAsyncHttpClient sdkAsyncHttpClient =
builder.buildWithDefaults(AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true).build());
builder.buildWithDefaults(AttributeMap.builder().build());
/**
* Setup client builder by default values
*/
// Setup client builder by default values
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
AwsCredentialsProvider credentialsProvider = (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
kinesisAsyncClientBuilder.credentialsProvider( credentialsProvider );
kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider());
return kinesisAsyncClientBuilder.build();
}
default DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
public DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion());
AwsCredentialsProvider credentialsProvider = (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
builder.credentialsProvider(credentialsProvider);
builder.credentialsProvider(getCredentialsProvider());
return builder.build();
}
default CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
public CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion());
AwsCredentialsProvider credentialsProvider = (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
builder.credentialsProvider(credentialsProvider);
builder.credentialsProvider(getCredentialsProvider());
return builder.build();
}
default String getWorkerId() throws UnknownHostException {
public String getWorkerId() throws UnknownHostException {
return Inet4Address.getLocalHost().getHostName();
}
default RecordValidatorQueue getRecordValidator() {
public RecordValidatorQueue getRecordValidator() {
return new RecordValidatorQueue();
}
default ShardRecordProcessorFactory getShardRecordProcessorFactory() {
public ShardRecordProcessorFactory getShardRecordProcessorFactory() {
return new TestRecordProcessorFactory(getRecordValidator());
}
default ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
final String workerId = getWorkerId();
if (getStreamArn() == null) {
return new ConfigsBuilder(getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
return new ConfigsBuilder(getStreamName(), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
} else {
return new ConfigsBuilder(Arn.fromString(getStreamArn()), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
return new ConfigsBuilder(Arn.fromString(getStreamArn()), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
}
}
RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException;
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition());
// Default is a streaming consumer
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
return config;
}
/**
* Configure ingress load (batch size, record size, and calling interval)
*/
@Value
@Builder
class ProducerConfig {
static class ProducerConfig {
private boolean isBatchPut;
private int batchSize;
private int recordSizeKB;
@ -166,19 +190,24 @@ public interface KCLAppConfig {
/**
* Description of the method of resharding for a test case
* <p>
* reshardingFactorCycle: lists the scales by which the number of shards in a stream will be updated
* in sequence. e.g {2.0, 0.5} means that the number of shards will first be doubled, then halved
* <p>
* numReshardCycles: the number of resharding cycles that will be executed in a test]
* <p>
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
@Value
@Builder
class ReshardConfig {
private double[] reshardingFactorCycle;
static class ReshardConfig {
/**
* reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle
* e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved
*/
private ReshardOptions[] reshardingFactorCycle;
/**
* numReshardCycles: the number of resharding cycles that will be executed in a test
*/
private int numReshardCycles;
/**
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
private long reshardFrequencyMillis;
}

View file

@ -7,28 +7,14 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
/**
* Config for a polling consumer with HTTP protocol of HTTP1
*/
public class ReleaseCanaryPollingH1TestConfig extends BasicReleaseCanaryConfig {
public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XPollingH1TestConfig";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH1TestConfigApplication";
return "KCLReleaseCanary2XPollingH1TestStream";
}
@Override
@ -38,13 +24,9 @@ public class ReleaseCanaryPollingH1TestConfig extends BasicReleaseCanaryConfig {
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes(5);
Instant instant = d.atZone(ZoneId.systemDefault()).toInstant();
Date startStreamTime = Date.from(instant);
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp(startStreamTime);
.newInitialPosition(getInitialPosition());
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);

View file

@ -7,28 +7,14 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
/**
* Config for a polling consumer with HTTP protocol of HTTP2
*/
public class ReleaseCanaryPollingH2TestConfig extends BasicReleaseCanaryConfig {
public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
@Override
public String getStreamName() {
return "KCLTest3";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH2TestApplication";
return "KCLReleaseCanary2XPollingH2TestStream";
}
@Override
@ -38,13 +24,9 @@ public class ReleaseCanaryPollingH2TestConfig extends BasicReleaseCanaryConfig {
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes(5);
Instant instant = d.atZone(ZoneId.systemDefault()).toInstant();
Date startStreamTime = Date.from(instant);
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp(startStreamTime);
.newInitialPosition(getInitialPosition());
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);

View file

@ -5,17 +5,12 @@ import software.amazon.awssdk.http.Protocol;
/**
* Config for a streaming consumer with HTTP protocol of HTTP2
*/
public class ReleaseCanaryStreamingTestConfig extends BasicReleaseCanaryConfig {
public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingTestStream";
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XStreamingTestApplication";
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;

View file

@ -1,6 +1,5 @@
package software.amazon.kinesis.lifecycle;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
@ -8,7 +7,6 @@ import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer;
@Slf4j
public class BasicStreamingPollingIntegrationTest {
@Test

View file

@ -27,9 +27,8 @@ public class RecordValidatorQueue {
}
public RecordValidationStatus validateRecords(int trueTotalShardCount) {
/**
* Validate that each List in the HashMap has data records in increasing order
*/
// Validate that each List in the HashMap has data records in increasing order
boolean incOrder = true;
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
@ -49,16 +48,12 @@ public class RecordValidatorQueue {
}
}
/**
* If this is true, then there was some record that was processed out of order
*/
// If this is true, then there was some record that was processed out of order
if (!incOrder) {
return RecordValidationStatus.OUT_OF_ORDER;
}
/**
* Validate that no records are missing over all shards
*/
// Validate that no records are missing over all shards
int totalShardCount = 0;
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
@ -66,17 +61,13 @@ public class RecordValidatorQueue {
totalShardCount += noDupRecords.size();
}
/**
* If this is true, then there was some record that was missed during processing.
*/
// If this is true, then there was some record that was missed during processing.
if (totalShardCount != trueTotalShardCount) {
log.error("Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount);
return RecordValidationStatus.MISSING_RECORD;
}
/**
* Record validation succeeded.
*/
// Record validation succeeded.
return RecordValidationStatus.NO_ERROR;
}

View file

@ -10,7 +10,7 @@ public class RecordValidatorQueueTest {
private static final String SHARD_ID = "ABC";
@Test
public void validationFailedRecordOutOfOrderTest() {
public void testValidationFailedRecordOutOfOrder() {
recordValidator.add(SHARD_ID, "0");
recordValidator.add(SHARD_ID, "1");
recordValidator.add(SHARD_ID, "3");
@ -21,7 +21,7 @@ public class RecordValidatorQueueTest {
}
@Test
public void validationFailedMissingRecordTest() {
public void testValidationFailedMissingRecord() {
recordValidator.add(SHARD_ID, "0");
recordValidator.add(SHARD_ID, "1");
recordValidator.add(SHARD_ID, "2");
@ -32,7 +32,7 @@ public class RecordValidatorQueueTest {
}
@Test
public void validRecordsTest() {
public void testValidRecords() {
recordValidator.add(SHARD_ID, "0");
recordValidator.add(SHARD_ID, "1");
recordValidator.add(SHARD_ID, "2");

View file

@ -0,0 +1,11 @@
package software.amazon.kinesis.utils;
/**
* Specifies the types of resharding possible in integration tests
* Split doubles the number of shards.
* Merge halves the number of shards.
*/
public enum ReshardOptions {
SPLIT,
MERGE
}

View file

@ -4,6 +4,7 @@ import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
@ -84,6 +85,35 @@ public class StreamExistenceManager {
}
}
public void deleteStream(String streamName) {
DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build();
try{
client.deleteStream(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to delete stream with name " + streamName, e);
}
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed stream deletion");
}
try {
boolean isActive = isStreamActive(streamName);
if (!isActive) {
log.info("Succesfully deleted the stream " + streamName);
return;
}
} catch (Exception e) {
try {
sleep(10_000); // 10 secs backoff.
} catch (InterruptedException e1) {}
log.info("Stream {} is not deleted yet, exception: ", streamName, e);
}
}
}
public void checkStreamAndCreateIfNecessary(String streamName) {
if (!isStreamActive(streamName)) {

View file

@ -7,10 +7,12 @@ import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.config.KCLAppConfig;
@ -24,6 +26,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -32,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@Slf4j
public class TestConsumer {
@ -58,23 +62,17 @@ public class TestConsumer {
public void run() throws Exception {
/**
* Check if stream is created. If not, create it
*/
// Check if stream is created. If not, create it
StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
/**
* Send dummy data to stream
*/
// Send dummy data to stream
ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
RecordValidatorQueue recordValidator = new RecordValidatorQueue();
/**
* Setup configuration of KCL (including DynamoDB and CloudWatch)
*/
// Setup configuration of KCL (including DynamoDB and CloudWatch)
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory(recordValidator));
@ -90,9 +88,7 @@ public class TestConsumer {
processorConfig = configsBuilder.processorConfig();
metricsConfig = configsBuilder.metricsConfig();
/**
* Create Scheduler
*/
// Create Scheduler
Scheduler scheduler = new Scheduler(
checkpointConfig,
coordinatorConfig,
@ -103,39 +99,24 @@ public class TestConsumer {
retrievalConfig
);
/**
* Start record processing of dummy data
*/
try {
// Start record processing of dummy data
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
// Sleep for two minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
/**
* Sleep for two minutes to allow the producer/consumer to run and then end the test case.
*/
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 2));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
/**
* Stops sending dummy data.
*/
// Stops sending dummy data.
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
/**
* Wait a few seconds for the last few records to be processed
*/
// Wait a few seconds for the last few records to be processed
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
/**
* Stops consuming data. Finishes processing the current batch of data already received from Kinesis
* before shutting down.
*/
// Finishes processing current batch of data already received from Kinesis before shutting down.
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete.");
try {
@ -143,15 +124,13 @@ public class TestConsumer {
} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException e) {
log.error("Exception while executing graceful shutdown.", e);
throw new ExecutionException("Exception while executing graceful shutdown. {}", e);
} catch (TimeoutException e) {
log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
throw new TimeoutException("Timeout while waiting for shutdown. Scheduler may not have exited. {}" + e);
}
log.info("Completed, shutting down now.");
/**
* Validate processed data
*/
// Validate processed data
log.info("The number of expected records is: {}", successfulPutRecords);
RecordValidationStatus errorVal = recordValidator.validateRecords(successfulPutRecords);
if (errorVal == RecordValidationStatus.OUT_OF_ORDER) {
@ -160,6 +139,17 @@ public class TestConsumer {
throw new RuntimeException("There was an error validating the records that were processed. Some records were missing.");
}
log.info("--------------Completed validation of processed records.--------------");
// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, dynamoClient);
} catch (Exception e) {
// Test Failed. Clean up resources and then throw exception.
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, dynamoClient);
throw e;
}
}
public void publishRecord() {
@ -172,9 +162,7 @@ public class TestConsumer {
.build();
kinesisClient.putRecord(request).get();
/**
* Increment the payload counter if the putRecord call was successful
*/
// Increment the payload counter if the putRecord call was successful
payloadCounter = payloadCounter.add(new BigInteger("1"));
successfulPutRecords += 1;
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
@ -199,4 +187,22 @@ public class TestConsumer {
}
return ByteBuffer.wrap(returnData);
}
private void deleteResources(StreamExistenceManager streamExistenceManager, DynamoDbAsyncClient dynamoDBClient) throws Exception {
log.info("-------------Start deleting test resources.----------------");
streamExistenceManager.deleteStream(this.streamName);
deleteLeaseTable(dynamoDBClient, consumerConfig.getStreamName());
}
private void deleteLeaseTable(DynamoDbAsyncClient dynamoClient, String tableName) throws Exception {
DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
try {
FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
} catch (ExecutionException e) {
throw new Exception("Could not delete lease table: {}", e);
} catch (InterruptedException e) {
throw new Exception("Deleting lease table interrupted: {}", e);
}
}
}

View file

@ -1,7 +1,6 @@
package software.amazon.kinesis.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
@ -14,12 +13,11 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.nio.ByteBuffer;
@Slf4j
public class TestRecordProcessor implements ShardRecordProcessor {
private static final String SHARD_ID_MDC_KEY = "ShardId";
private static final Logger log = LoggerFactory.getLogger(TestRecordProcessor.class);
private String shardId;
RecordValidatorQueue recordValidator;