From 74a74c3a00f6e6df0502ff4562bf79b6eded05eb Mon Sep 17 00:00:00 2001 From: Meher Mankikar Date: Fri, 16 Jun 2023 11:41:32 -0700 Subject: [PATCH] Updates based on 2nd round of comments --- README.md | 6 +- .../config/BasicReleaseCanaryConfig.java | 108 ------------- .../amazon/kinesis/config/KCLAppConfig.java | 149 +++++++++++------- .../ReleaseCanaryPollingH1TestConfig.java | 24 +-- .../ReleaseCanaryPollingH2TestConfig.java | 24 +-- .../ReleaseCanaryStreamingTestConfig.java | 7 +- .../BasicStreamingPollingIntegrationTest.java | 2 - .../kinesis/utils/RecordValidatorQueue.java | 21 +-- .../utils/RecordValidatorQueueTest.java | 6 +- .../amazon/kinesis/utils/ReshardOptions.java | 11 ++ .../kinesis/utils/StreamExistenceManager.java | 30 ++++ .../amazon/kinesis/utils/TestConsumer.java | 148 ++++++++--------- .../kinesis/utils/TestRecordProcessor.java | 6 +- 13 files changed, 228 insertions(+), 314 deletions(-) delete mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/BasicReleaseCanaryConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java diff --git a/README.md b/README.md index 34b9e003..84f4d2f6 100644 --- a/README.md +++ b/README.md @@ -38,9 +38,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d ## Running Integration Tests -To run integration tests to test any changes to KCL, you can use this command: `mvn -Dit.test=*IntegrationTest verify`. -This will look for default AWS credentials in your local `.aws/credentials`. If you want to override these -credentials, you can provide the name of an IAM user as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="" verify`. +To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. +This will look for a default AWS profile specified in your local `.aws/credentials`. +Optionally, you can provide the name of an IAM user to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="" verify`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/BasicReleaseCanaryConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/BasicReleaseCanaryConfig.java deleted file mode 100644 index edde0734..00000000 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/BasicReleaseCanaryConfig.java +++ /dev/null @@ -1,108 +0,0 @@ -package software.amazon.kinesis.config; - -import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.http.Protocol; -import software.amazon.awssdk.regions.Region; -import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.retrieval.RetrievalConfig; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.Date; - -/** - * Basic config for a release canary (streaming) with default settings - */ -@Slf4j -public class BasicReleaseCanaryConfig implements KCLAppConfig { - @Override - public String getStreamName() { - return ""; - } - - @Override - public int getShardCount() { - return 10; - } - - @Override - public String getApplicationName() { - return ""; - } - - @Override - public String getEndpoint() { - return ""; - } - - @Override - public Region getRegion() { - return Region.US_WEST_2; - } - - - /** - * This will get the credentials that are provided in the maven command - * when running integration tests if any are provided through -Dcredentials=iamUser - * Otherwise, iamUser will be null and the test will search for default credentials - * in the test environment. - */ - @Override - public String getProfile() { - String iamUser = System.getProperty("credentials"); - return iamUser; - } - - @Override - public InitialPositionInStream getInitialPosition() { - return InitialPositionInStream.TRIM_HORIZON; - } - - @Override - public Protocol getConsumerProtocol() { - return Protocol.HTTP1_1; - } - - @Override - public Protocol getProducerProtocol() { - return Protocol.HTTP1_1; - } - - @Override - public ProducerConfig getProducerConfig() { - return ProducerConfig.builder() - .isBatchPut(false) - .batchSize(1) - .recordSizeKB(60) - .callPeriodMills(100) - .build(); - } - - @Override - public ReshardConfig getReshardConfig() { - return null; - } - - @Override - public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - LocalDateTime d = LocalDateTime.now(); - d = d.minusMinutes(5); - Instant instant = d.atZone(ZoneId.systemDefault()).toInstant(); - Date startStreamTime = Date.from(instant); - - InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended - .newInitialPositionAtTimestamp(startStreamTime); - - /** - * Default is a streaming consumer - */ - RetrievalConfig config = getConfigsBuilder().retrievalConfig(); - config.initialPositionInStreamExtended(initialPosition); - - return config; - } -} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index cf1902ab..9dff8f74 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -2,14 +2,15 @@ package software.amazon.kinesis.config; import lombok.Value; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.utils.RecordValidatorQueue; +import software.amazon.kinesis.utils.ReshardOptions; import software.amazon.kinesis.utils.TestRecordProcessorFactory; import lombok.Builder; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.http.Protocol; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; @@ -31,133 +32,156 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.Optional; -public interface KCLAppConfig { +/** + * Default configuration for a producer or consumer used in integration tests. + * Producer: puts records of size 60 KB at an interval of 100 ms + * Consumer: streaming configuration (vs polling) that starts processing records put on the shard 5 minutes before + * the start of the test + */ +public abstract class KCLAppConfig { - String getStreamName(); + /** + * Name used for test stream and DDB table + */ + public abstract String getStreamName(); - default String getStreamArn() { + public String getStreamArn() { return null; } - int getShardCount(); + public int getShardCount() { return 4; } - String getApplicationName(); - - String getEndpoint(); - - Region getRegion(); + public String getEndpoint() { return ""; } + public Region getRegion() { return Region.US_WEST_2; } /** * "default" profile, should match with profiles listed in "cat ~/.aws/config" */ - String getProfile(); + public String getProfile() { + String iamUser = System.getProperty("credentials"); + return iamUser; + } - InitialPositionInStream getInitialPosition(); + public InitialPositionInStream getInitialPosition() { + return InitialPositionInStream.TRIM_HORIZON; + } - Protocol getConsumerProtocol(); + public Protocol getConsumerProtocol() { + return Protocol.HTTP1_1; + } - Protocol getProducerProtocol(); + public Protocol getProducerProtocol() { + return Protocol.HTTP1_1; + } - ProducerConfig getProducerConfig(); + public ProducerConfig getProducerConfig() { + return ProducerConfig.builder() + .isBatchPut(false) + .batchSize(1) + .recordSizeKB(60) + .callPeriodMills(100) + .build(); + } - ReshardConfig getReshardConfig(); + public ReshardConfig getReshardConfig() { + return null; + } - default KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException { + public KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException { return buildAsyncKinesisClient(getConsumerProtocol()); } - default KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException { + public KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException { return buildAsyncKinesisClient(getProducerProtocol()); } - default KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException { + public KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException { return buildAsyncKinesisClient(Optional.ofNullable(protocol)); } - default KinesisAsyncClient buildAsyncKinesisClient(Optional protocol) throws URISyntaxException, IOException { + private AwsCredentialsProvider getCredentialsProvider() { + return (getProfile() != null) ? + ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create(); + } - /** - * Setup H2 client config. - */ + public KinesisAsyncClient buildAsyncKinesisClient(Optional protocol) throws URISyntaxException, IOException { + + // Setup H2 client config. final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() .maxConcurrency(Integer.MAX_VALUE); - /** - * If not present, defaults to HTTP1_1 - */ + // If not present, defaults to HTTP1_1 if (protocol.isPresent()) { builder.protocol(protocol.get()); } final SdkAsyncHttpClient sdkAsyncHttpClient = - builder.buildWithDefaults(AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true).build()); + builder.buildWithDefaults(AttributeMap.builder().build()); - /** - * Setup client builder by default values - */ + // Setup client builder by default values final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion()); kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient); - AwsCredentialsProvider credentialsProvider = (getProfile() != null) ? - ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create(); - kinesisAsyncClientBuilder.credentialsProvider( credentialsProvider ); + kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider()); return kinesisAsyncClientBuilder.build(); } - default DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { + public DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion()); - AwsCredentialsProvider credentialsProvider = (getProfile() != null) ? - ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create(); - builder.credentialsProvider(credentialsProvider); - + builder.credentialsProvider(getCredentialsProvider()); return builder.build(); } - default CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException { + public CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException { final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion()); - AwsCredentialsProvider credentialsProvider = (getProfile() != null) ? - ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create(); - builder.credentialsProvider(credentialsProvider); - + builder.credentialsProvider(getCredentialsProvider()); return builder.build(); } - default String getWorkerId() throws UnknownHostException { + public String getWorkerId() throws UnknownHostException { return Inet4Address.getLocalHost().getHostName(); } - default RecordValidatorQueue getRecordValidator() { + public RecordValidatorQueue getRecordValidator() { return new RecordValidatorQueue(); } - default ShardRecordProcessorFactory getShardRecordProcessorFactory() { + public ShardRecordProcessorFactory getShardRecordProcessorFactory() { return new TestRecordProcessorFactory(getRecordValidator()); } - default ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { + public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { final String workerId = getWorkerId(); if (getStreamArn() == null) { - return new ConfigsBuilder(getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(), + return new ConfigsBuilder(getStreamName(), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); } else { - return new ConfigsBuilder(Arn.fromString(getStreamArn()), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(), + return new ConfigsBuilder(Arn.fromString(getStreamArn()), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); } } - RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException; + public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { + InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPosition(getInitialPosition()); + + // Default is a streaming consumer + RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + config.initialPositionInStreamExtended(initialPosition); + return config; + } /** * Configure ingress load (batch size, record size, and calling interval) */ @Value @Builder - class ProducerConfig { + static class ProducerConfig { private boolean isBatchPut; private int batchSize; private int recordSizeKB; @@ -166,19 +190,24 @@ public interface KCLAppConfig { /** * Description of the method of resharding for a test case - *

- * reshardingFactorCycle: lists the scales by which the number of shards in a stream will be updated - * in sequence. e.g {2.0, 0.5} means that the number of shards will first be doubled, then halved - *

- * numReshardCycles: the number of resharding cycles that will be executed in a test] - *

- * reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds) */ @Value @Builder - class ReshardConfig { - private double[] reshardingFactorCycle; + static class ReshardConfig { + /** + * reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle + * e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved + */ + private ReshardOptions[] reshardingFactorCycle; + + /** + * numReshardCycles: the number of resharding cycles that will be executed in a test + */ private int numReshardCycles; + + /** + * reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds) + */ private long reshardFrequencyMillis; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java index 1df852d3..074284ba 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java @@ -7,28 +7,14 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.io.IOException; import java.net.URISyntaxException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.Date; /** * Config for a polling consumer with HTTP protocol of HTTP1 */ -public class ReleaseCanaryPollingH1TestConfig extends BasicReleaseCanaryConfig { +public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { @Override public String getStreamName() { - return "KCLReleaseCanary2XPollingH1TestConfig"; - } - - @Override - public int getShardCount() { - return 20; - } - - @Override - public String getApplicationName() { - return "KCLReleaseCanary2XPollingH1TestConfigApplication"; + return "KCLReleaseCanary2XPollingH1TestStream"; } @Override @@ -38,13 +24,9 @@ public class ReleaseCanaryPollingH1TestConfig extends BasicReleaseCanaryConfig { @Override public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - LocalDateTime d = LocalDateTime.now(); - d = d.minusMinutes(5); - Instant instant = d.atZone(ZoneId.systemDefault()).toInstant(); - Date startStreamTime = Date.from(instant); InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended - .newInitialPositionAtTimestamp(startStreamTime); + .newInitialPosition(getInitialPosition()); RetrievalConfig config = getConfigsBuilder().retrievalConfig(); config.initialPositionInStreamExtended(initialPosition); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java index cd824475..7939df5b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java @@ -7,28 +7,14 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.io.IOException; import java.net.URISyntaxException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.Date; /** * Config for a polling consumer with HTTP protocol of HTTP2 */ -public class ReleaseCanaryPollingH2TestConfig extends BasicReleaseCanaryConfig { +public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig { @Override public String getStreamName() { - return "KCLTest3"; - } - - @Override - public int getShardCount() { - return 20; - } - - @Override - public String getApplicationName() { - return "KCLReleaseCanary2XPollingH2TestApplication"; + return "KCLReleaseCanary2XPollingH2TestStream"; } @Override @@ -38,13 +24,9 @@ public class ReleaseCanaryPollingH2TestConfig extends BasicReleaseCanaryConfig { @Override public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - LocalDateTime d = LocalDateTime.now(); - d = d.minusMinutes(5); - Instant instant = d.atZone(ZoneId.systemDefault()).toInstant(); - Date startStreamTime = Date.from(instant); InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended - .newInitialPositionAtTimestamp(startStreamTime); + .newInitialPosition(getInitialPosition()); RetrievalConfig config = getConfigsBuilder().retrievalConfig(); config.initialPositionInStreamExtended(initialPosition); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java index 2d6b5c73..c92ab087 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java @@ -5,17 +5,12 @@ import software.amazon.awssdk.http.Protocol; /** * Config for a streaming consumer with HTTP protocol of HTTP2 */ -public class ReleaseCanaryStreamingTestConfig extends BasicReleaseCanaryConfig { +public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig { @Override public String getStreamName() { return "KCLReleaseCanary2XStreamingTestStream"; } - @Override - public String getApplicationName() { - return "KCLReleaseCanary2XStreamingTestApplication"; - } - @Override public Protocol getConsumerProtocol() { return Protocol.HTTP2; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java index b0e4fa64..e19501b2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java @@ -1,6 +1,5 @@ package software.amazon.kinesis.lifecycle; -import lombok.extern.slf4j.Slf4j; import org.junit.Test; import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; @@ -8,7 +7,6 @@ import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; import software.amazon.kinesis.utils.TestConsumer; -@Slf4j public class BasicStreamingPollingIntegrationTest { @Test diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java index 29a2b373..936b4ad1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java @@ -27,9 +27,8 @@ public class RecordValidatorQueue { } public RecordValidationStatus validateRecords(int trueTotalShardCount) { - /** - * Validate that each List in the HashMap has data records in increasing order - */ + + // Validate that each List in the HashMap has data records in increasing order boolean incOrder = true; for (Map.Entry> entry : dict.entrySet()) { List recordsPerShard = entry.getValue(); @@ -49,16 +48,12 @@ public class RecordValidatorQueue { } } - /** - * If this is true, then there was some record that was processed out of order - */ + // If this is true, then there was some record that was processed out of order if (!incOrder) { return RecordValidationStatus.OUT_OF_ORDER; } - /** - * Validate that no records are missing over all shards - */ + // Validate that no records are missing over all shards int totalShardCount = 0; for (Map.Entry> entry : dict.entrySet()) { List recordsPerShard = entry.getValue(); @@ -66,17 +61,13 @@ public class RecordValidatorQueue { totalShardCount += noDupRecords.size(); } - /** - * If this is true, then there was some record that was missed during processing. - */ + // If this is true, then there was some record that was missed during processing. if (totalShardCount != trueTotalShardCount) { log.error("Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount); return RecordValidationStatus.MISSING_RECORD; } - /** - * Record validation succeeded. - */ + // Record validation succeeded. return RecordValidationStatus.NO_ERROR; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java index 4a25fbbd..e64565d8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java @@ -10,7 +10,7 @@ public class RecordValidatorQueueTest { private static final String SHARD_ID = "ABC"; @Test - public void validationFailedRecordOutOfOrderTest() { + public void testValidationFailedRecordOutOfOrder() { recordValidator.add(SHARD_ID, "0"); recordValidator.add(SHARD_ID, "1"); recordValidator.add(SHARD_ID, "3"); @@ -21,7 +21,7 @@ public class RecordValidatorQueueTest { } @Test - public void validationFailedMissingRecordTest() { + public void testValidationFailedMissingRecord() { recordValidator.add(SHARD_ID, "0"); recordValidator.add(SHARD_ID, "1"); recordValidator.add(SHARD_ID, "2"); @@ -32,7 +32,7 @@ public class RecordValidatorQueueTest { } @Test - public void validRecordsTest() { + public void testValidRecords() { recordValidator.add(SHARD_ID, "0"); recordValidator.add(SHARD_ID, "1"); recordValidator.add(SHARD_ID, "2"); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java new file mode 100644 index 00000000..fbf5f68b --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java @@ -0,0 +1,11 @@ +package software.amazon.kinesis.utils; + +/** + * Specifies the types of resharding possible in integration tests + * Split doubles the number of shards. + * Merge halves the number of shards. + */ +public enum ReshardOptions { + SPLIT, + MERGE +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index b6ffc3e8..ee0450a0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -4,6 +4,7 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; @@ -84,6 +85,35 @@ public class StreamExistenceManager { } } + public void deleteStream(String streamName) { + DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build(); + try{ + client.deleteStream(request).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Failed to delete stream with name " + streamName, e); + } + + int i = 0; + while (true) { + i++; + if (i > 100) { + throw new RuntimeException("Failed stream deletion"); + } + try { + boolean isActive = isStreamActive(streamName); + if (!isActive) { + log.info("Succesfully deleted the stream " + streamName); + return; + } + } catch (Exception e) { + try { + sleep(10_000); // 10 secs backoff. + } catch (InterruptedException e1) {} + log.info("Stream {} is not deleted yet, exception: ", streamName, e); + } + } + } + public void checkStreamAndCreateIfNecessary(String streamName) { if (!isStreamActive(streamName)) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java index f2581b39..5485524d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java @@ -7,10 +7,12 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.config.KCLAppConfig; @@ -24,6 +26,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -32,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; @Slf4j public class TestConsumer { @@ -58,23 +62,17 @@ public class TestConsumer { public void run() throws Exception { - /** - * Check if stream is created. If not, create it - */ + // Check if stream is created. If not, create it StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig); streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName); - /** - * Send dummy data to stream - */ + // Send dummy data to stream ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); - ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); RecordValidatorQueue recordValidator = new RecordValidatorQueue(); - /** - * Setup configuration of KCL (including DynamoDB and CloudWatch) - */ + // Setup configuration of KCL (including DynamoDB and CloudWatch) DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory(recordValidator)); @@ -90,9 +88,7 @@ public class TestConsumer { processorConfig = configsBuilder.processorConfig(); metricsConfig = configsBuilder.metricsConfig(); - /** - * Create Scheduler - */ + // Create Scheduler Scheduler scheduler = new Scheduler( checkpointConfig, coordinatorConfig, @@ -103,63 +99,57 @@ public class TestConsumer { retrievalConfig ); - /** - * Start record processing of dummy data - */ - Thread schedulerThread = new Thread(scheduler); - schedulerThread.setDaemon(true); - schedulerThread.start(); - - - /** - * Sleep for two minutes to allow the producer/consumer to run and then end the test case. - */ try { - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 2)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + // Start record processing of dummy data + Thread schedulerThread = new Thread(scheduler); + schedulerThread.setDaemon(true); + schedulerThread.start(); + + // Sleep for two minutes to allow the producer/consumer to run and then end the test case. + Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); + + // Stops sending dummy data. + log.info("Cancelling producer and shutting down executor."); + producerFuture.cancel(false); + producerExecutor.shutdown(); + + // Wait a few seconds for the last few records to be processed + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + // Finishes processing current batch of data already received from Kinesis before shutting down. + Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); + log.info("Waiting up to 20 seconds for shutdown to complete."); + try { + gracefulShutdownFuture.get(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.info("Interrupted while waiting for graceful shutdown. Continuing."); + } catch (ExecutionException e) { + throw new ExecutionException("Exception while executing graceful shutdown. {}", e); + } catch (TimeoutException e) { + throw new TimeoutException("Timeout while waiting for shutdown. Scheduler may not have exited. {}" + e); + } + log.info("Completed, shutting down now."); + + // Validate processed data + log.info("The number of expected records is: {}", successfulPutRecords); + RecordValidationStatus errorVal = recordValidator.validateRecords(successfulPutRecords); + if (errorVal == RecordValidationStatus.OUT_OF_ORDER) { + throw new RuntimeException("There was an error validating the records that were processed. The records were out of order"); + } else if (errorVal == RecordValidationStatus.MISSING_RECORD) { + throw new RuntimeException("There was an error validating the records that were processed. Some records were missing."); + } + log.info("--------------Completed validation of processed records.--------------"); + + // Clean up resources created + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + deleteResources(streamExistenceManager, dynamoClient); + + } catch (Exception e) { + // Test Failed. Clean up resources and then throw exception. + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + deleteResources(streamExistenceManager, dynamoClient); + throw e; } - - /** - * Stops sending dummy data. - */ - log.info("Cancelling producer and shutting down executor."); - producerFuture.cancel(false); - producerExecutor.shutdown(); - - /** - * Wait a few seconds for the last few records to be processed - */ - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - - /** - * Stops consuming data. Finishes processing the current batch of data already received from Kinesis - * before shutting down. - */ - Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); - log.info("Waiting up to 20 seconds for shutdown to complete."); - try { - gracefulShutdownFuture.get(20, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.info("Interrupted while waiting for graceful shutdown. Continuing."); - } catch (ExecutionException e) { - log.error("Exception while executing graceful shutdown.", e); - } catch (TimeoutException e) { - log.error("Timeout while waiting for shutdown. Scheduler may not have exited."); - } - log.info("Completed, shutting down now."); - - /** - * Validate processed data - */ - log.info("The number of expected records is: {}", successfulPutRecords); - RecordValidationStatus errorVal = recordValidator.validateRecords(successfulPutRecords); - if (errorVal == RecordValidationStatus.OUT_OF_ORDER) { - throw new RuntimeException("There was an error validating the records that were processed. The records were out of order"); - } else if (errorVal == RecordValidationStatus.MISSING_RECORD) { - throw new RuntimeException("There was an error validating the records that were processed. Some records were missing."); - } - log.info("--------------Completed validation of processed records.--------------"); } public void publishRecord() { @@ -172,9 +162,7 @@ public class TestConsumer { .build(); kinesisClient.putRecord(request).get(); - /** - * Increment the payload counter if the putRecord call was successful - */ + // Increment the payload counter if the putRecord call was successful payloadCounter = payloadCounter.add(new BigInteger("1")); successfulPutRecords += 1; log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); @@ -199,4 +187,22 @@ public class TestConsumer { } return ByteBuffer.wrap(returnData); } + + private void deleteResources(StreamExistenceManager streamExistenceManager, DynamoDbAsyncClient dynamoDBClient) throws Exception { + log.info("-------------Start deleting test resources.----------------"); + streamExistenceManager.deleteStream(this.streamName); + deleteLeaseTable(dynamoDBClient, consumerConfig.getStreamName()); + } + + private void deleteLeaseTable(DynamoDbAsyncClient dynamoClient, String tableName) throws Exception { + DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build(); + try { + FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60)); + } catch (ExecutionException e) { + throw new Exception("Could not delete lease table: {}", e); + } catch (InterruptedException e) { + throw new Exception("Deleting lease table interrupted: {}", e); + } + + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java index c49a268d..32b53743 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java @@ -1,7 +1,6 @@ package software.amazon.kinesis.utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; @@ -14,12 +13,11 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.nio.ByteBuffer; +@Slf4j public class TestRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; - private static final Logger log = LoggerFactory.getLogger(TestRecordProcessor.class); - private String shardId; RecordValidatorQueue recordValidator;