diff --git a/README.md b/README.md index 9db7259b..b6ebb6a1 100644 --- a/README.md +++ b/README.md @@ -37,12 +37,12 @@ Note: This command does not run integration tests. ## Running Integration Tests -Note that running integration tests create AWS resources. -Integration tests require valid AWS credentials need to be discovered at runtime. -To run all integration tests: `mvn install package -DskipITs=false`. -To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify` +Note that running integration tests creates AWS resources. +Integration tests require valid AWS credentials. This will look for a default AWS profile specified in your local `.aws/credentials`. -Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn install package -DskipITs=false -DawsProfile=""`. +To run all integration tests: `mvn verify -DskipITs=false`. +To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify` +Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile=""`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java index d2ddd06d..35200f6d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java @@ -1,9 +1,7 @@ package software.amazon.kinesis.application; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import software.amazon.awssdk.core.SdkBytes; @@ -34,9 +32,7 @@ import software.amazon.kinesis.utils.StreamExistenceManager; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.Queue; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -59,7 +55,7 @@ public class TestConsumer { private LifecycleConfig lifecycleConfig; private ProcessorConfig processorConfig; private Scheduler scheduler; - private ScheduledExecutorService producerExecutor; + private static final ScheduledExecutorService PRODUCER_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); private ScheduledFuture producerFuture; private ScheduledExecutorService consumerExecutor; private ScheduledFuture consumerFuture; @@ -93,12 +89,9 @@ public class TestConsumer { try { startConsumer(); - // Sleep for three minutes to allow the producer/consumer to run and then end the test case. - if (consumerConfig.getReshardConfig() == null) { - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); - } else { - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 8)); - } + // Sleep to allow the producer/consumer to run and then end the test case. If non-reshard sleep 3 minutes, else sleep 4 minutes per scale. + final int sleepMinutes = (consumerConfig.getReshardFactorList() == null) ? 3 : (4 * consumerConfig.getReshardFactorList().size()); + Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes)); // Stops sending dummy data. stopProducer(); @@ -134,27 +127,18 @@ public class TestConsumer { } private void startProducer() { - producerExecutor = Executors.newSingleThreadScheduledExecutor(); - producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + producerFuture = PRODUCER_EXECUTOR.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); // Reshard logic if required for the test - if (consumerConfig.getReshardConfig() != null) { - log.info("------------------------- Reshard Config found -----------------------------"); - final Queue reshardQueue = new LinkedList<>(Arrays.asList(consumerConfig.getReshardConfig().getReshardingFactorCycle())); - int totalRotations = reshardQueue.size() * (consumerConfig.getReshardConfig().getNumReshardCycles() - 1); + if (consumerConfig.getReshardFactorList() != null) { + log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList()); - final StreamScaler s = new StreamScaler(kinesisClient, consumerConfig.getStreamName(), reshardQueue, totalRotations, consumerConfig); + final StreamScaler s = new StreamScaler(kinesisClient, consumerConfig.getStreamName(), consumerConfig.getReshardFactorList(), consumerConfig); - Runnable task1 = () -> { - log.info("----------------------Starting new reshard------------------------------"); - s.run(); - }; - - // Split shard - producerExecutor.schedule(task1, 2, TimeUnit.MINUTES); - - // Merge shard - producerExecutor.schedule(task1, 6, TimeUnit.MINUTES); + // Schedule the stream scales 4 minutes apart with 2 minute starting delay + for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) { + PRODUCER_EXECUTOR.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); + } } } @@ -192,8 +176,12 @@ public class TestConsumer { public void stopProducer() { log.info("Cancelling producer and shutting down executor."); - producerFuture.cancel(false); - producerExecutor.shutdown(); + if (producerFuture != null) { + producerFuture.cancel(false); + } + if (PRODUCER_EXECUTOR != null) { + PRODUCER_EXECUTOR.shutdown(); + } } public void publishRecord() { @@ -219,7 +207,7 @@ public class TestConsumer { private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { final byte[] returnData; - log.info("--------------Putting record with data: {}", payloadCounter); + log.info("---------Putting record with data: {}", payloadCounter); try { returnData = mapper.writeValueAsBytes(payloadCounter); } catch (Exception e) { @@ -247,63 +235,60 @@ public class TestConsumer { if (errorVal != RecordValidationStatus.NO_ERROR) { throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString()); } - log.info("--------------Completed validation of processed records.--------------"); + log.info("---------Completed validation of processed records.---------"); } private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { - log.info("-------------Start deleting stream.----------------"); + log.info("-------------Start deleting stream.---------"); streamExistenceManager.deleteResource(this.streamName); - log.info("-------------Start deleting lease table.----------------"); + log.info("---------Start deleting lease table.---------"); leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); - log.info("-------------Finished deleting resources.----------------"); + log.info("---------Finished deleting resources.---------"); } @Data - @ToString - @AllArgsConstructor - public static class StreamScaler implements Runnable { + private static class StreamScaler implements Runnable { private final KinesisAsyncClient client; private final String streamName; - private final Queue scalingFactor; - private int totalRotations; - private KCLAppConfig testConfig; + private final List scalingFactors; + private final KCLAppConfig consumerConfig; + private int scalingFactorIdx = 0; + private DescribeStreamSummaryRequest describeStreamSummaryRequest; + + private synchronized void scaleStream() throws InterruptedException, ExecutionException { + final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get(); + + final int openShardCount = response.streamDescriptionSummary().openShardCount(); + final int targetShardCount = scalingFactors.get(scalingFactorIdx).calculateShardCount(openShardCount); + + log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", + streamName, openShardCount, targetShardCount, scalingFactors.get(scalingFactorIdx)); + + final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder() + .streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build(); + final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get(); + log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString()); + + scalingFactorIdx++; + } @Override public void run() { + if (scalingFactors.size() == 0 || scalingFactorIdx >= scalingFactors.size()) { + log.info("No scaling factor found in list"); + return; + } + log.info("Starting stream scaling with params : {}", this); + + if (describeStreamSummaryRequest == null) { + describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); + } try { - log.info("----------------------------Starting stream scale----------------------"); - if (!scalingFactor.isEmpty()) { - log.info("Starting stream scaling with params : {}", this); - final DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); - final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get(); - - int openShardCount = response.streamDescriptionSummary().openShardCount(); - int targetShardCount; - if (scalingFactor.peek() == ReshardOptions.SPLIT) { - // Split case: double the number of shards - targetShardCount = (int) (openShardCount * 2.0); - } else { - // Merge case: half the number of shards - targetShardCount = (int) (openShardCount * 0.5); - } - log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", streamName, openShardCount, targetShardCount, scalingFactor.peek()); - - final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder().streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build(); - final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get(); - log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString()); - - if (--totalRotations >= 0) { - scalingFactor.offer(scalingFactor.poll()); - } else { - scalingFactor.remove(); - } - } else { - log.info("No scaling factor found in queue"); - } - } catch (Exception e) { + scaleStream(); + } catch (InterruptedException | ExecutionException e) { log.error("Caught error while scaling shards for stream", e); } finally { - log.info("Reshard Queue State : {}", scalingFactor); + log.info("Reshard List State : {}", scalingFactors); } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index 6732e6ff..b5d0c4d1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -24,12 +24,12 @@ import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.RetrievalConfig; -import software.amazon.kinesis.utils.ReshardOptions; import java.io.IOException; import java.net.Inet4Address; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.List; /** * Default configuration for a producer or consumer used in integration tests. @@ -76,7 +76,7 @@ public abstract class KCLAppConfig { .build(); } - public ReshardConfig getReshardConfig() { + public List getReshardFactorList() { return null; } @@ -165,27 +165,4 @@ public abstract class KCLAppConfig { private long callPeriodMills; } - /** - * Description of the method of resharding for a test case - */ - @Value - @Builder - public static class ReshardConfig { - /** - * reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle - * e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved - */ - private ReshardOptions[] reshardingFactorCycle; - - /** - * numReshardCycles: the number of resharding cycles that will be executed in a test - */ - private int numReshardCycles; - - /** - * reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds) - */ - private long reshardFrequencyMillis; - } - } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java index 8f40f8cc..cfdc5298 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java @@ -3,6 +3,8 @@ package software.amazon.kinesis.config; import software.amazon.awssdk.http.Protocol; import software.amazon.kinesis.utils.ReshardOptions; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import static software.amazon.kinesis.utils.ReshardOptions.MERGE; @@ -25,12 +27,8 @@ public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig { } @Override - public ReshardConfig getReshardConfig() { - return ReshardConfig.builder() - .reshardFrequencyMillis(3 * 60 * 1000) - .reshardingFactorCycle(new ReshardOptions[]{SPLIT, MERGE}) - .numReshardCycles(1) - .build(); + public List getReshardFactorList() { + return Arrays.asList(SPLIT, MERGE); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java index 179809fd..aa08980e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java @@ -7,7 +7,7 @@ import software.amazon.kinesis.application.TestConsumer; public class ReshardIntegrationTest { @Test - public void ReleaseCanaryStreamingReshardingTest() throws Exception { + public void kclReleaseCanaryStreamingReshardingTest() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig(); TestConsumer consumer = new TestConsumer(consumerConfig); consumer.run(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java index fbf5f68b..f1513cfb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java @@ -6,6 +6,16 @@ package software.amazon.kinesis.utils; * Merge halves the number of shards. */ public enum ReshardOptions { - SPLIT, - MERGE + SPLIT { + public int calculateShardCount(int currentShards) { + return (int) (2.0 * currentShards); + } + }, + MERGE { + public int calculateShardCount(int currentShards) { + return (int) (0.5 * currentShards); + } + }; + + public abstract int calculateShardCount(int currentShards); }