Adding testing architecture and KCL 2.x basic polling and streaming tests

Meher Mankikar 2023-06-09 11:23:11 -07:00
parent a8fc1367c6
commit f7d286ac2e
15 changed files with 1271 additions and 0 deletions

View file

@@ -207,6 +207,10 @@
<name>sqlite4java.library.path</name>
<value>${sqlite4java.libpath}</value>
</property>
<property>
<name>credentials</name>
<value>${credentials}</value>
</property>
</systemProperties>
</configuration>
</plugin>
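
Note: the "credentials" system property added above is forwarded to the test JVM by the plugin's systemProperties block; the new test configs read it to select an AWS profile. The sketch below is only an illustration of how that property is expected to be consumed (the class and method names are hypothetical; the real lookup happens in KCLAppConfig.getProfile() and the client builders later in this commit), assuming the property is set with something like -Dcredentials=default and the named profile exists in ~/.aws/config.

// Minimal sketch, not part of this commit.
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;

public class CredentialsPropertyExample {
    public static AwsCredentialsProvider resolve() {
        // Profile name passed through from the Maven command line, e.g. -Dcredentials=default
        final String profile = System.getProperty("credentials");
        return (profile != null)
                ? ProfileCredentialsProvider.builder().profileName(profile).build()
                : DefaultCredentialsProvider.create();
    }
}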

View file

@@ -0,0 +1,55 @@
package software.amazon.kinesis.common;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.kinesis.utils.RecordValidatorQueue;
public class RecordValidatorQueueTest {
private RecordValidatorQueue recordValidator;
private static final String shardId = "ABC";
private final int outOfOrderError = -1;
private final int missingRecordError = -2;
private final int noError = 0;
@Test
public void validationFailedRecordOutOfOrderTest() {
recordValidator = new RecordValidatorQueue();
recordValidator.add(shardId, "0");
recordValidator.add(shardId, "1");
recordValidator.add(shardId, "3");
recordValidator.add(shardId, "2");
int error = recordValidator.validateRecords( 4 );
Assert.assertEquals(outOfOrderError, error);
}
@Test
public void validationFailedMissingRecordTest() {
recordValidator = new RecordValidatorQueue();
recordValidator.add(shardId, "0");
recordValidator.add(shardId, "1");
recordValidator.add(shardId, "2");
recordValidator.add(shardId, "3");
int error = recordValidator.validateRecords( 5 );
Assert.assertEquals(missingRecordError, error);
}
@Test
public void validRecordsTest() {
recordValidator = new RecordValidatorQueue();
recordValidator.add(shardId, "0");
recordValidator.add(shardId, "1");
recordValidator.add(shardId, "2");
recordValidator.add(shardId, "3");
int error = recordValidator.validateRecords( 4 );
Assert.assertEquals(noError, error);
}
}

View file

@@ -0,0 +1,236 @@
package software.amazon.kinesis.config;
import com.amazonaws.auth.AWSCredentialsProvider;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.integration_tests.TestRecordProcessorFactoryV2;
import software.amazon.kinesis.utils.KCLVersion;
import lombok.Builder;
import lombok.Data;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.OdinCredentialsHelper;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Optional;
public interface KCLAppConfig {
String getStreamName();
default String getStreamArn() {
return null;
}
int getShardCount();
String getApplicationName();
default KCLVersion getKCLVersion() {
return KCLVersion.KCL2X;
}
default boolean canaryMonitorEnabled() {
return false;
}
String getEndpoint();
Region getRegion();
boolean isProd();
default boolean durabilityCheck() {
return true;
}
// "default" profile, should match with profiles listed in "cat ~/.aws/config"
String getProfile();
default String odinMaterialName() {
return null;
}
default AWSCredentialsProvider getSyncAwsCredentials() throws IOException {
return OdinCredentialsHelper.getSyncAwsCredentialsFromMaterialSet( odinMaterialName() );
}
default AwsCredentialsProvider getAsyncAwsCredentials() throws IOException {
return OdinCredentialsHelper.getAsyncAwsCredentialsFromMaterialSet( odinMaterialName() );
}
// '-1' means the delay rotates (round-robin) across 0, 5_000, 15_000, and 30_000 milliseconds.
// The delay period is picked from the current time, so it is expected to vary across KCL runs.
// '0' means records are passed straight through (PassThroughRecordProcessor).
// Any other value delays processing by that many milliseconds.
long getProcessingDelayMillis();
InitialPositionInStream getKclInitialPosition();
Protocol getConsumerProtocol();
Protocol getProducerProtocol();
ProducerConfig getProducerConfig();
ReshardConfig getReshardConfig();
default MultiStreamRotatorConfig getMultiStreamRotatorConfig() {
throw new UnsupportedOperationException();
}
default KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException {
return buildAsyncKinesisClient( getConsumerProtocol() );
}
default KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException {
return buildAsyncKinesisClient( getProducerProtocol() );
}
default KinesisAsyncClient buildAsyncKinesisClient( Protocol protocol ) throws URISyntaxException, IOException {
return buildAsyncKinesisClient( Optional.ofNullable( protocol ) );
}
default KinesisAsyncClient buildAsyncKinesisClient( Optional< Protocol > protocol ) throws URISyntaxException, IOException {
// Setup H2 client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
.maxConcurrency( Integer.MAX_VALUE );
if ( protocol.isPresent() ) {
builder.protocol( protocol.get() );
}
final SdkAsyncHttpClient sdkAsyncHttpClient =
builder.buildWithDefaults( AttributeMap.builder().put( SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true ).build() );
// Setup client builder by default values
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region( getRegion() );
// Override endpoint if not one of the Prod stacks.
// if (!isProd()) {
// kinesisAsyncClientBuilder
// .endpointOverride(new URI(getEndpoint()));
// }
kinesisAsyncClientBuilder.httpClient( sdkAsyncHttpClient );
if ( getAsyncAwsCredentials() != null ) {
kinesisAsyncClientBuilder.credentialsProvider( getAsyncAwsCredentials() );
} else if ( getProfile() != null ) {
kinesisAsyncClientBuilder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
} else {
kinesisAsyncClientBuilder.credentialsProvider( DefaultCredentialsProvider.create() );
}
return kinesisAsyncClientBuilder.build();
}
default DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region( getRegion() );
if ( getAsyncAwsCredentials() != null ) {
builder.credentialsProvider( getAsyncAwsCredentials() );
} else if ( getProfile() != null ) {
builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
} else {
builder.credentialsProvider( DefaultCredentialsProvider.create() );
}
return builder.build();
}
default CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region( getRegion() );
if ( getAsyncAwsCredentials() != null ) {
builder.credentialsProvider( getAsyncAwsCredentials() );
} else if ( getProfile() != null ) {
builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
} else {
builder.credentialsProvider( DefaultCredentialsProvider.create() );
}
return builder.build();
}
default String getWorkerId() throws UnknownHostException {
return Inet4Address.getLocalHost().getHostName();
}
default RecordValidatorQueue getRecordValidator() {
return new RecordValidatorQueue();
}
default ShardRecordProcessorFactory getShardRecordProcessorFactory() {
if (getKCLVersion() == KCLVersion.KCL2X) {
return new TestRecordProcessorFactoryV2( getRecordValidator() );
} else {
// KCL 1.x record processor factory support is not implemented in this commit.
return null;
}
}
default ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
return getConfigsBuilder( "" );
}
default ConfigsBuilder getConfigsBuilder( String workerIdSuffix ) throws IOException, URISyntaxException {
final String workerId = getWorkerId() + workerIdSuffix;
if ( getStreamArn() == null ) {
return new ConfigsBuilder( getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() );
} else {
return new ConfigsBuilder( Arn.fromString( getStreamArn() ), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() );
}
}
RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException;
@Data
@Builder
class ProducerConfig {
private boolean isBatchPut;
private int batchSize;
private int recordSizeKB;
private long callPeriodMills;
}
@Data
@Builder
class ReshardConfig {
private Double[] reshardingFactorCycle;
private int numReshardCycles;
private long reshardFrequencyMillis;
}
@Data
@Builder
class MultiStreamRotatorConfig {
private int totalStreams;
private int maxStreamsToProcess;
private long streamsRotationMillis;
}
}

View file

@@ -0,0 +1,105 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XPollingH1TestConfig";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH1TestConfigApplication";
}
@Override
public String getEndpoint() {
return "";
}
@Override
public Region getRegion() {
return Region.US_WEST_2;
}
@Override
public boolean isProd() {
return true;
}
@Override
public String getProfile() {
return System.getProperty( "credentials" );
}
@Override
public long getProcessingDelayMillis() {
return -1;
}
@Override
public InitialPositionInStream getKclInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut( false )
.batchSize( 1 )
.recordSizeKB( 60 )
.callPeriodMills( 100 )
.build();
}
@Override
public ReshardConfig getReshardConfig() {
return null;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
final Instant startInstant = LocalDateTime.now().minusMinutes( 5 ).atZone( ZoneId.systemDefault() ).toInstant();
final Date startStreamTime = Date.from( startInstant );
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp( startStreamTime );
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended( initialPosition );
config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) );
return config;
}
}

View file

@@ -0,0 +1,106 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class KCLReleaseCanary2XPollingH2TestConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "KCLTest3";
}
@Override
public int getShardCount() {
return 20;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XPollingH2TestApplication";
}
@Override
public String getEndpoint() {
return "";
}
@Override
public Region getRegion() {
return Region.US_WEST_2;
}
@Override
public boolean isProd() {
return true;
}
@Override
public String getProfile() {
return System.getProperty( "credentials" );
}
@Override
public long getProcessingDelayMillis() {
return -1;
}
@Override
public InitialPositionInStream getKclInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;
}
@Override
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut( false )
.batchSize( 1 )
.recordSizeKB( 60 )
.callPeriodMills( 100 )
.build();
}
@Override
public ReshardConfig getReshardConfig() {
return null;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
final Instant startInstant = LocalDateTime.now().minusMinutes( 5 ).atZone( ZoneId.systemDefault() ).toInstant();
final Date startStreamTime = Date.from( startInstant );
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp( startStreamTime );
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended( initialPosition );
config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) );
return config;
}
}

View file

@@ -0,0 +1,104 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class KCLReleaseCanary2XStreamingTestConfig implements KCLAppConfig {
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingTestStream";
}
@Override
public int getShardCount() {
return 10;
}
@Override
public String getApplicationName() {
return "KCLReleaseCanary2XStreamingTestApplication";
}
@Override
public String getEndpoint() {
return "";
}
@Override
public Region getRegion() {
return Region.US_WEST_2;
}
@Override
public boolean isProd() {
return true;
}
@Override
public String getProfile() {
return System.getProperty( "credentials" );
}
@Override
public long getProcessingDelayMillis() {
return 50;
}
@Override
public InitialPositionInStream getKclInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
@Override
public Protocol getConsumerProtocol() {
return Protocol.HTTP2;
}
@Override
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
@Override
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut( false )
.batchSize( 1 )
.recordSizeKB( 60 )
.callPeriodMills( 100 )
.build();
}
@Override
public ReshardConfig getReshardConfig() {
return null;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
final Instant startInstant = LocalDateTime.now().minusMinutes( 5 ).atZone( ZoneId.systemDefault() ).toInstant();
final Date startStreamTime = Date.from( startInstant );
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPositionAtTimestamp( startStreamTime );
RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended( initialPosition );
return config;
}
}
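
Note: unlike the polling configs, the streaming config above leaves retrievalSpecificConfig unset, so KCL 2.x falls back to its default of enhanced fan-out. Making that explicit would look roughly like the sketch below (FanOutConfig lives in software.amazon.kinesis.retrieval.fanout; this is an illustration, not part of this commit).

@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
    final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
    // Roughly equivalent to the default: enhanced fan-out over HTTP/2.
    config.retrievalSpecificConfig(
            new FanOutConfig(buildConsumerClient())
                    .applicationName(getApplicationName())
                    .streamName(getStreamName()));
    return config;
}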

View file

@@ -0,0 +1,35 @@
package software.amazon.kinesis.integration_tests;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
@Slf4j
public class KCL2XIntegrationTest {
private static final String CONFIG_PACKAGE = "software.amazon.kinesis.config";
@Test
public void KCLReleaseCanary2XPollingH2Test() throws Exception {
KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + ".KCLReleaseCanary2XPollingH2TestConfig").getDeclaredConstructor().newInstance();
TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
consumer.run();
}
@Test
public void KCLReleaseCanary2XPollingH1Test() throws Exception {
KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + ".KCLReleaseCanary2XPollingH1TestConfig").getDeclaredConstructor().newInstance();
TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
consumer.run();
}
@Test
public void KCLReleaseCanary2XStreamingTest() throws Exception {
KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + ".KCLReleaseCanary2XStreamingTestConfig").getDeclaredConstructor().newInstance();
TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
consumer.run();
}
}
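
Note: the tests above resolve each config class reflectively from CONFIG_PACKAGE, which makes it easy to parameterize config names later. An illustrative alternative, not part of this commit, is direct construction (it requires importing the config classes from software.amazon.kinesis.config):

@Test
public void KCLReleaseCanary2XPollingH2Test() throws Exception {
    // Direct construction; behaves the same as the reflective lookup above.
    new TestConsumerV2(new KCLReleaseCanary2XPollingH2TestConfig()).run();
}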

View file

@@ -0,0 +1,101 @@
package software.amazon.kinesis.integration_tests;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.config.KCLAppConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
@Data
@Slf4j
public class StreamExistenceManager {
private final KinesisAsyncClient client;
private final KCLAppConfig testConfig;
public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
this.testConfig = config;
this.client = config.buildAsyncKinesisClient(Protocol.HTTP1_1);
}
public static StreamExistenceManager newManager(KCLAppConfig config) throws URISyntaxException, IOException {
return new StreamExistenceManager(config);
}
private boolean isStreamActive(String streamName) {
DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request);
try {
final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS);
boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE);
if (!isActive) {
throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus());
}
return true;
} catch (ExecutionException e) {
if (e.getCause() instanceof ResourceNotFoundException) {
return false;
}
else {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void createStream(String streamName, int shardCount) {
CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
try {
client.createStream(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to create stream with name " + streamName, e);
}
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed stream creation, did not transition into active");
}
try {
boolean isActive = isStreamActive(streamName);
if (isActive) {
log.info("Succesfully created the stream " + streamName);
return;
}
} catch (Exception e) {
try {
sleep(10_000); // 10 secs backoff.
} catch (InterruptedException e1) {
log.error("Failed to sleep");
}
log.info("Stream {} is not active yet, exception: ", streamName, e);
}
}
}
public void checkStreamAndCreateIfNecessary(String streamName) {
if (!isStreamActive(streamName)) {
createStream(streamName, testConfig.getShardCount());
}
log.info("Using stream " + streamName + " in endpoint " + testConfig.getEndpoint() + " with region " + testConfig.getRegion());
}
}

View file

@@ -0,0 +1,70 @@
package software.amazon.kinesis.integration_tests;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.config.KCLAppConfig;
import org.apache.commons.lang3.RandomStringUtils;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
public class TestConsumer {
private static final Logger log = LoggerFactory.getLogger( TestConsumer.class );
public final KCLAppConfig consumerConfig;
public final Region region;
public final String streamName;
public final KinesisAsyncClient kinesisClient;
public int successfulPutRecords = 0;
public BigInteger payloadCounter = new BigInteger( "0" );
public TestConsumer( KCLAppConfig consumerConfig ) {
this.consumerConfig = consumerConfig;
this.region = consumerConfig.getRegion();
this.streamName = consumerConfig.getStreamName();
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient( KinesisAsyncClient.builder().region( this.region ) );
}
public void publishRecord() {
PutRecordRequest request;
try {
request = PutRecordRequest.builder()
.partitionKey( RandomStringUtils.randomAlphabetic( 5, 20 ) )
.streamName( streamName )
.data( SdkBytes.fromByteBuffer( wrapWithCounter( 5, payloadCounter ) ) ) // payload is the JSON-serialized record counter
.build();
kinesisClient.putRecord( request ).get();
// Increment the payload counter if the putRecord call was successful
payloadCounter = payloadCounter.add( new BigInteger( "1" ) );
successfulPutRecords += 1;
} catch ( InterruptedException e ) {
log.info( "Interrupted, assuming shutdown." );
} catch ( ExecutionException e ) {
log.error( "Error during publishRecord. Will try again next cycle", e );
} catch ( RuntimeException e ) {
log.error( "Error while creating request", e );
}
}
// Note: payloadSize is currently unused; the payload is just the JSON-serialized counter.
private ByteBuffer wrapWithCounter( int payloadSize, BigInteger payloadCounter ) throws RuntimeException {
byte[] returnData;
log.info( "--------------Putting record with data: {}", payloadCounter );
ObjectMapper mapper = new ObjectMapper();
try {
returnData = mapper.writeValueAsBytes( payloadCounter );
} catch ( Exception e ) {
log.error( "Error creating payload data for {}", payloadCounter.toString() );
throw new RuntimeException( "Error converting object to bytes: ", e );
}
return ByteBuffer.wrap( returnData );
}
}
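
Note: wrapWithCounter above ignores its payloadSize argument, so every record carries only the serialized counter regardless of ProducerConfig.recordSizeKB. A hypothetical padded variant is sketched below; the method name and padding scheme are assumptions, not part of this commit.

private ByteBuffer wrapWithCounterPadded(int payloadSizeKB, BigInteger payloadCounter) {
    final byte[] counterBytes;
    try {
        counterBytes = new ObjectMapper().writeValueAsBytes(payloadCounter);
    } catch (Exception e) {
        throw new RuntimeException("Error converting object to bytes: ", e);
    }
    // Pad with zero bytes up to the requested size; the counter JSON stays at the front.
    final byte[] padded = new byte[Math.max(counterBytes.length, payloadSizeKB * 1024)];
    System.arraycopy(counterBytes, 0, padded, 0, counterBytes.length);
    return ByteBuffer.wrap(padded);
}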

View file

@@ -0,0 +1,142 @@
package software.amazon.kinesis.integration_tests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TestConsumerV2 extends TestConsumer {
private static final Logger log = LoggerFactory.getLogger( TestConsumerV2.class );
private final int outOfOrderError = -1;
private final int missingRecordError = -2;
private MetricsConfig metricsConfig;
private RetrievalConfig retrievalConfig;
private CheckpointConfig checkpointConfig;
private CoordinatorConfig coordinatorConfig;
private LeaseManagementConfig leaseManagementConfig;
private LifecycleConfig lifecycleConfig;
private ProcessorConfig processorConfig;
public TestConsumerV2( KCLAppConfig consumerConfig ) {
super( consumerConfig );
}
public void run() throws Exception {
/**
* Check if stream is created. If not, create it
*/
StreamExistenceManager.newManager( this.consumerConfig ).checkStreamAndCreateIfNecessary( this.streamName );
/**
* Send dummy data to stream
*/
ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate( this::publishRecord, 10, 1, TimeUnit.SECONDS );
RecordValidatorQueue recordValidator = new RecordValidatorQueue();
/**
* Setup configuration of KCL (including DynamoDB and CloudWatch)
*/
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region( region ).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region( region ).build();
// Note: the stream name is also used as the KCL application (lease table) name here.
ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactoryV2( recordValidator ) );
retrievalConfig = consumerConfig.getRetrievalConfig();
checkpointConfig = configsBuilder.checkpointConfig();
coordinatorConfig = configsBuilder.coordinatorConfig();
leaseManagementConfig = configsBuilder.leaseManagementConfig()
.initialPositionInStream( InitialPositionInStreamExtended.newInitialPosition( consumerConfig.getKclInitialPosition() ) )
.initialLeaseTableReadCapacity( 50 ).initialLeaseTableWriteCapacity( 50 );
lifecycleConfig = configsBuilder.lifecycleConfig();
processorConfig = configsBuilder.processorConfig();
metricsConfig = configsBuilder.metricsConfig();
/**
* Create Scheduler
*/
Scheduler scheduler = new Scheduler(
checkpointConfig,
coordinatorConfig,
leaseManagementConfig,
lifecycleConfig,
metricsConfig,
processorConfig,
retrievalConfig
);
/**
* Start record processing of dummy data
*/
Thread schedulerThread = new Thread( scheduler );
schedulerThread.setDaemon( true );
schedulerThread.start();
/**
* Sleep for two minutes to allow the producer/consumer to run and then end the test case.
*/
try {
Thread.sleep( TimeUnit.SECONDS.toMillis( 60 * 2 ) ); // 60 * 2
} catch ( InterruptedException e ) {
throw new RuntimeException( e );
}
/**
* Stops sending dummy data.
*/
log.info( "Cancelling producer and shutting down executor." );
producerFuture.cancel( true );
producerExecutor.shutdownNow();
/**
* Wait a few seconds for the last few records to be processed
*/
Thread.sleep( TimeUnit.SECONDS.toMillis( 10 ) );
/**
* Stops consuming data. Finishes processing the current batch of data already received from Kinesis
* before shutting down.
*/
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info( "Waiting up to 20 seconds for shutdown to complete." );
try {
gracefulShutdownFuture.get( 20, TimeUnit.SECONDS );
} catch ( InterruptedException e ) {
log.info( "Interrupted while waiting for graceful shutdown. Continuing." );
} catch ( ExecutionException e ) {
log.error( "Exception while executing graceful shutdown.", e );
} catch ( TimeoutException e ) {
log.error( "Timeout while waiting for shutdown. Scheduler may not have exited." );
}
log.info( "Completed, shutting down now." );
/**
* Validate processed data
*/
int errorVal = recordValidator.validateRecords( successfulPutRecords );
if ( errorVal == outOfOrderError ) {
throw new RuntimeException( "There was an error validating the records that were processed. The records were out of order" );
} else if ( errorVal == missingRecordError ) {
throw new RuntimeException( "There was an error validating the records that were processed. Some records were missing." );
}
log.info( "--------------Completed validation of processed records.--------------" );
}
}

View file

@@ -0,0 +1,20 @@
package software.amazon.kinesis.integration_tests;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.utils.RecordValidatorQueue;
public class TestRecordProcessorFactoryV2 implements ShardRecordProcessorFactory {
RecordValidatorQueue recordValidator;
public TestRecordProcessorFactoryV2( RecordValidatorQueue recordValidator ) {
this.recordValidator = recordValidator;
}
@Override
public ShardRecordProcessor shardRecordProcessor() {
return new TestRecordProcessorV2( this.recordValidator );
}
}

View file

@@ -0,0 +1,107 @@
package software.amazon.kinesis.integration_tests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class TestRecordProcessorV2 implements ShardRecordProcessor {
private static final String SHARD_ID_MDC_KEY = "ShardId";
private static final Logger log = LoggerFactory.getLogger( TestRecordProcessorV2.class );
private String shardId;
RecordValidatorQueue recordValidator;
public TestRecordProcessorV2( RecordValidatorQueue recordValidator ) {
this.recordValidator = recordValidator;
}
@Override
public void initialize( InitializationInput initializationInput ) {
shardId = initializationInput.shardId();
MDC.put( SHARD_ID_MDC_KEY, shardId );
try {
log.info( "Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber() );
} finally {
MDC.remove( SHARD_ID_MDC_KEY );
}
}
@Override
public void processRecords( ProcessRecordsInput processRecordsInput ) {
MDC.put( SHARD_ID_MDC_KEY, shardId );
try {
log.info( "Processing {} record(s)", processRecordsInput.records().size() );
for ( KinesisClientRecord r : processRecordsInput.records() ) {
String data = new String( asByteArray( r.data() ), StandardCharsets.UTF_8 );
log.info( "Processing record pk: {}", data );
recordValidator.add( shardId, data );
}
} catch ( Throwable t ) {
log.error( "Caught throwable while processing records. Aborting.", t );
Runtime.getRuntime().halt( 1 );
} finally {
MDC.remove( SHARD_ID_MDC_KEY );
}
}
public static byte[] asByteArray( ByteBuffer buf ) {
byte[] bytes = new byte[buf.remaining()];
buf.get( bytes );
return bytes;
}
@Override
public void leaseLost( LeaseLostInput leaseLostInput ) {
MDC.put( SHARD_ID_MDC_KEY, shardId );
try {
log.info( "Lost lease, so terminating." );
} finally {
MDC.remove( SHARD_ID_MDC_KEY );
}
}
@Override
public void shardEnded( ShardEndedInput shardEndedInput ) {
MDC.put( SHARD_ID_MDC_KEY, shardId );
try {
log.info( "Reached shard end checkpointing." );
shardEndedInput.checkpointer().checkpoint();
} catch ( ShutdownException | InvalidStateException e ) {
log.error( "Exception while checkpointing at shard end. Giving up.", e );
} finally {
MDC.remove( SHARD_ID_MDC_KEY );
}
}
@Override
public void shutdownRequested( ShutdownRequestedInput shutdownRequestedInput ) {
MDC.put( SHARD_ID_MDC_KEY, shardId );
try {
log.info( "Scheduler is shutting down, checkpointing." );
shutdownRequestedInput.checkpointer().checkpoint();
} catch ( ShutdownException | InvalidStateException e ) {
log.error( "Exception while checkpointing at requested shutdown. Giving up.", e );
} finally {
MDC.remove( SHARD_ID_MDC_KEY );
}
}
}

View file

@@ -0,0 +1,6 @@
package software.amazon.kinesis.utils;
public enum KCLVersion {
KCL1X,
KCL2X
}

View file

@@ -0,0 +1,114 @@
package software.amazon.kinesis.utils;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.google.common.io.CharStreams;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
/**
* Helper class that fetches Odin credentials, since Odin is not available externally and this package doesn't use Brazil.
*/
@Slf4j
public class OdinCredentialsHelper {
private final static String PRINCIPAL = "Principal";
private final static String CREDENTIAL = "Credential";
private final static String ODIN_COMMAND = "/apollo/env/envImprovement/bin/odin-get -t";
private static String getMaterial(String materialName, String materialType) throws IOException {
final InputStream inputStream = Runtime.getRuntime().exec(String.format("%s %s %s", ODIN_COMMAND, materialType, materialName)).getInputStream();
return CharStreams.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).trim();
}
private static String getPrincipal(String materialName) throws IOException {
return getMaterial(materialName, PRINCIPAL);
}
private static String getCredential(String materialName) throws IOException {
return getMaterial(materialName, CREDENTIAL);
}
/**
* Helper method to pull credentials from odin for testing for AWS SDK sync clients (1.x).
*
* @param materialName name of the material set to fetch.
* @return access/secret key pair from Odin if specified for testing.
* @throws IOException
*/
public static AWSCredentialsProvider getSyncAwsCredentialsFromMaterialSet(String materialName) throws IOException {
if (materialName == null) {
log.debug("No material name found.");
return null;
}
log.debug("Fetching credentials for material - {}.", materialName);
final String principal = getPrincipal(materialName);
final String credential = getCredential(materialName);
final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return principal;
}
@Override
public String getAWSSecretKey() {
return credential;
}
};
}
@Override
public void refresh() {
}
};
log.debug("Successfully retrieved credentials from odin. Access key - {}.", principal);
return awsCredentialsProvider;
}
/**
* Helper method to pull credentials from odin for testing for AWS SDK async clients (2.x).
*
* @param materialName name of the material set to fetch.
* @return access/secret key pair from Odin if specified for testing.
* @throws IOException
*/
public static AwsCredentialsProvider getAsyncAwsCredentialsFromMaterialSet(String materialName) throws IOException {
if (materialName == null) {
log.debug("No material name found.");
return null;
}
log.debug("Fetching credentials for material - {}.", materialName);
final String principal = getPrincipal(materialName);
final String credential = getCredential(materialName);
final AwsCredentialsProvider awsCredentialsProvider = () -> new AwsCredentials() {
@Override
public String accessKeyId() {
return principal;
}
@Override
public String secretAccessKey() {
return credential;
}
};
log.debug("Successfully retrieved credentials from odin. Access key - {}.", principal);
return awsCredentialsProvider;
}
}
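
Note: none of the configs in this commit override odinMaterialName(), so both helpers above return null and the clients fall back to the profile named by the "credentials" system property or to the default provider chain. An internal config would opt in roughly as sketched below; the material set name is hypothetical.

@Override
public String odinMaterialName() {
    // Hypothetical material set name; real values come from internal Odin configuration.
    return "com.amazon.kinesis.kcl.canary.test-credentials";
}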

View file

@@ -0,0 +1,66 @@
package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Slf4j
public class RecordValidatorQueue {
private final Map<String, List<String>> dict = new HashMap<>();
public void add( String shardId, String data ) {
// Append the data to this shard's list, creating the list on first use.
dict.computeIfAbsent( shardId, key -> new ArrayList<>() ).add( data );
}
public int validateRecords( int expectedRecordCount ) {
// Validate that each List in the HashMap has data records in increasing order
boolean incOrder = true;
for ( Map.Entry<String, List<String>> entry : dict.entrySet() ) {
List<String> recordsPerShard = entry.getValue();
int prevVal = -1;
boolean shardIncOrder = true;
for ( String record : recordsPerShard ) {
int nextVal = Integer.parseInt( record );
if ( prevVal > nextVal ) {
log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal );
shardIncOrder = false;
}
prevVal = nextVal;
}
if ( !shardIncOrder ) {
incOrder = false;
}
}
// If this is true, then there was some record that was processed out of order
if ( !incOrder ) {
return -1;
}
// Validate that no records are missing across all shards
int actualRecordCount = 0;
for ( Map.Entry<String, List<String>> entry : dict.entrySet() ) {
List<String> recordsPerShard = entry.getValue();
Set<String> noDupRecords = new HashSet<>( recordsPerShard );
actualRecordCount += noDupRecords.size();
}
// If this is true, then there was some record that was missed during processing.
if ( actualRecordCount != expectedRecordCount ) {
log.error( "Failed to process the expected number of records. Expected {} but was {}", expectedRecordCount, actualRecordCount );
return -2;
}
return 0;
}
}
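
Design note: the -1 / -2 / 0 codes returned by validateRecords are re-declared as local constants in RecordValidatorQueueTest and TestConsumerV2. A shared enum, sketched below as a suggestion only (not part of this commit), would keep that contract in one place.

public enum RecordValidationStatus {
    NO_ERROR,             // every expected record was seen, in order
    OUT_OF_ORDER_ERROR,   // a shard observed a smaller counter after a larger one
    MISSING_RECORD_ERROR  // de-duplicated record count differed from the successful puts
}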