Updates based on initial comments

This commit is contained in:
Meher Mankikar 2023-06-13 14:47:38 -07:00
parent d191401c18
commit 37f6c042f4
23 changed files with 707 additions and 809 deletions

View file

@ -36,6 +36,12 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
resources (which requires manual cleanup). Integration tests require valid AWS credentials, which need to be discovered at
runtime. To skip running integration tests, add the `-DskipITs` option to the build command.
## Running Integration Tests
To run integration tests to test any changes to KCL, you can use this command: `mvn -Dit.test=*IntegrationTest verify`.
This will look for default AWS credentials in your local `.aws/credentials`. If you want to override these
credentials, you can provide the name of an IAM user as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="<IAM_USER>" verify`.
## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

View file

@ -1,55 +0,0 @@
package software.amazon.kinesis.common;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mock;
import software.amazon.kinesis.utils.RecordValidatorQueue;
public class RecordValidatorQueueTest {
@Mock
private RecordValidatorQueue recordValidator;
private static final String shardId = "ABC";
private final int outOfOrderError = -1;
private final int missingRecordError = -2;
private final int noError = 0;
@Test
public void validationFailedRecordOutOfOrderTest() {
recordValidator = new RecordValidatorQueue();
recordValidator.add(shardId, "0");
recordValidator.add(shardId, "1");
recordValidator.add(shardId, "3");
recordValidator.add(shardId, "2");
int error = recordValidator.validateRecords( 4 );
Assert.assertEquals(outOfOrderError, error);
}
@Test
public void validationFailedMissingRecordTest() {
recordValidator = new RecordValidatorQueue();
recordValidator.add(shardId, "0");
recordValidator.add(shardId, "1");
recordValidator.add(shardId, "2");
recordValidator.add(shardId, "3");
int error = recordValidator.validateRecords( 5 );
Assert.assertEquals(missingRecordError, error);
}
@Test
public void validRecordsTest() {
recordValidator = new RecordValidatorQueue();
recordValidator.add(shardId, "0");
recordValidator.add(shardId, "1");
recordValidator.add(shardId, "2");
recordValidator.add(shardId, "3");
int error = recordValidator.validateRecords( 4 );
Assert.assertEquals(noError, error);
}
}

View file

@ -1,11 +1,11 @@
package software.amazon.kinesis.config;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
@ -14,20 +14,24 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig {
/**
* Basic config for a release canary (streaming) with default settings
*/
@Slf4j
public class BasicReleaseCanaryConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XPollingH1TestConfig";
return "";
}
@Override
public int getShardCount() {
return 20;
return 10;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH1TestConfigApplication";
return "";
}
@Override
@ -40,24 +44,21 @@ public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig {
return Region.US_WEST_2;
}
@Override
public boolean isProd() {
return true;
}
/**
* This will get the credentials that are provided in the maven command
* when running integration tests if any are provided through -Dcredentials=iamUser
* Otherwise, iamUser will be null and the test will search for default credentials
* in the test environment.
*/
@Override
public String getProfile() {
String iamUser = System.getProperty( "credentials" );
String iamUser = System.getProperty("credentials");
return iamUser;
}
@Override
public long getProcessingDelayMillis() {
return -1;
}
@Override
public InitialPositionInStream getKclInitialPosition() {
public InitialPositionInStream getInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@ -74,10 +75,10 @@ public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig {
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut( false )
.batchSize( 1 )
.recordSizeKB( 60 )
.callPeriodMills( 100 )
.isBatchPut(false)
.batchSize(1)
.recordSizeKB(60)
.callPeriodMills(100)
.build();
}
@ -89,16 +90,18 @@ public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig {
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes( 5 );
Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
Date startStreamTime = Date.from( instant );
d = d.minusMinutes(5);
Instant instant = d.atZone(ZoneId.systemDefault()).toInstant();
Date startStreamTime = Date.from(instant);
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp( startStreamTime );
.newInitialPositionAtTimestamp(startStreamTime);
/**
* Default is a streaming consumer
*/
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended( initialPosition );
config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) );
config.initialPositionInStreamExtended(initialPosition);
return config;
}

View file

@ -1,13 +1,11 @@
package software.amazon.kinesis.config;
import com.amazonaws.auth.AWSCredentialsProvider;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.integration_tests.TestRecordProcessorFactoryV2;
import software.amazon.kinesis.utils.KCLVersion;
import lombok.Builder;
import lombok.Data;
import software.amazon.awssdk.arns.Arn;
import lombok.Value;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;
import lombok.Builder;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.Protocol;
@ -45,34 +43,17 @@ public interface KCLAppConfig {
String getApplicationName();
default KCLVersion getKCLVersion() {
return KCLVersion.KCL2X;
}
default boolean canaryMonitorEnabled() {
return false;
}
String getEndpoint();
Region getRegion();
boolean isProd();
default boolean durabilityCheck() {
return true;
}
// "default" profile, should match with profiles listed in "cat ~/.aws/config"
/**
* "default" profile, should match with profiles listed in "cat ~/.aws/config"
*/
String getProfile();
// '-1' means round robin across 0, 5_000, 15_000, 30_000 milliseconds delay.
// The delay period is picked according to current time, so expected to be unpredictable across different KCL runs.
// '0' means PassThroughRecordProcessor
// Any other constant will delay according to the specified value.
long getProcessingDelayMillis();
InitialPositionInStream getKclInitialPosition();
InitialPositionInStream getInitialPosition();
Protocol getConsumerProtocol();
@ -82,76 +63,66 @@ public interface KCLAppConfig {
ReshardConfig getReshardConfig();
default MultiStreamRotatorConfig getMultiStreamRotatorConfig() {
throw new UnsupportedOperationException();
}
default KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException {
return buildAsyncKinesisClient( getConsumerProtocol() );
return buildAsyncKinesisClient(getConsumerProtocol());
}
default KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException {
return buildAsyncKinesisClient( getProducerProtocol() );
return buildAsyncKinesisClient(getProducerProtocol());
}
default KinesisAsyncClient buildAsyncKinesisClient( Protocol protocol ) throws URISyntaxException, IOException {
return buildAsyncKinesisClient( Optional.ofNullable( protocol ) );
default KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
return buildAsyncKinesisClient(Optional.ofNullable(protocol));
}
default KinesisAsyncClient buildAsyncKinesisClient( Optional< Protocol > protocol ) throws URISyntaxException, IOException {
default KinesisAsyncClient buildAsyncKinesisClient(Optional<Protocol> protocol) throws URISyntaxException, IOException {
// Setup H2 client config.
/**
* Setup H2 client config.
*/
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
.maxConcurrency( Integer.MAX_VALUE );
.maxConcurrency(Integer.MAX_VALUE);
if ( protocol.isPresent() ) {
builder.protocol( protocol.get() );
/**
* If not present, defaults to HTTP1_1
*/
if (protocol.isPresent()) {
builder.protocol(protocol.get());
}
final SdkAsyncHttpClient sdkAsyncHttpClient =
builder.buildWithDefaults( AttributeMap.builder().put( SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true ).build() );
builder.buildWithDefaults(AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true).build());
// Setup client builder by default values
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region( getRegion() );
/**
* Setup client builder by default values
*/
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
// Override endpoint if not one of the Prod stacks.
// if (!isProd()) {
// kinesisAsyncClientBuilder
// .endpointOverride(new URI(getEndpoint()));
// }
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
kinesisAsyncClientBuilder.httpClient( sdkAsyncHttpClient );
if ( getProfile() != null ) {
kinesisAsyncClientBuilder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
} else {
kinesisAsyncClientBuilder.credentialsProvider( DefaultCredentialsProvider.create() );
}
AwsCredentialsProvider credentialsProvider = (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
kinesisAsyncClientBuilder.credentialsProvider( credentialsProvider );
return kinesisAsyncClientBuilder.build();
}
default DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region( getRegion() );
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion());
if ( getProfile() != null ) {
builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
} else {
builder.credentialsProvider( DefaultCredentialsProvider.create() );
}
AwsCredentialsProvider credentialsProvider = (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
builder.credentialsProvider(credentialsProvider);
return builder.build();
}
default CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region( getRegion() );
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion());
if ( getProfile() != null ) {
builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
} else {
builder.credentialsProvider( DefaultCredentialsProvider.create() );
}
AwsCredentialsProvider credentialsProvider = (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
builder.credentialsProvider(credentialsProvider);
return builder.build();
}
@ -165,31 +136,30 @@ public interface KCLAppConfig {
}
default ShardRecordProcessorFactory getShardRecordProcessorFactory() {
if (getKCLVersion() == KCLVersion.KCL2X) {
return new TestRecordProcessorFactoryV2( getRecordValidator() );
} else {
return null;
}
return new TestRecordProcessorFactory(getRecordValidator());
}
default ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
return getConfigsBuilder( "" );
return getConfigsBuilder("");
}
default ConfigsBuilder getConfigsBuilder( String workerIdSuffix ) throws IOException, URISyntaxException {
default ConfigsBuilder getConfigsBuilder(String workerIdSuffix) throws IOException, URISyntaxException {
final String workerId = getWorkerId() + workerIdSuffix;
if ( getStreamArn() == null ) {
return new ConfigsBuilder( getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() );
if (getStreamArn() == null) {
return new ConfigsBuilder(getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
} else {
return new ConfigsBuilder( Arn.fromString( getStreamArn() ), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() );
return new ConfigsBuilder(Arn.fromString(getStreamArn()), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
}
}
RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException;
@Data
/**
* Configure ingress load (batch size, record size, and calling interval)
*/
@Value
@Builder
class ProducerConfig {
private boolean isBatchPut;
@ -198,20 +168,22 @@ public interface KCLAppConfig {
private long callPeriodMills;
}
@Data
/**
* Description of the method of resharding for a test case
* <p>
* reshardingFactorCycle: lists the scales by which the number of shards in a stream will be updated
* in sequence. e.g. {2.0, 0.5} means that the number of shards will first be doubled, then halved
* <p>
* numReshardCycles: the number of resharding cycles that will be executed in a test
* <p>
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
@Value
@Builder
class ReshardConfig {
private Double[] reshardingFactorCycle;
private double[] reshardingFactorCycle;
private int numReshardCycles;
private long reshardFrequencyMillis;
}
@Data
@Builder
class MultiStreamRotatorConfig {
private int totalStreams;
private int maxStreamsToProcess;
private long streamsRotationMillis;
}
}

View file

@ -1,106 +0,0 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class KCLReleaseCanary2XPollingH2TestConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "KCLTest3";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH2TestApplication";
}
@Override
public String getEndpoint() {
return "";
}
@Override
public Region getRegion() {
return Region.US_WEST_2;
}
@Override
public boolean isProd() {
return true;
}
@Override
public String getProfile() {
String iamUser = System.getProperty( "credentials" );
return iamUser;
}
@Override
public long getProcessingDelayMillis() {
return -1;
}
@Override
public InitialPositionInStream getKclInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;
}
@Override
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut( false )
.batchSize( 1 )
.recordSizeKB( 60 )
.callPeriodMills( 100 )
.build();
}
@Override
public ReshardConfig getReshardConfig() {
return null;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes( 5 );
Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
Date startStreamTime = Date.from( instant );
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp( startStreamTime );
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended( initialPosition );
config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) );
return config;
}
}

View file

@ -1,104 +0,0 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class KCLReleaseCanary2XStreamingTestConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingTestStream";
}
@Override
public int getShardCount() {
return 10;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XStreamingTestApplication";
}
@Override
public String getEndpoint() {
return "";
}
@Override
public Region getRegion() {
return Region.US_WEST_2;
}
@Override
public boolean isProd() {
return true;
}
@Override
public String getProfile() {
String iamUser = System.getProperty( "credentials" );
return iamUser;
}
@Override
public long getProcessingDelayMillis() {
return 50;
}
@Override
public InitialPositionInStream getKclInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;
}
@Override
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut( false )
.batchSize( 1 )
.recordSizeKB( 60 )
.callPeriodMills( 100 )
.build();
}
@Override
public ReshardConfig getReshardConfig() {
return null;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes( 5 );
Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
Date startStreamTime = Date.from( instant );
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp( startStreamTime );
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended( initialPosition );
return config;
}
}

View file

@ -0,0 +1,55 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
/**
* Config for a polling consumer with HTTP protocol of HTTP1
*/
public class ReleaseCanaryPollingH1TestConfig extends BasicReleaseCanaryConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XPollingH1TestConfig";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH1TestConfigApplication";
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes(5);
Instant instant = d.atZone(ZoneId.systemDefault()).toInstant();
Date startStreamTime = Date.from(instant);
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp(startStreamTime);
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), buildConsumerClient()));
return config;
}
}

View file

@ -0,0 +1,56 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
/**
* Config for a polling consumer with HTTP protocol of HTTP2
*/
public class ReleaseCanaryPollingH2TestConfig extends BasicReleaseCanaryConfig {
@Override
public String getStreamName() {
return "KCLTest3";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH2TestApplication";
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
LocalDateTime d = LocalDateTime.now();
d = d.minusMinutes(5);
Instant instant = d.atZone(ZoneId.systemDefault()).toInstant();
Date startStreamTime = Date.from(instant);
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp(startStreamTime);
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), buildConsumerClient()));
return config;
}
}

View file

@ -0,0 +1,25 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
/**
* Config for a streaming consumer with HTTP protocol of HTTP2
*/
public class ReleaseCanaryStreamingTestConfig extends BasicReleaseCanaryConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingTestStream";
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XStreamingTestApplication";
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;
}
}

View file

@ -1,35 +0,0 @@
package software.amazon.kinesis.integration_tests;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
@Slf4j
public class KCL2XIntegrationTest {
private static final String CONFIG_PACKAGE = "software.amazon.kinesis.config";
@Test
public void KCLReleaseCanary2XPollingH2Test() throws Exception {
String[] configName = { "KCLReleaseCanary2XPollingH2TestConfig" };
KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance();
TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
consumer.run();
}
@Test
public void KCLReleaseCanary2XPollingH1Test() throws Exception {
String[] configName = { "KCLReleaseCanary2XPollingH1TestConfig" };
KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance();
TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
consumer.run();
}
@Test
public void KCLReleaseCanary2XStreamingTest() throws Exception {
String[] configName = { "KCLReleaseCanary2XStreamingTestConfig" };
KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance();
TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
consumer.run();
}
}

View file

@ -1,70 +0,0 @@
package software.amazon.kinesis.integration_tests;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.config.KCLAppConfig;
import org.apache.commons.lang3.RandomStringUtils;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
public class TestConsumer {
private static final Logger log = LoggerFactory.getLogger( TestConsumer.class );
public final KCLAppConfig consumerConfig;
public final Region region;
public final String streamName;
public final KinesisAsyncClient kinesisClient;
public int successfulPutRecords = 0;
public BigInteger payloadCounter = new BigInteger( "0" );
public TestConsumer( KCLAppConfig consumerConfig ) {
this.consumerConfig = consumerConfig;
this.region = consumerConfig.getRegion();
this.streamName = consumerConfig.getStreamName();
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient( KinesisAsyncClient.builder().region( this.region ) );
}
public void publishRecord() {
PutRecordRequest request;
try {
request = PutRecordRequest.builder()
.partitionKey( RandomStringUtils.randomAlphabetic( 5, 20 ) )
.streamName( streamName )
.data( SdkBytes.fromByteBuffer( wrapWithCounter( 5, payloadCounter ) ) ) // 1024 is 1 KB
.build();
kinesisClient.putRecord( request ).get();
// Increment the payload counter if the putRecord call was successful
payloadCounter = payloadCounter.add( new BigInteger( "1" ) );
successfulPutRecords += 1;
} catch ( InterruptedException e ) {
log.info( "Interrupted, assuming shutdown." );
} catch ( ExecutionException e ) {
log.error( "Error during publishRecord. Will try again next cycle", e );
} catch ( RuntimeException e ) {
log.error( "Error while creating request", e );
}
}
private ByteBuffer wrapWithCounter( int payloadSize, BigInteger payloadCounter ) throws RuntimeException {
byte[] returnData;
log.info( "--------------Putting record with data: {}", payloadCounter );
ObjectMapper mapper = new ObjectMapper();
try {
returnData = mapper.writeValueAsBytes( payloadCounter );
} catch ( Exception e ) {
log.error( "Error creating payload data for {}", payloadCounter.toString() );
throw new RuntimeException( "Error converting object to bytes: ", e );
}
return ByteBuffer.wrap( returnData );
}
}

View file

@ -1,142 +0,0 @@
package software.amazon.kinesis.integration_tests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import java.util.UUID;
import java.util.concurrent.*;
public class TestConsumerV2 extends TestConsumer {
private static final Logger log = LoggerFactory.getLogger( TestConsumerV2.class );
private final int outOfOrderError = -1;
private final int missingRecordError = -2;
private MetricsConfig metricsConfig;
private RetrievalConfig retrievalConfig;
private CheckpointConfig checkpointConfig;
private CoordinatorConfig coordinatorConfig;
private LeaseManagementConfig leaseManagementConfig;
private LifecycleConfig lifecycleConfig;
private ProcessorConfig processorConfig;
public TestConsumerV2( KCLAppConfig consumerConfig ) {
super( consumerConfig );
}
public void run() throws Exception {
/**
* Check if stream is created. If not, create it
*/
StreamExistenceManager.newManager( this.consumerConfig ).checkStreamAndCreateIfNecessary( this.streamName );
/**
* Send dummy data to stream
*/
ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate( this::publishRecord, 10, 1, TimeUnit.SECONDS );
RecordValidatorQueue recordValidator = new RecordValidatorQueue();
/**
* Setup configuration of KCL (including DynamoDB and CloudWatch)
*/
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region( region ).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region( region ).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactoryV2( recordValidator ) );
retrievalConfig = consumerConfig.getRetrievalConfig();
checkpointConfig = configsBuilder.checkpointConfig();
coordinatorConfig = configsBuilder.coordinatorConfig();
leaseManagementConfig = configsBuilder.leaseManagementConfig()
.initialPositionInStream( InitialPositionInStreamExtended.newInitialPosition( consumerConfig.getKclInitialPosition() ) )
.initialLeaseTableReadCapacity( 50 ).initialLeaseTableWriteCapacity( 50 );
lifecycleConfig = configsBuilder.lifecycleConfig();
processorConfig = configsBuilder.processorConfig();
metricsConfig = configsBuilder.metricsConfig();
/**
* Create Scheduler
*/
Scheduler scheduler = new Scheduler(
checkpointConfig,
coordinatorConfig,
leaseManagementConfig,
lifecycleConfig,
metricsConfig,
processorConfig,
retrievalConfig
);
/**
* Start record processing of dummy data
*/
Thread schedulerThread = new Thread( scheduler );
schedulerThread.setDaemon( true );
schedulerThread.start();
/**
* Sleep for two minutes to allow the producer/consumer to run and then end the test case.
*/
try {
Thread.sleep( TimeUnit.SECONDS.toMillis( 60 * 2 ) ); // 60 * 2
} catch ( InterruptedException e ) {
throw new RuntimeException( e );
}
/**
* Stops sending dummy data.
*/
log.info( "Cancelling producer and shutting down executor." );
producerFuture.cancel( true );
producerExecutor.shutdownNow();
/**
* Wait a few seconds for the last few records to be processed
*/
Thread.sleep( TimeUnit.SECONDS.toMillis( 10 ) );
/**
* Stops consuming data. Finishes processing the current batch of data already received from Kinesis
* before shutting down.
*/
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info( "Waiting up to 20 seconds for shutdown to complete." );
try {
gracefulShutdownFuture.get( 20, TimeUnit.SECONDS );
} catch ( InterruptedException e ) {
log.info( "Interrupted while waiting for graceful shutdown. Continuing." );
} catch ( ExecutionException e ) {
log.error( "Exception while executing graceful shutdown.", e );
} catch ( TimeoutException e ) {
log.error( "Timeout while waiting for shutdown. Scheduler may not have exited." );
}
log.info( "Completed, shutting down now." );
/**
* Validate processed data
*/
int errorVal = recordValidator.validateRecords( successfulPutRecords );
if ( errorVal == outOfOrderError ) {
throw new RuntimeException( "There was an error validating the records that were processed. The records were out of order" );
} else if ( errorVal == missingRecordError ) {
throw new RuntimeException( "There was an error validating the records that were processed. Some records were missing." );
}
log.info( "--------------Completed validation of processed records.--------------" );
}
}

View file

@ -1,20 +0,0 @@
package software.amazon.kinesis.integration_tests;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.utils.RecordValidatorQueue;
/**
 * Factory that supplies {@link TestRecordProcessorV2} instances. Every processor
 * created by this factory shares the same {@link RecordValidatorQueue}, so records
 * processed across all shards can be validated together after the test run.
 */
public class TestRecordProcessorFactoryV2 implements ShardRecordProcessorFactory {
    RecordValidatorQueue recordValidator;
    /**
     * @param validator queue that collects processed records for post-run validation
     */
    public TestRecordProcessorFactoryV2( RecordValidatorQueue validator ) {
        this.recordValidator = validator;
    }
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessorV2( recordValidator );
    }
}

View file

@ -1,107 +0,0 @@
package software.amazon.kinesis.integration_tests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import java.nio.ByteBuffer;
/**
 * Shard record processor used by KCL integration tests: every record payload seen on
 * a shard is handed to a shared {@link RecordValidatorQueue} for post-run validation.
 */
public class TestRecordProcessorV2 implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger( TestRecordProcessorV2.class );
    private String shardId;
    RecordValidatorQueue recordValidator;
    public TestRecordProcessorV2( RecordValidatorQueue recordValidator ) {
        this.recordValidator = recordValidator;
    }
    /**
     * Captures the shard ID for subsequent MDC tagging and logs the starting sequence.
     */
    @Override
    public void initialize( InitializationInput initializationInput ) {
        shardId = initializationInput.shardId();
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber() );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }
    /**
     * Decodes each record's payload and records it against this shard in the validator.
     * Any failure aborts the whole JVM so the test fails loudly instead of surfacing
     * later as a confusing record-validation mismatch.
     */
    @Override
    public void processRecords( software.amazon.kinesis.lifecycle.events.ProcessRecordsInput processRecordsInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Processing {} record(s)", processRecordsInput.records().size() );
            for ( KinesisClientRecord r : processRecordsInput.records() ) {
                // Use an explicit charset: the no-arg String(byte[]) constructor depends on
                // the platform default, while the producer emits UTF-8 (Jackson) bytes.
                String data = new String( asByteArray( r.data() ), java.nio.charset.StandardCharsets.UTF_8 );
                log.info( "Processing record pk: {}", data );
                recordValidator.add( shardId, data );
            }
        } catch ( Throwable t ) {
            log.error( "Caught throwable while processing records. Aborting.", t );
            Runtime.getRuntime().halt( 1 );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }
    /**
     * Copies the remaining bytes of the buffer into a new array (advances the buffer).
     */
    public static byte[] asByteArray( ByteBuffer buf ) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get( bytes );
        return bytes;
    }
    @Override
    public void leaseLost( LeaseLostInput leaseLostInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Lost lease, so terminating." );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }
    /**
     * Checkpoints at shard end so child shards can be consumed.
     */
    @Override
    public void shardEnded( ShardEndedInput shardEndedInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Reached shard end checkpointing." );
            shardEndedInput.checkpointer().checkpoint();
        } catch ( ShutdownException | InvalidStateException e ) {
            log.error( "Exception while checkpointing at shard end. Giving up.", e );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }
    /**
     * Checkpoints on scheduler shutdown so progress is not lost.
     */
    @Override
    public void shutdownRequested( ShutdownRequestedInput shutdownRequestedInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Scheduler is shutting down, checkpointing." );
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch ( ShutdownException | InvalidStateException e ) {
            log.error( "Exception while checkpointing at requested shutdown. Giving up.", e );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }
}

View file

@ -0,0 +1,34 @@
package software.amazon.kinesis.lifecycle;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer;
@Slf4j
public class BasicStreamingPollingIntegrationTest {
    /**
     * Builds a {@link TestConsumer} for the given KCL test configuration and runs it,
     * letting any failure propagate so the test fails.
     */
    private void runConsumer(KCLAppConfig consumerConfig) throws Exception {
        new TestConsumer(consumerConfig).run();
    }
    @Test
    public void KCLReleaseCanaryPollingH2Test() throws Exception {
        runConsumer(new ReleaseCanaryPollingH2TestConfig());
    }
    @Test
    public void KCLReleaseCanaryPollingH1Test() throws Exception {
        runConsumer(new ReleaseCanaryPollingH1TestConfig());
    }
    @Test
    public void KCLReleaseCanaryStreamingTest() throws Exception {
        runConsumer(new ReleaseCanaryStreamingTestConfig());
    }
}

View file

@ -1,6 +0,0 @@
package software.amazon.kinesis.utils;
/**
 * Distinguishes the two major KCL lines in test configuration — presumably
 * KCL1X = KCL v1.x and KCL2X = KCL v2.x (TODO confirm against usages).
 */
public enum KCLVersion {
    KCL1X,
    KCL2X
}

View file

@ -0,0 +1,10 @@
package software.amazon.kinesis.utils;
/**
 * Possible outcomes of record validation in {@link RecordValidatorQueue#validateRecords}.
 */
public enum RecordValidationStatus {
    /** Some shard processed a record with a smaller value after a larger one. */
    OUT_OF_ORDER,
    /** The total count of unique processed records did not match the expected count. */
    MISSING_RECORD,
    /** All records were seen, in non-decreasing order per shard. */
    NO_ERROR
}

View file

@ -1,66 +1,83 @@
package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Class that maintains a dictionary that maps shard IDs to a list of records
* that are processed by that shard.
* Validation ensures that
* 1. The records processed by each shard are in increasing order (duplicates allowed)
* 2. The total number of unique records processed is equal to the number of records put on the stream
*/
@Slf4j
public class RecordValidatorQueue {
HashMap<String, List<String>> dict = new HashMap<>();
private final ConcurrentHashMap<String, List<String>> dict = new ConcurrentHashMap<>();
public void add( String shardId, String data ) {
if ( dict.containsKey( shardId ) ) {
// Just add the data to this item
List<String> oldVal = dict.get( shardId );
oldVal.add( data );
dict.put( shardId, oldVal );
} else {
List<String> newVal = new ArrayList<>();
newVal.add( data );
dict.put( shardId, newVal );
}
public void add(String shardId, String data) {
final List<String> values = dict.computeIfAbsent(shardId, key -> new ArrayList<>());
values.add(data);
}
public int validateRecords( int trueTotalShardCount ) {
// Validate that each List in the HashMap has data records in increasing order
public RecordValidationStatus validateRecords(int trueTotalShardCount) {
/**
* Validate that each List in the HashMap has data records in increasing order
*/
boolean incOrder = true;
for ( Map.Entry<String, List<String>> entry : dict.entrySet() ) {
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
int prevVal = -1;
boolean shardIncOrder = true;
for ( String record : recordsPerShard ) {
int nextVal = Integer.parseInt( record );
if ( prevVal > nextVal ) {
log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal );
for (String record : recordsPerShard) {
int nextVal = Integer.parseInt(record);
if (prevVal > nextVal) {
log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal);
shardIncOrder = false;
}
prevVal = nextVal;
}
if ( !shardIncOrder ) {
if (!shardIncOrder) {
incOrder = false;
break;
}
}
// If this is true, then there was some record that was processed out of order
if ( !incOrder ) {
return -1;
/**
* If this is true, then there was some record that was processed out of order
*/
if (!incOrder) {
return RecordValidationStatus.OUT_OF_ORDER;
}
// Validate that no records are missing over all shards
/**
* Validate that no records are missing over all shards
*/
int totalShardCount = 0;
for ( Map.Entry<String, List<String>> entry : dict.entrySet() ) {
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
Set<String> noDupRecords = new HashSet<String>( recordsPerShard );
Set<String> noDupRecords = new HashSet<String>(recordsPerShard);
totalShardCount += noDupRecords.size();
}
// If this is true, then there was some record that was missed during processing.
if ( totalShardCount != trueTotalShardCount ) {
log.error( "Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount );
return -2;
/**
* If this is true, then there was some record that was missed during processing.
*/
if (totalShardCount != trueTotalShardCount) {
log.error("Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount);
return RecordValidationStatus.MISSING_RECORD;
}
return 0;
/**
* Record validation succeeded.
*/
return RecordValidationStatus.NO_ERROR;
}
}

View file

@ -0,0 +1,44 @@
package software.amazon.kinesis.utils;
import org.junit.Assert;
import org.junit.Test;
/**
 * Unit tests for {@link RecordValidatorQueue} covering the three validation outcomes.
 */
public class RecordValidatorQueueTest {
    private static final String SHARD_ID = "ABC";
    private final RecordValidatorQueue recordValidator = new RecordValidatorQueue();
    /** Adds each payload, in the given order, to the single test shard. */
    private void addRecords(String... payloads) {
        for (String payload : payloads) {
            recordValidator.add(SHARD_ID, payload);
        }
    }
    @Test
    public void validationFailedRecordOutOfOrderTest() {
        addRecords("0", "1", "3", "2");
        Assert.assertEquals(RecordValidationStatus.OUT_OF_ORDER, recordValidator.validateRecords(4));
    }
    @Test
    public void validationFailedMissingRecordTest() {
        addRecords("0", "1", "2", "3");
        Assert.assertEquals(RecordValidationStatus.MISSING_RECORD, recordValidator.validateRecords(5));
    }
    @Test
    public void validRecordsTest() {
        addRecords("0", "1", "2", "3");
        Assert.assertEquals(RecordValidationStatus.NO_ERROR, recordValidator.validateRecords(4));
    }
}

View file

@ -1,8 +1,7 @@
package software.amazon.kinesis.integration_tests;
package software.amazon.kinesis.utils;
import lombok.Data;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
@ -19,7 +18,7 @@ import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
@Data
@Value
@Slf4j
public class StreamExistenceManager {
private final KinesisAsyncClient client;
@ -27,11 +26,7 @@ public class StreamExistenceManager {
public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
this.testConfig = config;
this.client = config.buildAsyncKinesisClient(Protocol.HTTP1_1);
}
public static StreamExistenceManager newManager(KCLAppConfig config) throws URISyntaxException, IOException {
return new StreamExistenceManager(config);
this.client = config.buildAsyncKinesisClient(config.getConsumerProtocol());
}
private boolean isStreamActive(String streamName) {
@ -50,8 +45,7 @@ public class StreamExistenceManager {
} catch (ExecutionException e) {
if (e.getCause() instanceof ResourceNotFoundException) {
return false;
}
else {
} else {
throw new RuntimeException(e);
}
} catch (Exception e) {
@ -95,7 +89,7 @@ public class StreamExistenceManager {
if (!isStreamActive(streamName)) {
createStream(streamName, testConfig.getShardCount());
}
log.info("Using stream " + streamName + " in endpoint " + testConfig.getEndpoint() + " with region " + testConfig.getRegion());
log.info("Using stream {} in endpoint {} with region {}", streamName, testConfig.getEndpoint(), testConfig.getRegion());
}
}

View file

@ -0,0 +1,202 @@
package software.amazon.kinesis.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class TestConsumer {
    public final KCLAppConfig consumerConfig;
    public final Region region;
    public final String streamName;
    public final KinesisAsyncClient kinesisClient;
    private MetricsConfig metricsConfig;
    private RetrievalConfig retrievalConfig;
    private CheckpointConfig checkpointConfig;
    private CoordinatorConfig coordinatorConfig;
    private LeaseManagementConfig leaseManagementConfig;
    private LifecycleConfig lifecycleConfig;
    private ProcessorConfig processorConfig;
    // Count of records successfully put on the stream; compared against the validator's
    // unique-record count after shutdown.
    // NOTE(review): written on the producer's scheduler thread and read on the main
    // thread without synchronization; the 10 s drain sleep presumably masks this, but
    // consider AtomicInteger if validation is ever flaky (public field type unchanged
    // here to stay interface-compatible).
    public int successfulPutRecords = 0;
    // Monotonically increasing payload value; each published record carries the current
    // counter so per-shard ordering can be validated downstream.
    public BigInteger payloadCounter = BigInteger.ZERO;

    // Jackson mappers are expensive to build and thread-safe once configured;
    // create one instead of one per published record.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public TestConsumer(KCLAppConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
        this.region = consumerConfig.getRegion();
        this.streamName = consumerConfig.getStreamName();
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    /**
     * End-to-end canary: ensures the stream exists, publishes one record per second,
     * runs a KCL {@link Scheduler} for two minutes, shuts everything down gracefully,
     * and finally validates that every published record was processed in order.
     *
     * @throws RuntimeException if record validation fails or the run is interrupted
     */
    public void run() throws Exception {
        // Check if stream is created. If not, create it.
        StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
        streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
        // Send dummy data to the stream: one record per second after a 10 s initial delay.
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
        RecordValidatorQueue recordValidator = new RecordValidatorQueue();
        // Set up KCL configuration (including DynamoDB lease table and CloudWatch metrics).
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory(recordValidator));
        retrievalConfig = consumerConfig.getRetrievalConfig();
        checkpointConfig = configsBuilder.checkpointConfig();
        coordinatorConfig = configsBuilder.coordinatorConfig();
        leaseManagementConfig = configsBuilder.leaseManagementConfig()
                .initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()))
                .initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50);
        lifecycleConfig = configsBuilder.lifecycleConfig();
        processorConfig = configsBuilder.processorConfig();
        metricsConfig = configsBuilder.metricsConfig();
        // Create the scheduler that drives record processing.
        Scheduler scheduler = new Scheduler(
                checkpointConfig,
                coordinatorConfig,
                leaseManagementConfig,
                lifecycleConfig,
                metricsConfig,
                processorConfig,
                retrievalConfig
        );
        // Start record processing of the dummy data on a daemon thread.
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
        // Sleep for two minutes to allow the producer/consumer to run, then end the test case.
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 2));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // Stop sending dummy data.
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();
        // Wait a few seconds for the last few records to be processed.
        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
        // Stop consuming data. The scheduler finishes processing the current batch of data
        // already received from Kinesis before shutting down.
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
        // Validate processed data against the number of successful puts.
        log.info("The number of expected records is: {}", successfulPutRecords);
        RecordValidationStatus errorVal = recordValidator.validateRecords(successfulPutRecords);
        if (errorVal == RecordValidationStatus.OUT_OF_ORDER) {
            throw new RuntimeException("There was an error validating the records that were processed. The records were out of order");
        } else if (errorVal == RecordValidationStatus.MISSING_RECORD) {
            throw new RuntimeException("There was an error validating the records that were processed. Some records were missing.");
        }
        log.info("--------------Completed validation of processed records.--------------");
    }

    /**
     * Publishes one record carrying the current payload counter. The counter and success
     * count are incremented only after the put succeeds, so validation counts match
     * exactly what reached the stream. Failures are logged and retried on the next tick.
     */
    public void publishRecord() {
        try {
            PutRecordRequest request = PutRecordRequest.builder()
                    .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                    .streamName(streamName)
                    .data(SdkBytes.fromByteBuffer(wrapWithCounter(payloadCounter)))
                    .build();
            kinesisClient.putRecord(request).get();
            // Increment only after a successful putRecord call.
            payloadCounter = payloadCounter.add(BigInteger.ONE);
            successfulPutRecords += 1;
            log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor's shutdown is observed.
            Thread.currentThread().interrupt();
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Error during publishRecord. Will try again next cycle", e);
        } catch (RuntimeException e) {
            log.error("Error while creating request", e);
        }
    }

    /**
     * Serializes the given counter value to JSON bytes (via Jackson) wrapped in a
     * ByteBuffer. (The former unused {@code payloadSize} parameter was removed.)
     *
     * @throws RuntimeException if serialization fails
     */
    private ByteBuffer wrapWithCounter(BigInteger payloadCounter) {
        log.info("--------------Putting record with data: {}", payloadCounter);
        try {
            return ByteBuffer.wrap(MAPPER.writeValueAsBytes(payloadCounter));
        } catch (Exception e) {
            log.error("Error creating payload data for {}", payloadCounter.toString());
            throw new RuntimeException("Error converting object to bytes: ", e);
        }
    }
}

View file

@ -0,0 +1,106 @@
package software.amazon.kinesis.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.nio.ByteBuffer;
/**
 * Shard record processor used by KCL integration tests: every record payload seen on
 * a shard is handed to a shared {@link RecordValidatorQueue} for post-run validation.
 */
public class TestRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(TestRecordProcessor.class);
    private String shardId;
    RecordValidatorQueue recordValidator;
    public TestRecordProcessor(RecordValidatorQueue recordValidator) {
        this.recordValidator = recordValidator;
    }
    /**
     * Captures the shard ID for subsequent MDC tagging and logs the starting sequence.
     */
    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
    /**
     * Decodes each record's payload and records it against this shard in the validator.
     * Any failure aborts the whole JVM so the test fails loudly instead of surfacing
     * later as a confusing record-validation mismatch.
     */
    @Override
    public void processRecords(software.amazon.kinesis.lifecycle.events.ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            for (KinesisClientRecord r : processRecordsInput.records()) {
                // Use an explicit charset: the no-arg String(byte[]) constructor depends on
                // the platform default, while the producer emits UTF-8 (Jackson) bytes.
                String data = new String(asByteArray(r.data()), java.nio.charset.StandardCharsets.UTF_8);
                log.info("Processing record pk: {}", data);
                recordValidator.add(shardId, data);
            }
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
            Runtime.getRuntime().halt(1);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
    /**
     * Copies the remaining bytes of the buffer into a new array (advances the buffer).
     */
    public static byte[] asByteArray(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return bytes;
    }
    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
    /**
     * Checkpoints at shard end so child shards can be consumed.
     */
    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
    /**
     * Checkpoints on scheduler shutdown so progress is not lost.
     */
    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}

View file

@ -0,0 +1,19 @@
package software.amazon.kinesis.utils;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
/**
 * Factory that supplies {@link TestRecordProcessor} instances. Every processor created
 * by this factory shares the same {@link RecordValidatorQueue}, so records processed
 * across all shards can be validated together after the test run.
 */
public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    RecordValidatorQueue recordValidator;
    /**
     * @param validator queue that collects processed records for post-run validation
     */
    public TestRecordProcessorFactory(RecordValidatorQueue validator) {
        this.recordValidator = validator;
    }
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor(recordValidator);
    }
}