diff --git a/README.md b/README.md index 3cc92799..b6ebb6a1 100644 --- a/README.md +++ b/README.md @@ -32,15 +32,17 @@ Please open an issue if you have any questions. ## Building from Source After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use - this command: `mvn clean install -Dgpg.skip=true`. Note: This command runs Integration tests, which in turn creates AWS - resources (which requires manual cleanup). Integration tests require valid AWS credentials need to be discovered at - runtime. To skip running integration tests, add ` -DskipITs` option to the build command. +this command: `mvn clean install -Dgpg.skip=true`. +Note: This command does not run integration tests. ## Running Integration Tests -To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. -This will look for a default AWS profile specified in your local `.aws/credentials`. -Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="" verify`. +Note that running integration tests creates AWS resources. +Integration tests require valid AWS credentials. +This will look for a default AWS profile specified in your local `.aws/credentials`. +To run all integration tests: `mvn verify -DskipITs=false`. +To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify` +Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile=""`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 45681178..653d581f 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -52,6 +52,7 @@ ${project.build.directory}/test-lib 2.0.7 1.1.14 + true @@ -199,6 +200,7 @@ maven-surefire-plugin 2.22.2 + ${skipITs} **/*IntegrationTest.java diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java similarity index 64% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java index 223ca99a..3e4e931d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java @@ -1,13 +1,19 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.ScalingType; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -19,9 +25,14 @@ import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.utils.LeaseTableManager; +import software.amazon.kinesis.utils.RecordValidationStatus; +import software.amazon.kinesis.utils.ReshardOptions; +import software.amazon.kinesis.utils.StreamExistenceManager; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -78,8 +89,10 @@ public class TestConsumer { try { startConsumer(); - // Sleep for three minutes to allow the producer/consumer to run and then end the test case. - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); + // Sleep to allow the producer/consumer to run and then end the test case. + // If non-reshard sleep 3 minutes, else sleep 4 minutes per scale. + final int sleepMinutes = (consumerConfig.getReshardFactorList() == null) ? 3 : (4 * consumerConfig.getReshardFactorList().size()); + Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes)); // Stops sending dummy data. stopProducer(); @@ -115,9 +128,25 @@ public class TestConsumer { } private void startProducer() { - // Send dummy data to stream this.producerExecutor = Executors.newSingleThreadScheduledExecutor(); - this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); + this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + + // Reshard logic if required for the test + if (consumerConfig.getReshardFactorList() != null) { + log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList()); + + final StreamScaler s = new StreamScaler( + kinesisClient, + consumerConfig.getStreamName(), + consumerConfig.getReshardFactorList(), + consumerConfig + ); + + // Schedule the stream scales 4 minutes apart with 2 minute starting delay + for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) { + producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); + } + } } private void setUpConsumerResources() throws Exception { @@ -128,7 +157,9 @@ public class TestConsumer { checkpointConfig = configsBuilder.checkpointConfig(); coordinatorConfig = configsBuilder.coordinatorConfig(); leaseManagementConfig = configsBuilder.leaseManagementConfig() - .initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition())) + .initialPositionInStream( + InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()) + ) .initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50); lifecycleConfig = configsBuilder.lifecycleConfig(); processorConfig = configsBuilder.processorConfig(); @@ -152,6 +183,16 @@ public class TestConsumer { this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS); } + private void stopProducer() { + log.info("Cancelling producer and shutting down executor."); + if (producerFuture != null) { + producerFuture.cancel(false); + } + if (producerExecutor != null) { + producerExecutor.shutdown(); + } + } + public void publishRecord() { final PutRecordRequest request; try { @@ -175,7 +216,7 @@ public class TestConsumer { private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { final byte[] returnData; - log.info("--------------Putting record with data: {}", payloadCounter); + log.info("---------Putting record with data: {}", payloadCounter); try { returnData = mapper.writeValueAsBytes(payloadCounter); } catch (Exception e) { @@ -184,12 +225,6 @@ public class TestConsumer { return ByteBuffer.wrap(returnData); } - private void stopProducer() { - log.info("Cancelling producer and shutting down executor."); - producerFuture.cancel(false); - producerExecutor.shutdown(); - } - private void awaitConsumerFinish() throws Exception { Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); @@ -198,7 +233,7 @@ public class TestConsumer { } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException | TimeoutException e) { - throw e; + scheduler.shutdown(); } log.info("Completed, shutting down now."); } @@ -209,15 +244,61 @@ public class TestConsumer { if (errorVal != RecordValidationStatus.NO_ERROR) { throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString()); } - log.info("--------------Completed validation of processed records.--------------"); + log.info("---------Completed validation of processed records.---------"); } private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { - log.info("-------------Start deleting stream.----------------"); + log.info("-------------Start deleting stream.---------"); streamExistenceManager.deleteResource(this.streamName); - log.info("-------------Start deleting lease table.----------------"); + log.info("---------Start deleting lease table.---------"); leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); - log.info("-------------Finished deleting resources.----------------"); + log.info("---------Finished deleting resources.---------"); } + @Data + private static class StreamScaler implements Runnable { + private final KinesisAsyncClient client; + private final String streamName; + private final List scalingFactors; + private final KCLAppConfig consumerConfig; + private int scalingFactorIdx = 0; + private DescribeStreamSummaryRequest describeStreamSummaryRequest; + + private synchronized void scaleStream() throws InterruptedException, ExecutionException { + final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get(); + + final int openShardCount = response.streamDescriptionSummary().openShardCount(); + final int targetShardCount = scalingFactors.get(scalingFactorIdx).calculateShardCount(openShardCount); + + log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", + streamName, openShardCount, targetShardCount, scalingFactors.get(scalingFactorIdx)); + + final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder() + .streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build(); + final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get(); + log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString()); + + scalingFactorIdx++; + } + + @Override + public void run() { + if (scalingFactors.size() == 0 || scalingFactorIdx >= scalingFactors.size()) { + log.info("No scaling factor found in list"); + return; + } + log.info("Starting stream scaling with params : {}", this); + + if (describeStreamSummaryRequest == null) { + describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); + } + try { + scaleStream(); + } catch (InterruptedException | ExecutionException e) { + log.error("Caught error while scaling shards for stream", e); + } finally { + log.info("Reshard List State : {}", scalingFactors); + } + } + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java similarity index 97% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java index f3e43915..0e4dc489 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java @@ -1,4 +1,4 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; @@ -11,6 +11,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.utils.RecordValidatorQueue; import java.nio.ByteBuffer; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java similarity index 84% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java index 03361b6e..4e06890e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java @@ -1,7 +1,8 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.utils.RecordValidatorQueue; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index 5365ca4f..b5d0c4d1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -5,7 +5,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.utils.RecordValidatorQueue; import software.amazon.kinesis.utils.ReshardOptions; -import software.amazon.kinesis.utils.TestRecordProcessorFactory; +import software.amazon.kinesis.application.TestRecordProcessorFactory; import lombok.Builder; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.Inet4Address; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.List; /** * Default configuration for a producer or consumer used in integration tests. @@ -75,7 +76,7 @@ public abstract class KCLAppConfig { .build(); } - public ReshardConfig getReshardConfig() { + public List getReshardFactorList() { return null; } @@ -157,34 +158,11 @@ public abstract class KCLAppConfig { */ @Value @Builder - static class ProducerConfig { + public static class ProducerConfig { private boolean isBatchPut; private int batchSize; private int recordSizeKB; private long callPeriodMills; } - /** - * Description of the method of resharding for a test case - */ - @Value - @Builder - static class ReshardConfig { - /** - * reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle - * e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved - */ - private ReshardOptions[] reshardingFactorCycle; - - /** - * numReshardCycles: the number of resharding cycles that will be executed in a test - */ - private int numReshardCycles; - - /** - * reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds) - */ - private long reshardFrequencyMillis; - } - } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java new file mode 100644 index 00000000..cfdc5298 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java @@ -0,0 +1,34 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.utils.ReshardOptions; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import static software.amazon.kinesis.utils.ReshardOptions.MERGE; +import static software.amazon.kinesis.utils.ReshardOptions.SPLIT; + +public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig { + + private final UUID uniqueId = UUID.randomUUID(); + @Override + public String getStreamName() { + return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId; + } + + @Override + public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } + + @Override + public int getShardCount() { + return 100; + } + + @Override + public List getReshardFactorList() { + return Arrays.asList(SPLIT, MERGE); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java index e2e44687..d03254c2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java @@ -5,7 +5,7 @@ import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; -import software.amazon.kinesis.utils.TestConsumer; +import software.amazon.kinesis.application.TestConsumer; public class BasicStreamConsumerIntegrationTest { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java new file mode 100644 index 00000000..aa08980e --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java @@ -0,0 +1,15 @@ +package software.amazon.kinesis.lifecycle; + +import org.junit.Test; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.ReleaseCanaryStreamingReshardingTestConfig; +import software.amazon.kinesis.application.TestConsumer; + +public class ReshardIntegrationTest { + @Test + public void kclReleaseCanaryStreamingReshardingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java index fbf5f68b..f1513cfb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java @@ -6,6 +6,16 @@ package software.amazon.kinesis.utils; * Merge halves the number of shards. */ public enum ReshardOptions { - SPLIT, - MERGE + SPLIT { + public int calculateShardCount(int currentShards) { + return (int) (2.0 * currentShards); + } + }, + MERGE { + public int calculateShardCount(int currentShards) { + return (int) (0.5 * currentShards); + } + }; + + public abstract int calculateShardCount(int currentShards); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index b5f06b78..db8615c3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -111,5 +111,4 @@ public class StreamExistenceManager extends AWSResourceManager { } } } - }