Adding resharding integration tests and changing ITs to not run by default (#1152)

* Initial changes for resharding integration tests KCL 2.x and changing integration tests to not run by default
This commit is contained in:
Meher M 2023-08-03 13:16:56 -07:00 committed by GitHub
parent eccd6cf2e7
commit 46cd1179d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 179 additions and 56 deletions

View file

@ -32,15 +32,17 @@ Please open an issue if you have any questions.
## Building from Source
After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use
this command: `mvn clean install -Dgpg.skip=true`. Note: This command runs Integration tests, which in turn creates AWS
resources (which requires manual cleanup). Integration tests require valid AWS credentials to be discoverable at
runtime. To skip running integration tests, add the ` -DskipITs` option to the build command.
this command: `mvn clean install -Dgpg.skip=true`.
Note: This command does not run integration tests.
## Running Integration Tests
To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="<PROFILE_NAME>" verify`.
Note that running integration tests creates AWS resources.
Integration tests require valid AWS credentials.
This will look for a default AWS profile specified in your local `.aws/credentials`.
To run all integration tests: `mvn verify -DskipITs=false`.
To run a single integration test: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`.
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile="<PROFILE_NAME>"`.
## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

View file

@ -52,6 +52,7 @@
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
<slf4j.version>2.0.7</slf4j.version>
<gsr.version>1.1.14</gsr.version>
<skipITs>true</skipITs>
</properties>
<dependencies>
@ -199,6 +200,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skipITs>${skipITs}</skipITs>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
</excludes>

View file

@ -1,13 +1,19 @@
package software.amazon.kinesis.utils;
package software.amazon.kinesis.application;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -19,9 +25,14 @@ import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.LeaseTableManager;
import software.amazon.kinesis.utils.RecordValidationStatus;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.StreamExistenceManager;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -78,8 +89,10 @@ public class TestConsumer {
try {
startConsumer();
// Sleep for three minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
// Sleep to allow the producer/consumer to run and then end the test case.
// If non-reshard sleep 3 minutes, else sleep 4 minutes per scale.
final int sleepMinutes = (consumerConfig.getReshardFactorList() == null) ? 3 : (4 * consumerConfig.getReshardFactorList().size());
Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));
// Stops sending dummy data.
stopProducer();
@ -115,9 +128,25 @@ public class TestConsumer {
}
/**
 * Starts the dummy-data producer and, when the test config supplies a reshard
 * factor list, schedules the stream-scaling tasks on the same executor.
 * NOTE(review): the original span contained both the pre-change (60 s delay)
 * and post-change (10 s delay) scheduling lines from an unresolved diff,
 * which would have scheduled the producer twice; only the new line is kept.
 */
private void startProducer() {
    // Send dummy data to stream
    this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
    // 10 s initial delay gives the stream time to become active before the first put.
    this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

    // Reshard logic if required for the test
    if (consumerConfig.getReshardFactorList() != null) {
        log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList());
        final StreamScaler s = new StreamScaler(
                kinesisClient,
                consumerConfig.getStreamName(),
                consumerConfig.getReshardFactorList(),
                consumerConfig
        );

        // Schedule the stream scales 4 minutes apart with 2 minute starting delay
        for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
            producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
        }
    }
}
private void setUpConsumerResources() throws Exception {
@ -128,7 +157,9 @@ public class TestConsumer {
checkpointConfig = configsBuilder.checkpointConfig();
coordinatorConfig = configsBuilder.coordinatorConfig();
leaseManagementConfig = configsBuilder.leaseManagementConfig()
.initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()))
.initialPositionInStream(
InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition())
)
.initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50);
lifecycleConfig = configsBuilder.lifecycleConfig();
processorConfig = configsBuilder.processorConfig();
@ -152,6 +183,16 @@ public class TestConsumer {
this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS);
}
/**
 * Cancels any in-flight producer task and shuts down its executor.
 * Both references are null-checked because the producer may never have
 * been started when a test fails early.
 */
private void stopProducer() {
    log.info("Cancelling producer and shutting down executor.");
    final Future<?> future = producerFuture;
    if (future != null) {
        future.cancel(false);
    }
    final ScheduledExecutorService executor = producerExecutor;
    if (executor != null) {
        executor.shutdown();
    }
}
public void publishRecord() {
final PutRecordRequest request;
try {
@ -175,7 +216,7 @@ public class TestConsumer {
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
final byte[] returnData;
log.info("--------------Putting record with data: {}", payloadCounter);
log.info("---------Putting record with data: {}", payloadCounter);
try {
returnData = mapper.writeValueAsBytes(payloadCounter);
} catch (Exception e) {
@ -184,12 +225,6 @@ public class TestConsumer {
return ByteBuffer.wrap(returnData);
}
// NOTE(review): this is the pre-change copy of stopProducer left over from an
// unresolved diff — a second, null-checked version exists earlier in this file.
// Unlike that copy, this one dereferences producerFuture/producerExecutor
// unconditionally and would NPE if the producer was never started.
private void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
}
private void awaitConsumerFinish() throws Exception {
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete.");
@ -198,7 +233,7 @@ public class TestConsumer {
} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException | TimeoutException e) {
throw e;
scheduler.shutdown();
}
log.info("Completed, shutting down now.");
}
@ -209,15 +244,61 @@ public class TestConsumer {
if (errorVal != RecordValidationStatus.NO_ERROR) {
throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString());
}
log.info("--------------Completed validation of processed records.--------------");
log.info("---------Completed validation of processed records.---------");
}
/**
 * Tears down the AWS resources created for this test run: first the Kinesis
 * stream, then the DynamoDB lease table (named after the stream).
 * NOTE(review): the original span contained each log call twice (pre-change
 * and post-change strings from an unresolved diff); only the post-change
 * strings are kept so each step is logged once.
 *
 * @param streamExistenceManager deletes the Kinesis stream
 * @param leaseTableManager deletes the KCL lease table
 * @throws Exception if either deletion fails
 */
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
    log.info("-------------Start deleting stream.---------");
    streamExistenceManager.deleteResource(this.streamName);
    log.info("---------Start deleting lease table.---------");
    leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
    log.info("---------Finished deleting resources.---------");
}
/**
 * Runnable that performs one uniform-scaling reshard per invocation, walking
 * through {@code scalingFactors} one entry per run. Intended to be scheduled
 * repeatedly on the producer executor.
 */
@Data
private static class StreamScaler implements Runnable {
    private final KinesisAsyncClient client;
    private final String streamName;
    private final List<ReshardOptions> scalingFactors;
    private final KCLAppConfig consumerConfig;
    // Index of the next scaling factor to apply; advanced after each successful scale.
    private int scalingFactorIdx = 0;
    // Built lazily on first run and reused for subsequent runs.
    private DescribeStreamSummaryRequest describeStreamSummaryRequest;

    /**
     * Reads the current open-shard count and issues a uniform-scaling
     * UpdateShardCount request for the next factor in the list.
     */
    private synchronized void scaleStream() throws InterruptedException, ExecutionException {
        final DescribeStreamSummaryResponse response =
                client.describeStreamSummary(describeStreamSummaryRequest).get();
        final int openShardCount = response.streamDescriptionSummary().openShardCount();
        final int targetShardCount = scalingFactors.get(scalingFactorIdx).calculateShardCount(openShardCount);
        log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}",
                streamName, openShardCount, targetShardCount, scalingFactors.get(scalingFactorIdx));

        final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder()
                .streamName(streamName)
                .targetShardCount(targetShardCount)
                .scalingType(ScalingType.UNIFORM_SCALING)
                .build();
        final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get();
        log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString());

        scalingFactorIdx++;
    }

    @Override
    public void run() {
        // Nothing left to do if the list is empty or already exhausted.
        if (scalingFactors.isEmpty() || scalingFactorIdx >= scalingFactors.size()) {
            log.info("No scaling factor found in list");
            return;
        }
        log.info("Starting stream scaling with params : {}", this);

        if (describeStreamSummaryRequest == null) {
            describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
        }
        try {
            scaleStream();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Caught error while scaling shards for stream", e);
        } catch (ExecutionException e) {
            log.error("Caught error while scaling shards for stream", e);
        } finally {
            log.info("Reshard List State : {}", scalingFactors);
        }
    }
}
}

View file

@ -1,4 +1,4 @@
package software.amazon.kinesis.utils;
package software.amazon.kinesis.application;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
@ -11,6 +11,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import java.nio.ByteBuffer;

View file

@ -1,7 +1,8 @@
package software.amazon.kinesis.utils;
package software.amazon.kinesis.application;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.utils.RecordValidatorQueue;
public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {

View file

@ -5,7 +5,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;
import software.amazon.kinesis.application.TestRecordProcessorFactory;
import lombok.Builder;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
@ -29,6 +29,7 @@ import java.io.IOException;
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.List;
/**
* Default configuration for a producer or consumer used in integration tests.
@ -75,7 +76,7 @@ public abstract class KCLAppConfig {
.build();
}
public ReshardConfig getReshardConfig() {
public List<ReshardOptions> getReshardFactorList() {
return null;
}
@ -157,34 +158,11 @@ public abstract class KCLAppConfig {
*/
@Value
@Builder
static class ProducerConfig {
public static class ProducerConfig {
private boolean isBatchPut;
private int batchSize;
private int recordSizeKB;
private long callPeriodMills;
}
/**
* Description of the method of resharding for a test case
*/
@Value
@Builder
static class ReshardConfig {
/**
* reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle
* e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved
*/
private ReshardOptions[] reshardingFactorCycle;
/**
* numReshardCycles: the number of resharding cycles that will be executed in a test
*/
private int numReshardCycles;
/**
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
private long reshardFrequencyMillis;
}
}

View file

@ -0,0 +1,34 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.utils.ReshardOptions;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static software.amazon.kinesis.utils.ReshardOptions.MERGE;
import static software.amazon.kinesis.utils.ReshardOptions.SPLIT;
/**
 * Integration-test configuration for the streaming (HTTP/2) resharding canary:
 * a 100-shard stream that is split (doubled) and then merged (halved) mid-test.
 */
public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig {

    // Random suffix so concurrent test runs never collide on stream name.
    private final UUID uniqueId = UUID.randomUUID();

    @Override
    public String getStreamName() {
        return String.format("KCLReleaseCanary2XStreamingReshardingTestStream_%s", uniqueId);
    }

    @Override
    public Protocol getKinesisClientProtocol() {
        return Protocol.HTTP2;
    }

    @Override
    public int getShardCount() {
        return 100;
    }

    @Override
    public List<ReshardOptions> getReshardFactorList() {
        // One full cycle: double the shard count, then halve it back.
        return Arrays.asList(SPLIT, MERGE);
    }
}

View file

@ -5,7 +5,7 @@ import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer;
import software.amazon.kinesis.application.TestConsumer;
public class BasicStreamConsumerIntegrationTest {

View file

@ -0,0 +1,15 @@
package software.amazon.kinesis.lifecycle;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingReshardingTestConfig;
import software.amazon.kinesis.application.TestConsumer;
/**
 * End-to-end canary that runs the KCL consumer while the stream is
 * resharded (split then merge) mid-test, validating record processing
 * across the shard-count changes.
 */
public class ReshardIntegrationTest {

    @Test
    public void kclReleaseCanaryStreamingReshardingTest() throws Exception {
        final KCLAppConfig config = new ReleaseCanaryStreamingReshardingTestConfig();
        new TestConsumer(config).run();
    }
}

View file

@ -6,6 +6,16 @@ package software.amazon.kinesis.utils;
* Merge halves the number of shards.
*/
// NOTE(review): the original span contained both the pre-change bare constants
// ("SPLIT, MERGE") and the new bodied constants from an unresolved diff, which
// is not valid Java; only the new constant-specific bodies are kept.
public enum ReshardOptions {
    /** Doubles the number of open shards. */
    SPLIT {
        @Override
        public int calculateShardCount(int currentShards) {
            return (int) (2.0 * currentShards);
        }
    },
    /** Halves the number of open shards (truncating for odd counts). */
    MERGE {
        @Override
        public int calculateShardCount(int currentShards) {
            return (int) (0.5 * currentShards);
        }
    };

    /**
     * Computes the target open-shard count this reshard operation produces
     * from the current open-shard count.
     */
    public abstract int calculateShardCount(int currentShards);
}

View file

@ -111,5 +111,4 @@ public class StreamExistenceManager extends AWSResourceManager {
}
}
}
}