Add additional integration tests for multistream and cross account
This commit is contained in:
parent
e9990190cc
commit
4b3d42bff0
22 changed files with 896 additions and 150 deletions
|
|
@ -38,14 +38,16 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
|
||||||
this command: `mvn clean install -Dgpg.skip=true`.
|
this command: `mvn clean install -Dgpg.skip=true`.
|
||||||
Note: This command does not run integration tests.
|
Note: This command does not run integration tests.
|
||||||
|
|
||||||
|
To disable running unit tests in the build, add the property `-Dskip.ut=true`.
|
||||||
|
|
||||||
## Running Integration Tests
|
## Running Integration Tests
|
||||||
|
|
||||||
Note that running integration tests creates AWS resources.
|
Note that running integration tests creates AWS resources.
|
||||||
Integration tests require valid AWS credentials.
|
Integration tests require valid AWS credentials.
|
||||||
This will look for a default AWS profile specified in your local `.aws/credentials`.
|
This will look for a default AWS profile specified in your local `.aws/credentials`.
|
||||||
To run all integration tests: `mvn verify -DskipITs=false`.
|
To run all integration tests: `mvn verify -DskipITs=false`.
|
||||||
To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`
|
To run one integration tests, specify the integration test class: `mvn -Dit.test="BasicStreamConsumerIntegrationTest" -DskipITs=false verify`
|
||||||
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile="<PROFILE_NAME>"`.
|
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -DskipITs=false -DawsProfile="<PROFILE_NAME>" verify`.
|
||||||
|
|
||||||
## Integration with the Kinesis Producer Library
|
## Integration with the Kinesis Producer Library
|
||||||
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
|
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
|
||||||
|
|
|
||||||
|
|
@ -201,6 +201,7 @@
|
||||||
<artifactId>maven-surefire-plugin</artifactId>
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
<version>3.1.2</version>
|
<version>3.1.2</version>
|
||||||
<configuration>
|
<configuration>
|
||||||
|
<skipTests>${skip.ut}</skipTests>
|
||||||
<skipITs>${skipITs}</skipITs>
|
<skipITs>${skipITs}</skipITs>
|
||||||
<excludes>
|
<excludes>
|
||||||
<exclude>**/*IntegrationTest.java</exclude>
|
<exclude>**/*IntegrationTest.java</exclude>
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import lombok.Data;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import software.amazon.awssdk.core.SdkBytes;
|
import software.amazon.awssdk.core.SdkBytes;
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.regions.Region;
|
import software.amazon.awssdk.regions.Region;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
|
|
@ -18,6 +19,7 @@ import software.amazon.kinesis.checkpoint.CheckpointConfig;
|
||||||
import software.amazon.kinesis.common.ConfigsBuilder;
|
import software.amazon.kinesis.common.ConfigsBuilder;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
import software.amazon.kinesis.config.KCLAppConfig;
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
import software.amazon.kinesis.coordinator.CoordinatorConfig;
|
import software.amazon.kinesis.coordinator.CoordinatorConfig;
|
||||||
import software.amazon.kinesis.coordinator.Scheduler;
|
import software.amazon.kinesis.coordinator.Scheduler;
|
||||||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||||
|
|
@ -33,6 +35,7 @@ import software.amazon.kinesis.utils.StreamExistenceManager;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
@ -45,8 +48,9 @@ import java.util.concurrent.TimeoutException;
|
||||||
public class TestConsumer {
|
public class TestConsumer {
|
||||||
public final KCLAppConfig consumerConfig;
|
public final KCLAppConfig consumerConfig;
|
||||||
public final Region region;
|
public final Region region;
|
||||||
public final String streamName;
|
public final List<String> streamNames;
|
||||||
public final KinesisAsyncClient kinesisClient;
|
public final KinesisAsyncClient kinesisClient;
|
||||||
|
public final KinesisAsyncClient kinesisClientForStreamOwner;
|
||||||
private MetricsConfig metricsConfig;
|
private MetricsConfig metricsConfig;
|
||||||
private RetrievalConfig retrievalConfig;
|
private RetrievalConfig retrievalConfig;
|
||||||
private CheckpointConfig checkpointConfig;
|
private CheckpointConfig checkpointConfig;
|
||||||
|
|
@ -67,13 +71,18 @@ public class TestConsumer {
|
||||||
public TestConsumer(KCLAppConfig consumerConfig) throws Exception {
|
public TestConsumer(KCLAppConfig consumerConfig) throws Exception {
|
||||||
this.consumerConfig = consumerConfig;
|
this.consumerConfig = consumerConfig;
|
||||||
this.region = consumerConfig.getRegion();
|
this.region = consumerConfig.getRegion();
|
||||||
this.streamName = consumerConfig.getStreamName();
|
this.streamNames = consumerConfig.getStreamNames();
|
||||||
this.kinesisClient = consumerConfig.buildAsyncKinesisClient();
|
this.kinesisClientForStreamOwner = consumerConfig.buildAsyncKinesisClientForStreamOwner();
|
||||||
|
this.kinesisClient = consumerConfig.buildAsyncKinesisClientForConsumer();
|
||||||
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
|
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() throws Exception {
|
public void run() throws Exception {
|
||||||
|
|
||||||
|
if (consumerConfig.isCrossAccount()) {
|
||||||
|
verifyCrossAccountCreds();
|
||||||
|
}
|
||||||
|
|
||||||
final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
|
final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
|
||||||
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);
|
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);
|
||||||
|
|
||||||
|
|
@ -81,10 +90,11 @@ public class TestConsumer {
|
||||||
cleanTestResources(streamExistenceManager, leaseTableManager);
|
cleanTestResources(streamExistenceManager, leaseTableManager);
|
||||||
|
|
||||||
// Check if stream is created. If not, create it
|
// Check if stream is created. If not, create it
|
||||||
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
|
streamExistenceManager.checkStreamsAndCreateIfNecessary();
|
||||||
|
Map<Arn, Arn> streamToConsumerArnsMap = streamExistenceManager.createCrossAccountConsumerIfNecessary();
|
||||||
|
|
||||||
startProducer();
|
startProducer();
|
||||||
setUpConsumerResources();
|
setUpConsumerResources(streamToConsumerArnsMap);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
startConsumer();
|
startConsumer();
|
||||||
|
|
@ -116,6 +126,13 @@ public class TestConsumer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyCrossAccountCreds() {
|
||||||
|
if (consumerConfig.getCrossAccountCredentialsProvider() == null) {
|
||||||
|
throw new RuntimeException("To run cross account integration tests, pass in an AWS profile with -D" +
|
||||||
|
KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
|
private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
|
||||||
log.info("----------Before starting, Cleaning test environment----------");
|
log.info("----------Before starting, Cleaning test environment----------");
|
||||||
log.info("----------Deleting all lease tables in account----------");
|
log.info("----------Deleting all lease tables in account----------");
|
||||||
|
|
@ -135,25 +152,35 @@ public class TestConsumer {
|
||||||
if (consumerConfig.getReshardFactorList() != null) {
|
if (consumerConfig.getReshardFactorList() != null) {
|
||||||
log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList());
|
log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList());
|
||||||
|
|
||||||
final StreamScaler s = new StreamScaler(
|
for (String streamName : consumerConfig.getStreamNames()) {
|
||||||
kinesisClient,
|
final StreamScaler streamScaler = new StreamScaler(kinesisClientForStreamOwner, streamName,
|
||||||
consumerConfig.getStreamName(),
|
consumerConfig.getReshardFactorList(), consumerConfig);
|
||||||
consumerConfig.getReshardFactorList(),
|
|
||||||
consumerConfig
|
|
||||||
);
|
|
||||||
|
|
||||||
// Schedule the stream scales 4 minutes apart with 2 minute starting delay
|
// Schedule the stream scales 4 minutes apart with 2 minute starting delay
|
||||||
for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
|
for (int i = 0; i < consumerConfig.getReshardFactorList()
|
||||||
producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
|
.size(); i++) {
|
||||||
|
producerExecutor.schedule(streamScaler, (4 * i) + 2, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setUpConsumerResources() throws Exception {
|
private void setUpConsumerResources(Map<Arn, Arn> streamToConsumerArnsMap) throws Exception {
|
||||||
// Setup configuration of KCL (including DynamoDB and CloudWatch)
|
// Setup configuration of KCL (including DynamoDB and CloudWatch)
|
||||||
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder();
|
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(streamToConsumerArnsMap);
|
||||||
|
|
||||||
|
// For polling mode in both CAA and non CAA, set retrievalSpecificConfig to use PollingConfig
|
||||||
|
// For SingleStreamMode EFO CAA, must set the retrieval config to specify the consumerArn in FanoutConfig
|
||||||
|
// For MultiStream EFO CAA, the consumerArn can be set in StreamConfig
|
||||||
|
if (consumerConfig.getRetrievalMode().equals(RetrievalMode.POLLING)) {
|
||||||
|
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, null);
|
||||||
|
} else if (consumerConfig.isCrossAccount()) {
|
||||||
|
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder,
|
||||||
|
streamToConsumerArnsMap);
|
||||||
|
} else {
|
||||||
|
retrievalConfig = configsBuilder.retrievalConfig();
|
||||||
|
}
|
||||||
|
|
||||||
retrievalConfig = consumerConfig.getRetrievalConfig();
|
|
||||||
checkpointConfig = configsBuilder.checkpointConfig();
|
checkpointConfig = configsBuilder.checkpointConfig();
|
||||||
coordinatorConfig = configsBuilder.coordinatorConfig();
|
coordinatorConfig = configsBuilder.coordinatorConfig();
|
||||||
leaseManagementConfig = configsBuilder.leaseManagementConfig()
|
leaseManagementConfig = configsBuilder.leaseManagementConfig()
|
||||||
|
|
@ -194,25 +221,29 @@ public class TestConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void publishRecord() {
|
public void publishRecord() {
|
||||||
final PutRecordRequest request;
|
for (String streamName : consumerConfig.getStreamNames()) {
|
||||||
try {
|
try {
|
||||||
request = PutRecordRequest.builder()
|
final PutRecordRequest request = PutRecordRequest.builder()
|
||||||
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
|
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
|
||||||
.streamName(this.streamName)
|
.streamName(streamName)
|
||||||
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB
|
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024
|
||||||
|
// is 1 KB
|
||||||
.build();
|
.build();
|
||||||
kinesisClient.putRecord(request).get();
|
kinesisClientForStreamOwner.putRecord(request)
|
||||||
|
.get();
|
||||||
|
|
||||||
// Increment the payload counter if the putRecord call was successful
|
// Increment the payload counter if the putRecord call was successful
|
||||||
payloadCounter = payloadCounter.add(new BigInteger("1"));
|
payloadCounter = payloadCounter.add(new BigInteger("1"));
|
||||||
successfulPutRecords += 1;
|
successfulPutRecords += 1;
|
||||||
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
|
log.info("---------Record published for stream {}, successfulPutRecords is now: {}",
|
||||||
|
streamName, successfulPutRecords);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.info("Interrupted, assuming shutdown. ", e);
|
log.info("Interrupted, assuming shutdown. ", e);
|
||||||
} catch (ExecutionException | RuntimeException e) {
|
} catch (ExecutionException | RuntimeException e) {
|
||||||
log.error("Error during publish records", e);
|
log.error("Error during publish records", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
|
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
|
||||||
final byte[] returnData;
|
final byte[] returnData;
|
||||||
|
|
@ -248,10 +279,13 @@ public class TestConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
|
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
|
||||||
log.info("-------------Start deleting stream.---------");
|
log.info("-------------Start deleting streams.---------");
|
||||||
streamExistenceManager.deleteResource(this.streamName);
|
for (String streamName : consumerConfig.getStreamNames()) {
|
||||||
|
log.info("Deleting stream {}", streamName);
|
||||||
|
streamExistenceManager.deleteResource(streamName);
|
||||||
|
}
|
||||||
log.info("---------Start deleting lease table.---------");
|
log.info("---------Start deleting lease table.---------");
|
||||||
leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
|
leaseTableManager.deleteResource(consumerConfig.getApplicationName());
|
||||||
log.info("---------Finished deleting resources.---------");
|
log.info("---------Finished deleting resources.---------");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package software.amazon.kinesis.application;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.slf4j.MDC;
|
import org.slf4j.MDC;
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.exceptions.InvalidStateException;
|
import software.amazon.kinesis.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.exceptions.ShutdownException;
|
import software.amazon.kinesis.exceptions.ShutdownException;
|
||||||
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
|
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
|
||||||
|
|
@ -23,12 +24,15 @@ public class TestRecordProcessor implements ShardRecordProcessor {
|
||||||
|
|
||||||
private static final String SHARD_ID_MDC_KEY = "ShardId";
|
private static final String SHARD_ID_MDC_KEY = "ShardId";
|
||||||
|
|
||||||
|
private StreamIdentifier streamIdentifier;
|
||||||
|
|
||||||
private String shardId;
|
private String shardId;
|
||||||
|
|
||||||
private final RecordValidatorQueue recordValidator;
|
private final RecordValidatorQueue recordValidator;
|
||||||
|
|
||||||
public TestRecordProcessor(RecordValidatorQueue recordValidator) {
|
public TestRecordProcessor(StreamIdentifier streamIdentifier, RecordValidatorQueue recordValidator) {
|
||||||
this.recordValidator = recordValidator;
|
this.recordValidator = recordValidator;
|
||||||
|
this.streamIdentifier = streamIdentifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -51,8 +55,9 @@ public class TestRecordProcessor implements ShardRecordProcessor {
|
||||||
|
|
||||||
for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) {
|
for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) {
|
||||||
final String data = new String(asByteArray(kinesisRecord.data()));
|
final String data = new String(asByteArray(kinesisRecord.data()));
|
||||||
log.info("Processing record pk: {}", data);
|
log.info("Processing record pk for stream {}: {}", streamIdentifier.streamName(), data);
|
||||||
recordValidator.add(shardId, data);
|
String recordValidatorKey = streamIdentifier.toString() + "-" + shardId;
|
||||||
|
recordValidator.add(recordValidatorKey, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
package software.amazon.kinesis.application;
|
package software.amazon.kinesis.application;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
||||||
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
||||||
import software.amazon.kinesis.utils.RecordValidatorQueue;
|
import software.amazon.kinesis.utils.RecordValidatorQueue;
|
||||||
|
|
@ -14,7 +15,12 @@ public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ShardRecordProcessor shardRecordProcessor() {
|
public ShardRecordProcessor shardRecordProcessor() {
|
||||||
return new TestRecordProcessor(this.recordValidator);
|
return new TestRecordProcessor(null, this.recordValidator);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
|
||||||
|
return new TestRecordProcessor(streamIdentifier, this.recordValidator);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,18 @@
|
||||||
package software.amazon.kinesis.config;
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
import lombok.Value;
|
import lombok.Value;
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||||
|
import software.amazon.kinesis.common.FutureUtils;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.StreamConfig;
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
|
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
|
||||||
|
import software.amazon.kinesis.processor.MultiStreamTracker;
|
||||||
|
import software.amazon.kinesis.processor.SingleStreamTracker;
|
||||||
|
import software.amazon.kinesis.retrieval.RetrievalConfig;
|
||||||
|
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
|
||||||
|
import software.amazon.kinesis.retrieval.polling.PollingConfig;
|
||||||
import software.amazon.kinesis.utils.RecordValidatorQueue;
|
import software.amazon.kinesis.utils.RecordValidatorQueue;
|
||||||
import software.amazon.kinesis.utils.ReshardOptions;
|
import software.amazon.kinesis.utils.ReshardOptions;
|
||||||
import software.amazon.kinesis.application.TestRecordProcessorFactory;
|
import software.amazon.kinesis.application.TestRecordProcessorFactory;
|
||||||
|
|
@ -19,48 +29,92 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
|
||||||
|
import software.amazon.awssdk.services.sts.StsAsyncClient;
|
||||||
import software.amazon.awssdk.utils.AttributeMap;
|
import software.amazon.awssdk.utils.AttributeMap;
|
||||||
import software.amazon.kinesis.common.ConfigsBuilder;
|
import software.amazon.kinesis.common.ConfigsBuilder;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||||
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
||||||
import software.amazon.kinesis.retrieval.RetrievalConfig;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.Inet4Address;
|
import java.net.Inet4Address;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default configuration for a producer or consumer used in integration tests.
|
* Default configuration for a producer or consumer used in integration tests.
|
||||||
* Producer: puts records of size 60 KB at an interval of 100 ms
|
* Producer: puts records of size 60 KB at an interval of 100 ms
|
||||||
* Consumer: streaming configuration (vs polling) that starts processing records at shard horizon
|
* Consumer: streaming configuration (vs polling) that starts processing records at shard horizon
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public abstract class KCLAppConfig {
|
public abstract class KCLAppConfig {
|
||||||
|
public static final String AWS_ACCOUNT_PROFILE_PROPERTY = "awsProfile";
|
||||||
|
public static final String CROSS_ACCOUNT_PROFILE_PROPERTY = "awsCrossAccountProfile";
|
||||||
|
public static final String CROSS_ACCOUNT_CONSUMER_NAME = "cross-account-consumer";
|
||||||
|
public static final String INTEGRATION_TEST_RESOURCE_PREFIX = "KCLIntegrationTest";
|
||||||
|
|
||||||
private KinesisAsyncClient kinesisAsyncClient;
|
private String accountIdForConsumer = null;
|
||||||
|
private String accountIdForStreamOwner = null;
|
||||||
|
private List<String> streamNames = null;
|
||||||
|
private KinesisAsyncClient kinesisAsyncClientForConsumer;
|
||||||
|
private StsAsyncClient stsAsyncClientForConsumer;
|
||||||
|
private KinesisAsyncClient kinesisAsyncClientForStreamOwner;
|
||||||
|
private StsAsyncClient stsAsyncClientForStreamOwner;
|
||||||
private DynamoDbAsyncClient dynamoDbAsyncClient;
|
private DynamoDbAsyncClient dynamoDbAsyncClient;
|
||||||
private CloudWatchAsyncClient cloudWatchAsyncClient;
|
private CloudWatchAsyncClient cloudWatchAsyncClient;
|
||||||
private RecordValidatorQueue recordValidator;
|
private RecordValidatorQueue recordValidator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name used for test stream and lease tracker table
|
* List of Strings, either stream names or valid stream Arns, to be used in testing. For single stream mode, return
|
||||||
|
* a list of size 1. For multistream mode, return a list of size > 1.
|
||||||
*/
|
*/
|
||||||
public abstract String getStreamName();
|
public abstract List<Arn> getStreamArns();
|
||||||
|
|
||||||
|
public List<String> getStreamNames() {
|
||||||
|
if (this.streamNames == null) {
|
||||||
|
return getStreamArns().stream().map(streamArn ->
|
||||||
|
streamArn.toString().substring(streamArn.toString().indexOf("/") + 1))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
} else {
|
||||||
|
return this.streamNames;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract String getTestName();
|
||||||
|
|
||||||
|
public String getApplicationName() {
|
||||||
|
return INTEGRATION_TEST_RESOURCE_PREFIX + getTestName();
|
||||||
|
}
|
||||||
|
|
||||||
public int getShardCount() { return 4; }
|
public int getShardCount() { return 4; }
|
||||||
|
|
||||||
public Region getRegion() { return Region.US_WEST_2; }
|
public Region getRegion() { return Region.US_WEST_2; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* "default" profile, should match with profiles listed in "cat ~/.aws/config"
|
* Gets credentials for passed in profile with "-DawsProfile" which should match "~/.aws/config". Otherwise,
|
||||||
|
* uses default profile credentials chain.
|
||||||
*/
|
*/
|
||||||
private AwsCredentialsProvider getCredentialsProvider() {
|
private AwsCredentialsProvider getCredentialsProvider() {
|
||||||
final String awsProfile = System.getProperty("awsProfile");
|
final String awsProfile = System.getProperty(AWS_ACCOUNT_PROFILE_PROPERTY);
|
||||||
return (awsProfile != null) ?
|
return (awsProfile != null) ?
|
||||||
ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create();
|
ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isCrossAccount() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AwsCredentialsProvider getCrossAccountCredentialsProvider() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public InitialPositionInStream getInitialPosition() {
|
public InitialPositionInStream getInitialPosition() {
|
||||||
return InitialPositionInStream.TRIM_HORIZON;
|
return InitialPositionInStream.TRIM_HORIZON;
|
||||||
}
|
}
|
||||||
|
|
@ -80,28 +134,106 @@ public abstract class KCLAppConfig {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
|
public String getAccountIdForConsumer() {
|
||||||
if (kinesisAsyncClient == null) {
|
if (this.accountIdForConsumer == null) {
|
||||||
|
try {
|
||||||
|
this.accountIdForConsumer = FutureUtils.resolveOrCancelFuture(
|
||||||
|
buildStsAsyncClientForConsumer().getCallerIdentity(), Duration.ofSeconds(30)).account();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Error when getting account ID through STS for consumer", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return this.accountIdForConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAccountIdForStreamOwner() {
|
||||||
|
if (this.accountIdForStreamOwner == null) {
|
||||||
|
try {
|
||||||
|
this.accountIdForStreamOwner = FutureUtils.resolveOrCancelFuture(
|
||||||
|
buildStsAsyncClientForStreamOwner().getCallerIdentity(), Duration.ofSeconds(30)).account();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Error when getting account ID through STS for consumer", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return this.accountIdForStreamOwner;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final KinesisAsyncClient buildAsyncKinesisClientForConsumer() throws URISyntaxException, IOException {
|
||||||
|
if (this.kinesisAsyncClientForConsumer == null) {
|
||||||
|
this.kinesisAsyncClientForConsumer = buildAsyncKinesisClient(getCredentialsProvider());
|
||||||
|
}
|
||||||
|
return this.kinesisAsyncClientForConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds the kinesis client for the account which owns the Kinesis stream. For cross account, this can be a
|
||||||
|
* different account than the account which gets records from the stream in the KCL.
|
||||||
|
* @return
|
||||||
|
* @throws URISyntaxException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public final KinesisAsyncClient buildAsyncKinesisClientForStreamOwner() throws URISyntaxException, IOException {
|
||||||
|
if (this.kinesisAsyncClientForStreamOwner == null) {
|
||||||
|
final KinesisAsyncClient client;
|
||||||
|
if (isCrossAccount()) {
|
||||||
|
client = buildAsyncKinesisClient(getCrossAccountCredentialsProvider());
|
||||||
|
} else {
|
||||||
|
client = buildAsyncKinesisClient(getCredentialsProvider());
|
||||||
|
}
|
||||||
|
this.kinesisAsyncClientForStreamOwner = client;
|
||||||
|
}
|
||||||
|
return this.kinesisAsyncClientForStreamOwner;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private KinesisAsyncClient buildAsyncKinesisClient(AwsCredentialsProvider creds) throws URISyntaxException, IOException {
|
||||||
// Setup H2 client config.
|
// Setup H2 client config.
|
||||||
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
|
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
|
||||||
.maxConcurrency(Integer.MAX_VALUE);
|
.maxConcurrency(Integer.MAX_VALUE)
|
||||||
|
.protocol(getKinesisClientProtocol());
|
||||||
builder.protocol(getKinesisClientProtocol());
|
|
||||||
|
|
||||||
final SdkAsyncHttpClient sdkAsyncHttpClient =
|
final SdkAsyncHttpClient sdkAsyncHttpClient =
|
||||||
builder.buildWithDefaults(AttributeMap.builder().build());
|
builder.buildWithDefaults(AttributeMap.builder().build());
|
||||||
|
|
||||||
// Setup client builder by default values
|
// Setup client builder by default values
|
||||||
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
|
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
|
||||||
|
|
||||||
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
|
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
|
||||||
|
kinesisAsyncClientBuilder.credentialsProvider(creds);
|
||||||
|
|
||||||
kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider());
|
return kinesisAsyncClientBuilder.build();
|
||||||
|
|
||||||
this.kinesisAsyncClient = kinesisAsyncClientBuilder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.kinesisAsyncClient;
|
private StsAsyncClient buildStsAsyncClientForConsumer() {
|
||||||
|
if (this.stsAsyncClientForConsumer == null) {
|
||||||
|
this.stsAsyncClientForConsumer = StsAsyncClient.builder()
|
||||||
|
.credentialsProvider(getCredentialsProvider())
|
||||||
|
.region(getRegion())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
return this.stsAsyncClientForConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private StsAsyncClient buildStsAsyncClientForStreamOwner() {
|
||||||
|
if (this.stsAsyncClientForStreamOwner == null) {
|
||||||
|
final StsAsyncClient client;
|
||||||
|
if (isCrossAccount()) {
|
||||||
|
client = buildStsAsyncClient(getCrossAccountCredentialsProvider());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
client = buildStsAsyncClient(getCredentialsProvider());
|
||||||
|
}
|
||||||
|
this.stsAsyncClientForStreamOwner = client;
|
||||||
|
}
|
||||||
|
return this.stsAsyncClientForStreamOwner;
|
||||||
|
}
|
||||||
|
|
||||||
|
private StsAsyncClient buildStsAsyncClient(AwsCredentialsProvider creds) {
|
||||||
|
return StsAsyncClient.builder()
|
||||||
|
.credentialsProvider(creds)
|
||||||
|
.region(getRegion())
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
|
public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
|
||||||
|
|
@ -137,22 +269,90 @@ public abstract class KCLAppConfig {
|
||||||
return new TestRecordProcessorFactory(getRecordValidator());
|
return new TestRecordProcessorFactory(getRecordValidator());
|
||||||
}
|
}
|
||||||
|
|
||||||
public final ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
|
public final ConfigsBuilder getConfigsBuilder(Map<Arn, Arn> streamToConsumerArnsMap)
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
final String workerId = getWorkerId();
|
final String workerId = getWorkerId();
|
||||||
return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(), buildAsyncDynamoDbClient(),
|
if (getStreamArns().size() == 1) {
|
||||||
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
|
final SingleStreamTracker singleStreamTracker = new SingleStreamTracker(
|
||||||
|
StreamIdentifier.singleStreamInstance(getStreamArns().get(0)),
|
||||||
|
buildStreamConfigList(streamToConsumerArnsMap).get(0));
|
||||||
|
return new ConfigsBuilder(singleStreamTracker, getApplicationName(),
|
||||||
|
buildAsyncKinesisClientForConsumer(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId,
|
||||||
|
getShardRecordProcessorFactory());
|
||||||
|
} else {
|
||||||
|
final MultiStreamTracker multiStreamTracker = new MultiStreamTracker() {
|
||||||
|
@Override
|
||||||
|
public List<StreamConfig> streamConfigList() {
|
||||||
|
return buildStreamConfigList(streamToConsumerArnsMap);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
|
||||||
|
return new FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return new ConfigsBuilder(multiStreamTracker, getApplicationName(),
|
||||||
|
buildAsyncKinesisClientForConsumer(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId,
|
||||||
|
getShardRecordProcessorFactory());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
|
private List<StreamConfig> buildStreamConfigList(Map<Arn, Arn> streamToConsumerArnsMap) {
|
||||||
final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
|
return getStreamArns().stream().map(streamArn-> {
|
||||||
.newInitialPosition(getInitialPosition());
|
final StreamIdentifier streamIdentifier;
|
||||||
|
if (getStreamArns().size() == 1) {
|
||||||
|
streamIdentifier = StreamIdentifier.singleStreamInstance(streamArn);
|
||||||
|
} else { //is multi-stream
|
||||||
|
streamIdentifier = StreamIdentifier.multiStreamInstance(streamArn, getCreationEpoch(streamArn));
|
||||||
|
}
|
||||||
|
|
||||||
// Default is a streaming consumer
|
if (streamToConsumerArnsMap != null) {
|
||||||
final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
|
final StreamConfig streamConfig = new StreamConfig(streamIdentifier,
|
||||||
config.initialPositionInStreamExtended(initialPosition);
|
InitialPositionInStreamExtended.newInitialPosition(getInitialPosition()));
|
||||||
|
return streamConfig.consumerArn(streamToConsumerArnsMap.get(streamArn).toString());
|
||||||
|
} else {
|
||||||
|
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(getInitialPosition()));
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getCreationEpoch(Arn streamArn) {
|
||||||
|
final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder()
|
||||||
|
.streamARN(streamArn.toString())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
DescribeStreamSummaryResponse response = null;
|
||||||
|
try {
|
||||||
|
response = FutureUtils.resolveOrCancelFuture(
|
||||||
|
buildAsyncKinesisClientForStreamOwner().describeStreamSummary(request), Duration.ofSeconds(60));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Exception when calling DescribeStreamSummary", e);
|
||||||
|
}
|
||||||
|
return response.streamDescriptionSummary().streamCreationTimestamp().toEpochMilli();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public abstract RetrievalMode getRetrievalMode();
|
||||||
|
|
||||||
|
public RetrievalConfig getRetrievalConfig(ConfigsBuilder configsBuilder, Map<Arn, Arn> streamToConsumerArnsMap) {
|
||||||
|
final RetrievalConfig config = configsBuilder.retrievalConfig();
|
||||||
|
if (getRetrievalMode() == RetrievalMode.POLLING) {
|
||||||
|
config.retrievalSpecificConfig(new PollingConfig(config.kinesisClient()));
|
||||||
|
} else {
|
||||||
|
if (getStreamArns().size() == 1) {
|
||||||
|
final Arn consumerArn = streamToConsumerArnsMap.get(getStreamArns().get(0));
|
||||||
|
config.retrievalSpecificConfig(new FanOutConfig(config.kinesisClient()).consumerArn(consumerArn.toString()));
|
||||||
|
}
|
||||||
|
// For CAA multi-stream EFO, consumerArn is specified in StreamConfig
|
||||||
|
}
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Arn buildStreamArn(String streamName) {
|
||||||
|
final String partition = getRegion().metadata().partition().id();
|
||||||
|
return Arn.fromString(String.join(":", "arn", partition, "kinesis", getRegion().id(),
|
||||||
|
getAccountIdForStreamOwner(), "stream") + "/" + INTEGRATION_TEST_RESOURCE_PREFIX + streamName);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure ingress load (batch size, record size, and calling interval)
|
* Configure ingress load (batch size, record size, and calling interval)
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,10 @@
|
||||||
package software.amazon.kinesis.config;
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.http.Protocol;
|
import software.amazon.awssdk.http.Protocol;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
|
||||||
import software.amazon.kinesis.retrieval.RetrievalConfig;
|
|
||||||
import software.amazon.kinesis.retrieval.polling.PollingConfig;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.util.Collections;
|
||||||
import java.net.URISyntaxException;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -16,9 +14,17 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
|
||||||
|
|
||||||
private final UUID uniqueId = UUID.randomUUID();
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final String applicationName = "PollingH1Test";
|
||||||
|
private final String streamName = "2XPollingH1TestStream_" + uniqueId;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getStreamName() {
|
public String getTestName() {
|
||||||
return "KCLReleaseCanary2XPollingH1TestStream_" + uniqueId;
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
return Collections.singletonList(buildStreamArn(streamName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -27,15 +33,7 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.POLLING;
|
||||||
final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
|
|
||||||
.newInitialPosition(getInitialPosition());
|
|
||||||
|
|
||||||
final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
|
|
||||||
config.initialPositionInStreamExtended(initialPosition);
|
|
||||||
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));
|
|
||||||
|
|
||||||
return config;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,10 @@
|
||||||
package software.amazon.kinesis.config;
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.http.Protocol;
|
import software.amazon.awssdk.http.Protocol;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
|
||||||
import software.amazon.kinesis.retrieval.RetrievalConfig;
|
|
||||||
import software.amazon.kinesis.retrieval.polling.PollingConfig;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.util.Collections;
|
||||||
import java.net.URISyntaxException;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -15,9 +13,17 @@ import java.util.UUID;
|
||||||
public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
|
public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
|
||||||
private final UUID uniqueId = UUID.randomUUID();
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final String applicationName = "PollingH2Test";
|
||||||
|
private final String streamName = "2XPollingH2TestStream_" + uniqueId;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getStreamName() {
|
public String getTestName() {
|
||||||
return "KCLReleaseCanary2XPollingH2TestStream_" + uniqueId;
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
return Collections.singletonList(buildStreamArn(streamName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -26,16 +32,7 @@ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.POLLING;
|
||||||
final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
|
|
||||||
.newInitialPosition(getInitialPosition());
|
|
||||||
|
|
||||||
final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
|
|
||||||
config.initialPositionInStreamExtended(initialPosition);
|
|
||||||
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));
|
|
||||||
|
|
||||||
return config;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,11 @@
|
||||||
package software.amazon.kinesis.config;
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.http.Protocol;
|
import software.amazon.awssdk.http.Protocol;
|
||||||
import software.amazon.kinesis.utils.ReshardOptions;
|
import software.amazon.kinesis.utils.ReshardOptions;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
|
@ -13,22 +15,35 @@ import static software.amazon.kinesis.utils.ReshardOptions.SPLIT;
|
||||||
public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig {
|
public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig {
|
||||||
|
|
||||||
private final UUID uniqueId = UUID.randomUUID();
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final String applicationName = "StreamingReshardingTest";
|
||||||
|
private final String streamName ="2XStreamingReshardingTestStream_" + uniqueId;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getStreamName() {
|
public String getTestName() {
|
||||||
return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId;
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
return Collections.singletonList(buildStreamArn(streamName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; }
|
public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.STREAMING;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getShardCount() {
|
public int getShardCount() {
|
||||||
return 100;
|
return 20;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ReshardOptions> getReshardFactorList() {
|
public List<ReshardOptions> getReshardFactorList() {
|
||||||
return Arrays.asList(SPLIT, MERGE);
|
return Arrays.asList(SPLIT, MERGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,10 @@
|
||||||
package software.amazon.kinesis.config;
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.http.Protocol;
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -10,9 +13,17 @@ import java.util.UUID;
|
||||||
public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
|
public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
|
||||||
private final UUID uniqueId = UUID.randomUUID();
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final String applicationName = "StreamingTest";
|
||||||
|
private final String streamName ="2XStreamingTestStream_" + uniqueId;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getStreamName() {
|
public String getTestName() {
|
||||||
return "KCLReleaseCanary2XStreamingTestStream_" + uniqueId;
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
return Collections.singletonList(buildStreamArn(streamName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -20,5 +31,8 @@ public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
|
||||||
return Protocol.HTTP2;
|
return Protocol.HTTP2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.STREAMING;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
|
public enum RetrievalMode {
|
||||||
|
POLLING,
|
||||||
|
STREAMING
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,23 @@
|
||||||
|
package software.amazon.kinesis.config.crossaccount;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
|
||||||
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config class to configure cross account integration tests.
|
||||||
|
*/
|
||||||
|
public abstract class KCLCrossAccountAppConfig extends KCLAppConfig {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCrossAccount() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AwsCredentialsProvider getCrossAccountCredentialsProvider() {
|
||||||
|
final String awsCrossAccountProfile = System.getProperty(KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY);
|
||||||
|
return (awsCrossAccountProfile != null) ?
|
||||||
|
ProfileCredentialsProvider.builder().profileName(awsCrossAccountProfile).build() : null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
package software.amazon.kinesis.config.crossaccount;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config for a cross account polling consumer with HTTP protocol of HTTP2
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig extends KCLCrossAccountAppConfig {
|
||||||
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final int numStreams = 2;
|
||||||
|
private final String applicationName = "CrossAccountMultiStreamPollingH2Test";
|
||||||
|
|
||||||
|
private final String streamName = "2XCrossAccountPollingH2TestStream";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTestName() {
|
||||||
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
ArrayList<Arn> streamArns = new ArrayList<>(numStreams);
|
||||||
|
for (int i = 1; i <= numStreams; i++) {
|
||||||
|
streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString())));
|
||||||
|
}
|
||||||
|
return streamArns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getKinesisClientProtocol() {
|
||||||
|
return Protocol.HTTP2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.POLLING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
package software.amazon.kinesis.config.crossaccount;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config for a cross account polling consumer with HTTP protocol of HTTP2
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig extends KCLCrossAccountAppConfig {
|
||||||
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final int numStreams = 2;
|
||||||
|
private final String applicationName = "CrossAccountMultiStreamStreamingTest";
|
||||||
|
|
||||||
|
private final String streamName = "2XCrossAccountStreamingTestStream";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTestName() {
|
||||||
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
ArrayList<Arn> streamArns = new ArrayList<>(numStreams);
|
||||||
|
for (int i = 1; i <= numStreams; i++) {
|
||||||
|
streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString())));
|
||||||
|
}
|
||||||
|
return streamArns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getKinesisClientProtocol() {
|
||||||
|
return Protocol.HTTP2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.STREAMING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
package software.amazon.kinesis.config.crossaccount;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config for a cross account polling consumer with HTTP protocol of HTTP2
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ReleaseCanaryCrossAccountPollingH2TestConfig extends KCLCrossAccountAppConfig {
|
||||||
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final String applicationName = "CrossAccountPollingH2Test";
|
||||||
|
|
||||||
|
private final String streamName = "2XCrossAccountPollingH2TestStream_" + uniqueId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTestName() {
|
||||||
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
return Collections.singletonList(buildStreamArn(streamName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getKinesisClientProtocol() {
|
||||||
|
return Protocol.HTTP2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.POLLING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,42 @@
|
||||||
|
package software.amazon.kinesis.config.crossaccount;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config for a streaming consumer with HTTP protocol of HTTP2
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ReleaseCanaryCrossAccountStreamingTestConfig extends KCLCrossAccountAppConfig {
|
||||||
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final String applicationName = "CrossAccountStreamingTest";
|
||||||
|
private final String streamName = "2XCrossAccountStreamingTestStream_" + uniqueId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTestName() {
|
||||||
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
return Collections.singletonList(buildStreamArn(streamName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getKinesisClientProtocol() {
|
||||||
|
return Protocol.HTTP2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.STREAMING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
package software.amazon.kinesis.config;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config for a polling consumer with HTTP protocol of HTTP2
|
||||||
|
*/
|
||||||
|
public class ReleaseCanaryMultiStreamPollingH2TestConfig extends KCLAppConfig {
|
||||||
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
|
||||||
|
private final int numStreams = 2;
|
||||||
|
private final String applicationName = "MultiStreamPollingH2Test";
|
||||||
|
private final String streamName = "2XMultiStreamPollingH2TestStream";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTestName() {
|
||||||
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
ArrayList<Arn> streamArns = new ArrayList<>(numStreams);
|
||||||
|
for (Integer i = 1; i <= numStreams; i++) {
|
||||||
|
streamArns.add(buildStreamArn(String.join("_", streamName, i.toString(), uniqueId.toString())));
|
||||||
|
}
|
||||||
|
return streamArns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getKinesisClientProtocol() {
|
||||||
|
return Protocol.HTTP2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.POLLING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
package software.amazon.kinesis.config.multistream;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
|
import software.amazon.awssdk.http.Protocol;
|
||||||
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
|
|
||||||
|
public class ReleaseCanaryMultiStreamStreamingTestConfig extends KCLAppConfig {
|
||||||
|
private final UUID uniqueId = UUID.randomUUID();
|
||||||
|
private final int numStreams = 2;
|
||||||
|
private final String applicationName = "MultiStreamStreamingTest";
|
||||||
|
private final String streamName = "2XMultiStreamStreamingTestStream";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTestName() {
|
||||||
|
return applicationName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Arn> getStreamArns() {
|
||||||
|
ArrayList<Arn> streamArns = new ArrayList<>(numStreams);
|
||||||
|
for (int i = 1; i <= numStreams; i++) {
|
||||||
|
streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString())));
|
||||||
|
}
|
||||||
|
return streamArns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Protocol getKinesisClientProtocol() {
|
||||||
|
return Protocol.HTTP2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetrievalMode getRetrievalMode() {
|
||||||
|
return RetrievalMode.STREAMING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,47 @@
|
||||||
|
package software.amazon.kinesis.lifecycle;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.application.TestConsumer;
|
||||||
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig;
|
||||||
|
import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig;
|
||||||
|
import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountPollingH2TestConfig;
|
||||||
|
import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountStreamingTestConfig;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class CrossAccountStreamConsumerIntegrationTest {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test with a cross account polling consumer using HTTP2 protocol.
|
||||||
|
* In the polling case, consumer makes calls to the producer each time to request records to process.
|
||||||
|
* The stream is in a different account than the kinesis client used to get records.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void kclReleaseCanaryCrossAccountPollingH2Test() throws Exception {
|
||||||
|
KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountPollingH2TestConfig();
|
||||||
|
TestConsumer consumer = new TestConsumer(consumerConfig);
|
||||||
|
consumer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void kclReleaseCanaryCrossAccountStreamingTest() throws Exception {
|
||||||
|
KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountStreamingTestConfig();
|
||||||
|
TestConsumer consumer = new TestConsumer(consumerConfig);
|
||||||
|
consumer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void kclReleaseCanaryCrossAccountMultiStreamStreamingTest() throws Exception {
|
||||||
|
KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig();
|
||||||
|
TestConsumer consumer = new TestConsumer(consumerConfig);
|
||||||
|
consumer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void kclReleaseCanaryCrossAccountMultiStreamPollingH2Test() throws Exception {
|
||||||
|
KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig();
|
||||||
|
TestConsumer consumer = new TestConsumer(consumerConfig);
|
||||||
|
consumer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
package software.amazon.kinesis.lifecycle;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.application.TestConsumer;
|
||||||
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
import software.amazon.kinesis.config.ReleaseCanaryMultiStreamPollingH2TestConfig;
|
||||||
|
import software.amazon.kinesis.config.multistream.ReleaseCanaryMultiStreamStreamingTestConfig;
|
||||||
|
|
||||||
|
public class MultiStreamConsumerIntegrationTest {
|
||||||
|
@Test
|
||||||
|
public void kclReleaseCanaryMultiStreamPollingTest() throws Exception {
|
||||||
|
KCLAppConfig consumerConfig = new ReleaseCanaryMultiStreamPollingH2TestConfig();
|
||||||
|
TestConsumer consumer = new TestConsumer(consumerConfig);
|
||||||
|
consumer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void kclReleaseCanaryMultiStreamStreamingTest() throws Exception {
|
||||||
|
KCLAppConfig consumerConfig = new ReleaseCanaryMultiStreamStreamingTestConfig();
|
||||||
|
TestConsumer consumer = new TestConsumer(consumerConfig);
|
||||||
|
consumer.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
package software.amazon.kinesis.utils;
|
package software.amazon.kinesis.utils;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
|
@ -66,7 +68,7 @@ public abstract class AWSResourceManager {
|
||||||
final List<String> resourceNames = getAllResourceNames();
|
final List<String> resourceNames = getAllResourceNames();
|
||||||
for (String resourceName : resourceNames) {
|
for (String resourceName : resourceNames) {
|
||||||
// Delete all resources that have prefix "KCLRelease"
|
// Delete all resources that have prefix "KCLRelease"
|
||||||
if (resourceName.startsWith("KCLRelease")) {
|
if (resourceName.startsWith(KCLAppConfig.INTEGRATION_TEST_RESOURCE_PREFIX)) {
|
||||||
deleteResource(resourceName);
|
deleteResource(resourceName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,49 +2,74 @@ package software.amazon.kinesis.utils;
|
||||||
|
|
||||||
import lombok.Value;
|
import lombok.Value;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
|
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
|
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
|
||||||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
|
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
|
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.PutResourcePolicyRequest;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
|
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
|
||||||
import software.amazon.kinesis.common.FutureUtils;
|
import software.amazon.kinesis.common.FutureUtils;
|
||||||
import software.amazon.kinesis.config.KCLAppConfig;
|
import software.amazon.kinesis.config.KCLAppConfig;
|
||||||
|
import software.amazon.kinesis.config.RetrievalMode;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Value
|
@Value
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class StreamExistenceManager extends AWSResourceManager {
|
public class StreamExistenceManager extends AWSResourceManager {
|
||||||
|
private static final int CHECK_RESOURCE_ACTIVE_MAX_RETRIES = 3;
|
||||||
|
|
||||||
private final KinesisAsyncClient client;
|
private final KinesisAsyncClient client;
|
||||||
private final KCLAppConfig testConfig;
|
private final KCLAppConfig testConfig;
|
||||||
|
|
||||||
public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
|
public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
|
||||||
this.testConfig = config;
|
this.testConfig = config;
|
||||||
this.client = config.buildAsyncKinesisClient();
|
this.client = config.buildAsyncKinesisClientForStreamOwner();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isResourceActive(String streamName) {
|
public boolean isResourceActive(String streamName) {
|
||||||
final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
|
final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
|
||||||
final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS);
|
final DescribeStreamSummaryResponse response =
|
||||||
boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE);
|
FutureUtils.resolveOrCancelFuture(client.describeStreamSummary(request), Duration.ofSeconds(60));
|
||||||
if (!isActive) {
|
final boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE);
|
||||||
throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus());
|
return isActive;
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
if (e.getCause() instanceof ResourceNotFoundException) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
return true;
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isConsumerActive(Arn consumerArn) {
|
||||||
|
final DescribeStreamConsumerRequest request = DescribeStreamConsumerRequest.builder().consumerARN(consumerArn.toString()).build();
|
||||||
|
try {
|
||||||
|
final DescribeStreamConsumerResponse response =
|
||||||
|
FutureUtils.resolveOrCancelFuture(client.describeStreamConsumer(request), Duration.ofSeconds(60));
|
||||||
|
final boolean isActive = response.consumerDescription().consumerStatus().equals(ConsumerStatus.ACTIVE);
|
||||||
|
return isActive;
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
if (e.getCause() instanceof ResourceNotFoundException) {
|
if (e.getCause() instanceof ResourceNotFoundException) {
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -73,14 +98,103 @@ public class StreamExistenceManager extends AWSResourceManager {
|
||||||
return allStreamNames;
|
return allStreamNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void checkStreamAndCreateIfNecessary(String streamName) {
|
public void checkStreamsAndCreateIfNecessary() {
|
||||||
|
for (String streamName : testConfig.getStreamNames()) {
|
||||||
if (!isResourceActive(streamName)) {
|
if (!isResourceActive(streamName)) {
|
||||||
createStream(streamName, testConfig.getShardCount());
|
createStream(streamName, testConfig.getShardCount());
|
||||||
}
|
}
|
||||||
log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
|
log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (testConfig.isCrossAccount()) {
|
||||||
|
for (Arn streamArn : testConfig.getStreamArns()) {
|
||||||
|
log.info("Putting cross account stream resource policy for stream {}", streamArn);
|
||||||
|
putResourcePolicyForCrossAccount(streamArn,
|
||||||
|
getCrossAccountStreamResourcePolicy(testConfig.getAccountIdForConsumer(), streamArn));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<Arn, Arn> createCrossAccountConsumerIfNecessary() throws Exception {
|
||||||
|
// For cross account, KCL cannot create the consumer automatically in another account, so
|
||||||
|
// we have to create it ourselves and provide the arn to the StreamConfig in multi-stream mode or
|
||||||
|
// RetrievalConfig in single-stream mode
|
||||||
|
if (testConfig.isCrossAccount() && testConfig.getRetrievalMode().equals(RetrievalMode.STREAMING)) {
|
||||||
|
final Map<Arn, Arn> streamToConsumerArnsMap = new HashMap<>();
|
||||||
|
for (Arn streamArn : testConfig.getStreamArns()) {
|
||||||
|
final Arn consumerArn = registerConsumerAndWaitForActive(streamArn,
|
||||||
|
KCLAppConfig.CROSS_ACCOUNT_CONSUMER_NAME);
|
||||||
|
putResourcePolicyForCrossAccount(consumerArn,
|
||||||
|
getCrossAccountConsumerResourcePolicy(testConfig.getAccountIdForConsumer(), consumerArn));
|
||||||
|
streamToConsumerArnsMap.put(streamArn, consumerArn);
|
||||||
|
}
|
||||||
|
return streamToConsumerArnsMap;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void putResourcePolicyForCrossAccount(Arn resourceArn, String policy) {
|
||||||
|
try {
|
||||||
|
final PutResourcePolicyRequest putResourcePolicyRequest = PutResourcePolicyRequest.builder()
|
||||||
|
.resourceARN(resourceArn.toString())
|
||||||
|
.policy(policy)
|
||||||
|
.build();
|
||||||
|
FutureUtils.resolveOrCancelFuture(client.putResourcePolicy(putResourcePolicyRequest), Duration.ofSeconds(60));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("Failed to PutResourcePolicy " + policy + " on resource " + resourceArn, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getCrossAccountStreamResourcePolicy(String accountId, Arn streamArn) {
|
||||||
|
return "{\"Version\":\"2012-10-17\","
|
||||||
|
+ "\"Statement\":[{"
|
||||||
|
+ "\"Effect\": \"Allow\","
|
||||||
|
+ "\"Principal\": {\"AWS\": \"" + accountId + "\"},"
|
||||||
|
+ "\"Action\": ["
|
||||||
|
+ "\"kinesis:DescribeStreamSummary\",\"kinesis:ListShards\",\"kinesis:PutRecord\",\"kinesis:PutRecords\","
|
||||||
|
+ "\"kinesis:GetRecords\",\"kinesis:GetShardIterator\"],"
|
||||||
|
+ "\"Resource\": \"" + streamArn.toString() + "\""
|
||||||
|
+ "}]}";
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getCrossAccountConsumerResourcePolicy(String accountId, Arn consumerArn) {
|
||||||
|
return "{\"Version\":\"2012-10-17\","
|
||||||
|
+ "\"Statement\":[{"
|
||||||
|
+ "\"Effect\": \"Allow\","
|
||||||
|
+ "\"Principal\": {\"AWS\": \"" + accountId + "\"},"
|
||||||
|
+ "\"Action\": ["
|
||||||
|
+ "\"kinesis:DescribeStreamConsumer\",\"kinesis:SubscribeToShard\"],"
|
||||||
|
+ "\"Resource\": \"" + consumerArn.toString() + "\""
|
||||||
|
+ "}]}";
|
||||||
|
}
|
||||||
|
|
||||||
|
private Arn registerConsumerAndWaitForActive(Arn streamArn, String consumerName) throws Exception {
|
||||||
|
final RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest.builder()
|
||||||
|
.streamARN(streamArn.toString())
|
||||||
|
.consumerName(consumerName)
|
||||||
|
.build();
|
||||||
|
final RegisterStreamConsumerResponse response =
|
||||||
|
FutureUtils.resolveOrCancelFuture(client.registerStreamConsumer(registerStreamConsumerRequest),
|
||||||
|
Duration.ofSeconds(60));
|
||||||
|
final Arn consumerArn = Arn.fromString(response.consumer().consumerARN());
|
||||||
|
|
||||||
|
int retries = 0;
|
||||||
|
while (!isConsumerActive(consumerArn)) {
|
||||||
|
log.info("Consumer {} is not active yet. Checking again in 5 seconds.", consumerArn);
|
||||||
|
if (retries > CHECK_RESOURCE_ACTIVE_MAX_RETRIES) {
|
||||||
|
throw new RuntimeException("Failed consumer registration, did not transition into active");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Failed to sleep");
|
||||||
|
}
|
||||||
|
retries++;
|
||||||
|
}
|
||||||
|
log.info("Successfully registered consumer {}", consumerArn);
|
||||||
|
return consumerArn;
|
||||||
|
}
|
||||||
|
|
||||||
private void createStream(String streamName, int shardCount) {
|
private void createStream(String streamName, int shardCount) {
|
||||||
final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
|
final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
|
||||||
try {
|
try {
|
||||||
|
|
@ -89,26 +203,19 @@ public class StreamExistenceManager extends AWSResourceManager {
|
||||||
throw new RuntimeException("Failed to create stream with name " + streamName, e);
|
throw new RuntimeException("Failed to create stream with name " + streamName, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
int i = 0;
|
int retries = 0;
|
||||||
while (true) {
|
while (!isResourceActive(streamName)) {
|
||||||
i++;
|
log.info("Stream {} is not active yet. Checking again in 5 seconds.", streamName);
|
||||||
if (i > 100) {
|
if (retries > CHECK_RESOURCE_ACTIVE_MAX_RETRIES) {
|
||||||
throw new RuntimeException("Failed stream creation, did not transition into active");
|
throw new RuntimeException("Failed stream creation, did not transition into active");
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
boolean isActive = isResourceActive(streamName);
|
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||||
if (isActive) {
|
} catch (InterruptedException e) {
|
||||||
log.info("Succesfully created the stream {}", streamName);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
|
|
||||||
} catch (InterruptedException e1) {
|
|
||||||
log.error("Failed to sleep");
|
log.error("Failed to sleep");
|
||||||
}
|
}
|
||||||
log.info("Stream {} is not active yet, exception: ", streamName, e);
|
retries++;
|
||||||
}
|
}
|
||||||
}
|
log.info("Successfully created the stream {}", streamName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue