Updates based off initial comments

This commit is contained in:
Meher Mankikar 2023-06-28 10:50:57 -07:00
parent a207c3760c
commit dc0fa21b4a
6 changed files with 82 additions and 112 deletions

View file

@ -37,12 +37,12 @@ Note: This command does not run integration tests.
## Running Integration Tests ## Running Integration Tests
Note that running integration tests create AWS resources. Note that running integration tests creates AWS resources.
Integration tests require valid AWS credentials need to be discovered at runtime. Integration tests require valid AWS credentials.
To run all integration tests: `mvn install package -DskipITs=false`.
To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`
This will look for a default AWS profile specified in your local `.aws/credentials`. This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn install package -DskipITs=false -DawsProfile="<PROFILE_NAME>"`. To run all integration tests: `mvn verify -DskipITs=false`.
To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile="<PROFILE_NAME>"`.
## Integration with the Kinesis Producer Library ## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

View file

@ -1,9 +1,7 @@
package software.amazon.kinesis.application; package software.amazon.kinesis.application;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkBytes;
@ -34,9 +32,7 @@ import software.amazon.kinesis.utils.StreamExistenceManager;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.List;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -59,7 +55,7 @@ public class TestConsumer {
private LifecycleConfig lifecycleConfig; private LifecycleConfig lifecycleConfig;
private ProcessorConfig processorConfig; private ProcessorConfig processorConfig;
private Scheduler scheduler; private Scheduler scheduler;
private ScheduledExecutorService producerExecutor; private static final ScheduledExecutorService PRODUCER_EXECUTOR = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> producerFuture; private ScheduledFuture<?> producerFuture;
private ScheduledExecutorService consumerExecutor; private ScheduledExecutorService consumerExecutor;
private ScheduledFuture<?> consumerFuture; private ScheduledFuture<?> consumerFuture;
@ -93,12 +89,9 @@ public class TestConsumer {
try { try {
startConsumer(); startConsumer();
// Sleep for three minutes to allow the producer/consumer to run and then end the test case. // Sleep to allow the producer/consumer to run and then end the test case. If non-reshard sleep 3 minutes, else sleep 4 minutes per scale.
if (consumerConfig.getReshardConfig() == null) { final int sleepMinutes = (consumerConfig.getReshardFactorList() == null) ? 3 : (4 * consumerConfig.getReshardFactorList().size());
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));
} else {
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 8));
}
// Stops sending dummy data. // Stops sending dummy data.
stopProducer(); stopProducer();
@ -134,27 +127,18 @@ public class TestConsumer {
} }
private void startProducer() { private void startProducer() {
producerExecutor = Executors.newSingleThreadScheduledExecutor(); producerFuture = PRODUCER_EXECUTOR.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
// Reshard logic if required for the test // Reshard logic if required for the test
if (consumerConfig.getReshardConfig() != null) { if (consumerConfig.getReshardFactorList() != null) {
log.info("------------------------- Reshard Config found -----------------------------"); log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList());
final Queue<ReshardOptions> reshardQueue = new LinkedList<>(Arrays.asList(consumerConfig.getReshardConfig().getReshardingFactorCycle()));
int totalRotations = reshardQueue.size() * (consumerConfig.getReshardConfig().getNumReshardCycles() - 1);
final StreamScaler s = new StreamScaler(kinesisClient, consumerConfig.getStreamName(), reshardQueue, totalRotations, consumerConfig); final StreamScaler s = new StreamScaler(kinesisClient, consumerConfig.getStreamName(), consumerConfig.getReshardFactorList(), consumerConfig);
Runnable task1 = () -> { // Schedule the stream scales 4 minutes apart with 2 minute starting delay
log.info("----------------------Starting new reshard------------------------------"); for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
s.run(); PRODUCER_EXECUTOR.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
}; }
// Split shard
producerExecutor.schedule(task1, 2, TimeUnit.MINUTES);
// Merge shard
producerExecutor.schedule(task1, 6, TimeUnit.MINUTES);
} }
} }
@ -192,8 +176,12 @@ public class TestConsumer {
public void stopProducer() { public void stopProducer() {
log.info("Cancelling producer and shutting down executor."); log.info("Cancelling producer and shutting down executor.");
if (producerFuture != null) {
producerFuture.cancel(false); producerFuture.cancel(false);
producerExecutor.shutdown(); }
if (PRODUCER_EXECUTOR != null) {
PRODUCER_EXECUTOR.shutdown();
}
} }
public void publishRecord() { public void publishRecord() {
@ -219,7 +207,7 @@ public class TestConsumer {
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
final byte[] returnData; final byte[] returnData;
log.info("--------------Putting record with data: {}", payloadCounter); log.info("---------Putting record with data: {}", payloadCounter);
try { try {
returnData = mapper.writeValueAsBytes(payloadCounter); returnData = mapper.writeValueAsBytes(payloadCounter);
} catch (Exception e) { } catch (Exception e) {
@ -247,63 +235,60 @@ public class TestConsumer {
if (errorVal != RecordValidationStatus.NO_ERROR) { if (errorVal != RecordValidationStatus.NO_ERROR) {
throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString()); throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString());
} }
log.info("--------------Completed validation of processed records.--------------"); log.info("---------Completed validation of processed records.---------");
} }
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("-------------Start deleting stream.----------------"); log.info("-------------Start deleting stream.---------");
streamExistenceManager.deleteResource(this.streamName); streamExistenceManager.deleteResource(this.streamName);
log.info("-------------Start deleting lease table.----------------"); log.info("---------Start deleting lease table.---------");
leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
log.info("-------------Finished deleting resources.----------------"); log.info("---------Finished deleting resources.---------");
} }
@Data @Data
@ToString private static class StreamScaler implements Runnable {
@AllArgsConstructor
public static class StreamScaler implements Runnable {
private final KinesisAsyncClient client; private final KinesisAsyncClient client;
private final String streamName; private final String streamName;
private final Queue<ReshardOptions> scalingFactor; private final List<ReshardOptions> scalingFactors;
private int totalRotations; private final KCLAppConfig consumerConfig;
private KCLAppConfig testConfig; private int scalingFactorIdx = 0;
private DescribeStreamSummaryRequest describeStreamSummaryRequest;
@Override private synchronized void scaleStream() throws InterruptedException, ExecutionException {
public void run() {
try {
log.info("----------------------------Starting stream scale----------------------");
if (!scalingFactor.isEmpty()) {
log.info("Starting stream scaling with params : {}", this);
final DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get(); final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get();
int openShardCount = response.streamDescriptionSummary().openShardCount(); final int openShardCount = response.streamDescriptionSummary().openShardCount();
int targetShardCount; final int targetShardCount = scalingFactors.get(scalingFactorIdx).calculateShardCount(openShardCount);
if (scalingFactor.peek() == ReshardOptions.SPLIT) {
// Split case: double the number of shards
targetShardCount = (int) (openShardCount * 2.0);
} else {
// Merge case: half the number of shards
targetShardCount = (int) (openShardCount * 0.5);
}
log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", streamName, openShardCount, targetShardCount, scalingFactor.peek());
final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder().streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build(); log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}",
streamName, openShardCount, targetShardCount, scalingFactors.get(scalingFactorIdx));
final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder()
.streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build();
final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get(); final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get();
log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString()); log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString());
if (--totalRotations >= 0) { scalingFactorIdx++;
scalingFactor.offer(scalingFactor.poll());
} else {
scalingFactor.remove();
} }
} else {
log.info("No scaling factor found in queue"); @Override
public void run() {
if (scalingFactors.size() == 0 || scalingFactorIdx >= scalingFactors.size()) {
log.info("No scaling factor found in list");
return;
} }
} catch (Exception e) { log.info("Starting stream scaling with params : {}", this);
if (describeStreamSummaryRequest == null) {
describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
}
try {
scaleStream();
} catch (InterruptedException | ExecutionException e) {
log.error("Caught error while scaling shards for stream", e); log.error("Caught error while scaling shards for stream", e);
} finally { } finally {
log.info("Reshard Queue State : {}", scalingFactor); log.info("Reshard List State : {}", scalingFactors);
} }
} }
} }

View file

@ -24,12 +24,12 @@ import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.ReshardOptions;
import java.io.IOException; import java.io.IOException;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.List;
/** /**
* Default configuration for a producer or consumer used in integration tests. * Default configuration for a producer or consumer used in integration tests.
@ -76,7 +76,7 @@ public abstract class KCLAppConfig {
.build(); .build();
} }
public ReshardConfig getReshardConfig() { public List<ReshardOptions> getReshardFactorList() {
return null; return null;
} }
@ -165,27 +165,4 @@ public abstract class KCLAppConfig {
private long callPeriodMills; private long callPeriodMills;
} }
/**
* Description of the method of resharding for a test case
*/
@Value
@Builder
public static class ReshardConfig {
/**
* reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle
* e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved
*/
private ReshardOptions[] reshardingFactorCycle;
/**
* numReshardCycles: the number of resharding cycles that will be executed in a test
*/
private int numReshardCycles;
/**
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
private long reshardFrequencyMillis;
}
} }

View file

@ -3,6 +3,8 @@ package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.utils.ReshardOptions; import software.amazon.kinesis.utils.ReshardOptions;
import java.util.Arrays;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import static software.amazon.kinesis.utils.ReshardOptions.MERGE; import static software.amazon.kinesis.utils.ReshardOptions.MERGE;
@ -25,12 +27,8 @@ public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig {
} }
@Override @Override
public ReshardConfig getReshardConfig() { public List<ReshardOptions> getReshardFactorList() {
return ReshardConfig.builder() return Arrays.asList(SPLIT, MERGE);
.reshardFrequencyMillis(3 * 60 * 1000)
.reshardingFactorCycle(new ReshardOptions[]{SPLIT, MERGE})
.numReshardCycles(1)
.build();
} }
} }

View file

@ -7,7 +7,7 @@ import software.amazon.kinesis.application.TestConsumer;
public class ReshardIntegrationTest { public class ReshardIntegrationTest {
@Test @Test
public void ReleaseCanaryStreamingReshardingTest() throws Exception { public void kclReleaseCanaryStreamingReshardingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig); TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run(); consumer.run();

View file

@ -6,6 +6,16 @@ package software.amazon.kinesis.utils;
* Merge halves the number of shards. * Merge halves the number of shards.
*/ */
public enum ReshardOptions { public enum ReshardOptions {
SPLIT, SPLIT {
MERGE public int calculateShardCount(int currentShards) {
return (int) (2.0 * currentShards);
}
},
MERGE {
public int calculateShardCount(int currentShards) {
return (int) (0.5 * currentShards);
}
};
public abstract int calculateShardCount(int currentShards);
} }