Initial changes for resharding integration tests in KCL 2.x, and changing integration tests to not run by default

This commit is contained in:
Meher Mankikar 2023-06-15 09:23:55 -07:00
parent 53dbb4ea79
commit a50c2fcdf8
10 changed files with 169 additions and 27 deletions

View file

@ -32,15 +32,17 @@ Please open an issue if you have any questions.
## Building from Source ## Building from Source
After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use
this command: `mvn clean install -Dgpg.skip=true`. Note: This command runs Integration tests, which in turn creates AWS this command: `mvn clean install -Dgpg.skip=true`.
resources (which requires manual cleanup). Integration tests require valid AWS credentials need to be discovered at Note: This command does not run integration tests.
runtime. To skip running integration tests, add ` -DskipITs` option to the build command.
## Running Integration Tests ## Running Integration Tests
To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. Note that running integration tests creates AWS resources.
Integration tests require valid AWS credentials that can be discovered at runtime.
To run all integration tests: `mvn install package -DskipITs=false`.
To run a single integration test: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`
This will look for a default AWS profile specified in your local `.aws/credentials`. This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="<PROFILE_NAME>" verify`. Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn install package -DskipITs=false -DawsProfile="<PROFILE_NAME>"`.
## Integration with the Kinesis Producer Library ## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

View file

@ -52,6 +52,7 @@
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
<slf4j.version>2.0.7</slf4j.version> <slf4j.version>2.0.7</slf4j.version>
<gsr.version>1.1.14</gsr.version> <gsr.version>1.1.14</gsr.version>
<skipITs>true</skipITs>
</properties> </properties>
<dependencies> <dependencies>
@ -199,6 +200,7 @@
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version> <version>2.22.2</version>
<configuration> <configuration>
<skipITs>${skipITs}</skipITs>
<excludes> <excludes>
<exclude>**/*IntegrationTest.java</exclude> <exclude>**/*IntegrationTest.java</exclude>
</excludes> </excludes>

View file

@ -1,13 +1,21 @@
package software.amazon.kinesis.utils; package software.amazon.kinesis.application;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -19,9 +27,16 @@ import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.LeaseTableManager;
import software.amazon.kinesis.utils.RecordValidationStatus;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.StreamExistenceManager;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -79,7 +94,11 @@ public class TestConsumer {
startConsumer(); startConsumer();
// Sleep for three minutes to allow the producer/consumer to run and then end the test case. // Sleep for three minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); if (consumerConfig.getReshardConfig() == null) {
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
} else {
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 8));
}
// Stops sending dummy data. // Stops sending dummy data.
stopProducer(); stopProducer();
@ -115,9 +134,28 @@ public class TestConsumer {
} }
private void startProducer() { private void startProducer() {
// Send dummy data to stream producerExecutor = Executors.newSingleThreadScheduledExecutor();
this.producerExecutor = Executors.newSingleThreadScheduledExecutor(); producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
// Reshard logic if required for the test
if (consumerConfig.getReshardConfig() != null) {
log.info("------------------------- Reshard Config found -----------------------------");
final Queue<ReshardOptions> reshardQueue = new LinkedList<>(Arrays.asList(consumerConfig.getReshardConfig().getReshardingFactorCycle()));
int totalRotations = reshardQueue.size() * (consumerConfig.getReshardConfig().getNumReshardCycles() - 1);
final StreamScaler s = new StreamScaler(kinesisClient, consumerConfig.getStreamName(), reshardQueue, totalRotations, consumerConfig);
Runnable task1 = () -> {
log.info("----------------------Starting new reshard------------------------------");
s.run();
};
// Split shard
producerExecutor.schedule(task1, 2, TimeUnit.MINUTES);
// Merge shard
producerExecutor.schedule(task1, 6, TimeUnit.MINUTES);
}
} }
private void setUpConsumerResources() throws Exception { private void setUpConsumerResources() throws Exception {
@ -152,6 +190,12 @@ public class TestConsumer {
this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS); this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS);
} }
public void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
}
public void publishRecord() { public void publishRecord() {
final PutRecordRequest request; final PutRecordRequest request;
try { try {
@ -184,12 +228,6 @@ public class TestConsumer {
return ByteBuffer.wrap(returnData); return ByteBuffer.wrap(returnData);
} }
private void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
}
private void awaitConsumerFinish() throws Exception { private void awaitConsumerFinish() throws Exception {
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown(); Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete."); log.info("Waiting up to 20 seconds for shutdown to complete.");
@ -198,7 +236,7 @@ public class TestConsumer {
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing."); log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException | TimeoutException e) { } catch (ExecutionException | TimeoutException e) {
throw e; scheduler.shutdown();
} }
log.info("Completed, shutting down now."); log.info("Completed, shutting down now.");
} }
@ -220,4 +258,53 @@ public class TestConsumer {
log.info("-------------Finished deleting resources.----------------"); log.info("-------------Finished deleting resources.----------------");
} }
@Data
@ToString
@AllArgsConstructor
public static class StreamScaler implements Runnable {
private final KinesisAsyncClient client;
private final String streamName;
private final Queue<ReshardOptions> scalingFactor;
private int totalRotations;
private KCLAppConfig testConfig;
@Override
public void run() {
try {
log.info("----------------------------Starting stream scale----------------------");
if (!scalingFactor.isEmpty()) {
log.info("Starting stream scaling with params : {}", this);
final DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get();
int openShardCount = response.streamDescriptionSummary().openShardCount();
int targetShardCount;
if (scalingFactor.peek() == ReshardOptions.SPLIT) {
// Split case: double the number of shards
targetShardCount = (int) (openShardCount * 2.0);
} else {
// Merge case: half the number of shards
targetShardCount = (int) (openShardCount * 0.5);
}
log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", streamName, openShardCount, targetShardCount, scalingFactor.peek());
final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder().streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build();
final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get();
log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString());
if (--totalRotations >= 0) {
scalingFactor.offer(scalingFactor.poll());
} else {
scalingFactor.remove();
}
} else {
log.info("No scaling factor found in queue");
}
} catch (Exception e) {
log.error("Caught error while scaling shards for stream", e);
} finally {
log.info("Reshard Queue State : {}", scalingFactor);
}
}
}
} }

View file

@ -1,4 +1,4 @@
package software.amazon.kinesis.utils; package software.amazon.kinesis.application;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -11,6 +11,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;

View file

@ -1,7 +1,8 @@
package software.amazon.kinesis.utils; package software.amazon.kinesis.application;
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.utils.RecordValidatorQueue;
public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {

View file

@ -5,7 +5,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.utils.RecordValidatorQueue; import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions; import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory; import software.amazon.kinesis.application.TestRecordProcessorFactory;
import lombok.Builder; import lombok.Builder;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
@ -24,6 +24,7 @@ import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.ReshardOptions;
import java.io.IOException; import java.io.IOException;
import java.net.Inet4Address; import java.net.Inet4Address;
@ -159,7 +160,7 @@ public abstract class KCLAppConfig {
*/ */
@Value @Value
@Builder @Builder
static class ProducerConfig { public static class ProducerConfig {
private boolean isBatchPut; private boolean isBatchPut;
private int batchSize; private int batchSize;
private int recordSizeKB; private int recordSizeKB;
@ -171,7 +172,7 @@ public abstract class KCLAppConfig {
*/ */
@Value @Value
@Builder @Builder
static class ReshardConfig { public static class ReshardConfig {
/** /**
* reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle * reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle
* e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved * e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved

View file

@ -0,0 +1,36 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.utils.ReshardOptions;
import java.util.UUID;
import static software.amazon.kinesis.utils.ReshardOptions.MERGE;
import static software.amazon.kinesis.utils.ReshardOptions.SPLIT;
public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId;
}
@Override
public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; }
@Override
public int getShardCount() {
return 100;
}
@Override
public ReshardConfig getReshardConfig() {
return ReshardConfig.builder()
.reshardFrequencyMillis(3 * 60 * 1000)
.reshardingFactorCycle(new ReshardOptions[]{SPLIT, MERGE})
.numReshardCycles(1)
.build();
}
}

View file

@ -5,7 +5,7 @@ import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer; import software.amazon.kinesis.application.TestConsumer;
public class BasicStreamConsumerIntegrationTest { public class BasicStreamConsumerIntegrationTest {

View file

@ -0,0 +1,15 @@
package software.amazon.kinesis.lifecycle;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingReshardingTestConfig;
import software.amazon.kinesis.application.TestConsumer;
public class ReshardIntegrationTest {
@Test
public void ReleaseCanaryStreamingReshardingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
}

View file

@ -2,8 +2,6 @@ package software.amazon.kinesis.utils;
import lombok.Value; import lombok.Value;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
@ -113,5 +111,4 @@ public class StreamExistenceManager extends AWSResourceManager {
} }
} }
} }
} }