diff --git a/README.md b/README.md index 6328e115..6d90dde8 100644 --- a/README.md +++ b/README.md @@ -32,15 +32,17 @@ Please open an issue if you have any questions. ## Building from Source After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use - this command: `mvn clean install -Dgpg.skip=true`. Note: This command runs Integration tests, which in turn creates AWS - resources (which requires manual cleanup). Integration tests require valid AWS credentials need to be discovered at - runtime. To skip running integration tests, add ` -DskipITs` option to the build command. +this command: `mvn clean install -Dgpg.skip=true`. +Note: This command does not run integration tests. ## Running Integration Tests -To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. -This will look for a default AWS profile specified in your local `.aws/credentials`. -Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="" verify`. +Note that running integration tests create AWS resources. +Integration tests require valid AWS credentials need to be discovered at runtime. +To run all integration tests: `mvn install package -DskipITs=false`. +To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify` +This will look for a default AWS profile specified in your local `.aws/credentials`. +Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn install package -DskipITs=false -DawsProfile=""`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 3eb8ad94..dd761b33 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -52,6 +52,7 @@ ${project.build.directory}/test-lib 2.0.7 1.1.14 + true @@ -199,6 +200,7 @@ maven-surefire-plugin 2.22.2 + ${skipITs} **/*IntegrationTest.java diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java similarity index 66% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java index 223ca99a..d2ddd06d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java @@ -1,13 +1,21 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.ScalingType; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -19,9 +27,16 @@ import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.utils.LeaseTableManager; +import software.amazon.kinesis.utils.RecordValidationStatus; +import software.amazon.kinesis.utils.ReshardOptions; +import software.amazon.kinesis.utils.StreamExistenceManager; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -79,7 +94,11 @@ public class TestConsumer { startConsumer(); // Sleep for three minutes to allow the producer/consumer to run and then end the test case. - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); + if (consumerConfig.getReshardConfig() == null) { + Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); + } else { + Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 8)); + } // Stops sending dummy data. stopProducer(); @@ -115,9 +134,28 @@ public class TestConsumer { } private void startProducer() { - // Send dummy data to stream - this.producerExecutor = Executors.newSingleThreadScheduledExecutor(); - this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); + producerExecutor = Executors.newSingleThreadScheduledExecutor(); + producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + + // Reshard logic if required for the test + if (consumerConfig.getReshardConfig() != null) { + log.info("------------------------- Reshard Config found -----------------------------"); + final Queue reshardQueue = new LinkedList<>(Arrays.asList(consumerConfig.getReshardConfig().getReshardingFactorCycle())); + int totalRotations = reshardQueue.size() * (consumerConfig.getReshardConfig().getNumReshardCycles() - 1); + + final StreamScaler s = new StreamScaler(kinesisClient, consumerConfig.getStreamName(), reshardQueue, totalRotations, consumerConfig); + + Runnable task1 = () -> { + log.info("----------------------Starting new reshard------------------------------"); + s.run(); + }; + + // Split shard + producerExecutor.schedule(task1, 2, TimeUnit.MINUTES); + + // Merge shard + producerExecutor.schedule(task1, 6, TimeUnit.MINUTES); + } } private void setUpConsumerResources() throws Exception { @@ -152,6 +190,12 @@ public class TestConsumer { this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS); } + public void stopProducer() { + log.info("Cancelling producer and shutting down executor."); + producerFuture.cancel(false); + producerExecutor.shutdown(); + } + public void publishRecord() { final PutRecordRequest request; try { @@ -184,12 +228,6 @@ public class TestConsumer { return ByteBuffer.wrap(returnData); } - private void stopProducer() { - log.info("Cancelling producer and shutting down executor."); - producerFuture.cancel(false); - producerExecutor.shutdown(); - } - private void awaitConsumerFinish() throws Exception { Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); @@ -198,7 +236,7 @@ public class TestConsumer { } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException | TimeoutException e) { - throw e; + scheduler.shutdown(); } log.info("Completed, shutting down now."); } @@ -220,4 +258,53 @@ public class TestConsumer { log.info("-------------Finished deleting resources.----------------"); } + @Data + @ToString + @AllArgsConstructor + public static class StreamScaler implements Runnable { + private final KinesisAsyncClient client; + private final String streamName; + private final Queue scalingFactor; + private int totalRotations; + private KCLAppConfig testConfig; + + @Override + public void run() { + try { + log.info("----------------------------Starting stream scale----------------------"); + if (!scalingFactor.isEmpty()) { + log.info("Starting stream scaling with params : {}", this); + final DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); + final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get(); + + int openShardCount = response.streamDescriptionSummary().openShardCount(); + int targetShardCount; + if (scalingFactor.peek() == ReshardOptions.SPLIT) { + // Split case: double the number of shards + targetShardCount = (int) (openShardCount * 2.0); + } else { + // Merge case: half the number of shards + targetShardCount = (int) (openShardCount * 0.5); + } + log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", streamName, openShardCount, targetShardCount, scalingFactor.peek()); + + final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder().streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build(); + final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get(); + log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString()); + + if (--totalRotations >= 0) { + scalingFactor.offer(scalingFactor.poll()); + } else { + scalingFactor.remove(); + } + } else { + log.info("No scaling factor found in queue"); + } + } catch (Exception e) { + log.error("Caught error while scaling shards for stream", e); + } finally { + log.info("Reshard Queue State : {}", scalingFactor); + } + } + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java similarity index 97% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java index f3e43915..0e4dc489 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java @@ -1,4 +1,4 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; @@ -11,6 +11,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.utils.RecordValidatorQueue; import java.nio.ByteBuffer; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java similarity index 84% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java index 03361b6e..4e06890e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java @@ -1,7 +1,8 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.utils.RecordValidatorQueue; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index b67efa10..fda33c0c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -5,7 +5,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.utils.RecordValidatorQueue; import software.amazon.kinesis.utils.ReshardOptions; -import software.amazon.kinesis.utils.TestRecordProcessorFactory; +import software.amazon.kinesis.application.TestRecordProcessorFactory; import lombok.Builder; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; @@ -24,6 +24,7 @@ import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.utils.ReshardOptions; import java.io.IOException; import java.net.Inet4Address; @@ -159,7 +160,7 @@ public abstract class KCLAppConfig { */ @Value @Builder - static class ProducerConfig { + public static class ProducerConfig { private boolean isBatchPut; private int batchSize; private int recordSizeKB; @@ -171,7 +172,7 @@ public abstract class KCLAppConfig { */ @Value @Builder - static class ReshardConfig { + public static class ReshardConfig { /** * reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle * e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java new file mode 100644 index 00000000..8f40f8cc --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java @@ -0,0 +1,36 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.utils.ReshardOptions; + +import java.util.UUID; + +import static software.amazon.kinesis.utils.ReshardOptions.MERGE; +import static software.amazon.kinesis.utils.ReshardOptions.SPLIT; + +public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig { + + private final UUID uniqueId = UUID.randomUUID(); + @Override + public String getStreamName() { + return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId; + } + + @Override + public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } + + @Override + public int getShardCount() { + return 100; + } + + @Override + public ReshardConfig getReshardConfig() { + return ReshardConfig.builder() + .reshardFrequencyMillis(3 * 60 * 1000) + .reshardingFactorCycle(new ReshardOptions[]{SPLIT, MERGE}) + .numReshardCycles(1) + .build(); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java index e2e44687..d03254c2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java @@ -5,7 +5,7 @@ import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; -import software.amazon.kinesis.utils.TestConsumer; +import software.amazon.kinesis.application.TestConsumer; public class BasicStreamConsumerIntegrationTest { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java new file mode 100644 index 00000000..179809fd --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java @@ -0,0 +1,15 @@ +package software.amazon.kinesis.lifecycle; + +import org.junit.Test; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.ReleaseCanaryStreamingReshardingTestConfig; +import software.amazon.kinesis.application.TestConsumer; + +public class ReshardIntegrationTest { + @Test + public void ReleaseCanaryStreamingReshardingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index eeffb36b..db8615c3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -2,8 +2,6 @@ package software.amazon.kinesis.utils; import lombok.Value; import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; -import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; @@ -113,5 +111,4 @@ public class StreamExistenceManager extends AWSResourceManager { } } } - }