From f7d286ac2e6fea34a63f955709f75d69a1de243a Mon Sep 17 00:00:00 2001 From: Meher Mankikar Date: Fri, 9 Jun 2023 11:23:11 -0700 Subject: [PATCH] Adding testing architecture and KCL 2.x basic polling and streaming tests --- amazon-kinesis-client/pom.xml | 4 + .../common/RecordValidatorQueueTest.java | 55 ++++ .../amazon/kinesis/config/KCLAppConfig.java | 236 ++++++++++++++++++ ...KCLReleaseCanary2XPollingH1TestConfig.java | 105 ++++++++ ...KCLReleaseCanary2XPollingH2TestConfig.java | 106 ++++++++ ...KCLReleaseCanary2XStreamingTestConfig.java | 104 ++++++++ .../KCL2XIntegrationTest.java | 35 +++ .../StreamExistenceManager.java | 101 ++++++++ .../integration_tests/TestConsumer.java | 70 ++++++ .../integration_tests/TestConsumerV2.java | 142 +++++++++++ .../TestRecordProcessorFactoryV2.java | 20 ++ .../TestRecordProcessorV2.java | 107 ++++++++ .../amazon/kinesis/utils/KCLVersion.java | 6 + .../kinesis/utils/OdinCredentialsHelper.java | 114 +++++++++ .../kinesis/utils/RecordValidatorQueue.java | 66 +++++ 15 files changed, 1271 insertions(+) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java create mode 100644 
amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 67c22703..a05c70c8 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -207,6 +207,10 @@ sqlite4java.library.path ${sqlite4java.libpath} + + credentials + ${credentials} + diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java new file mode 100644 index 00000000..87e730d1 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java @@ -0,0 +1,55 @@ +package software.amazon.kinesis.common; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mock; +import software.amazon.kinesis.utils.RecordValidatorQueue; + +public class RecordValidatorQueueTest { + + @Mock + private RecordValidatorQueue recordValidator; + + private static final String shardId = "ABC"; + + private final int outOfOrderError = -1; + private final int missingRecordError = -2; + + private final int noError = 0; + + @Test + public void validationFailedRecordOutOfOrderTest() { + recordValidator 
= new RecordValidatorQueue(); + recordValidator.add(shardId, "0"); + recordValidator.add(shardId, "1"); + recordValidator.add(shardId, "3"); + recordValidator.add(shardId, "2"); + + int error = recordValidator.validateRecords( 4 ); + Assert.assertEquals(outOfOrderError, error); + } + + @Test + public void validationFailedMissingRecordTest() { + recordValidator = new RecordValidatorQueue(); + recordValidator.add(shardId, "0"); + recordValidator.add(shardId, "1"); + recordValidator.add(shardId, "2"); + recordValidator.add(shardId, "3"); + + int error = recordValidator.validateRecords( 5 ); + Assert.assertEquals(missingRecordError, error); + } + + @Test + public void validRecordsTest() { + recordValidator = new RecordValidatorQueue(); + recordValidator.add(shardId, "0"); + recordValidator.add(shardId, "1"); + recordValidator.add(shardId, "2"); + recordValidator.add(shardId, "3"); + + int error = recordValidator.validateRecords( 4 ); + Assert.assertEquals(noError, error); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java new file mode 100644 index 00000000..a9fbf2a0 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -0,0 +1,236 @@ +package software.amazon.kinesis.config; + +import com.amazonaws.auth.AWSCredentialsProvider; +import software.amazon.kinesis.utils.RecordValidatorQueue; +import software.amazon.kinesis.integration_tests.TestRecordProcessorFactoryV2; +import software.amazon.kinesis.utils.KCLVersion; +import lombok.Builder; +import lombok.Data; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.http.Protocol; +import 
software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.utils.OdinCredentialsHelper; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Optional; + +public interface KCLAppConfig { + + String getStreamName(); + + default String getStreamArn() { + return null; + } + + int getShardCount(); + + String getApplicationName(); + + default KCLVersion getKCLVersion() { + return KCLVersion.KCL2X; + } + + default boolean canaryMonitorEnabled() { + return false; + } + + String getEndpoint(); + + Region getRegion(); + + boolean isProd(); + + default boolean durabilityCheck() { + return true; + } + + // "default" profile, should match with profiles listed in "cat ~/.aws/config" + String getProfile(); + + default String odinMaterialName() { + return null; + } + + default AWSCredentialsProvider getSyncAwsCredentials() throws IOException { + return OdinCredentialsHelper.getSyncAwsCredentialsFromMaterialSet( odinMaterialName() ); + } + + 
default AwsCredentialsProvider getAsyncAwsCredentials() throws IOException { + return OdinCredentialsHelper.getAsyncAwsCredentialsFromMaterialSet( odinMaterialName() ); + } + + // '-1' means round robin across 0, 5_000, 15_000, 30_000 milliseconds delay. + // The delay period is picked according to current time, so expected to be unpredictable across different KCL runs. + // '0' means PassThroughRecordProcessor + // Any other constant will delay according to the specified value. + long getProcessingDelayMillis(); + + InitialPositionInStream getKclInitialPosition(); + + Protocol getConsumerProtocol(); + + Protocol getProducerProtocol(); + + ProducerConfig getProducerConfig(); + + ReshardConfig getReshardConfig(); + + default MultiStreamRotatorConfig getMultiStreamRotatorConfig() { + throw new UnsupportedOperationException(); + } + + default KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException { + return buildAsyncKinesisClient( getConsumerProtocol() ); + } + + default KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException { + return buildAsyncKinesisClient( getProducerProtocol() ); + } + + default KinesisAsyncClient buildAsyncKinesisClient( Protocol protocol ) throws URISyntaxException, IOException { + return buildAsyncKinesisClient( Optional.ofNullable( protocol ) ); + } + + default KinesisAsyncClient buildAsyncKinesisClient( Optional< Protocol > protocol ) throws URISyntaxException, IOException { + + // Setup H2 client config. 
+ final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() + .maxConcurrency( Integer.MAX_VALUE ); + + if ( protocol.isPresent() ) { + builder.protocol( protocol.get() ); + } + + final SdkAsyncHttpClient sdkAsyncHttpClient = + builder.buildWithDefaults( AttributeMap.builder().put( SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true ).build() ); + + // Setup client builder by default values + final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region( getRegion() ); + + // Override endpoint if not one of the Prod stacks. +// if (!isProd()) { +// kinesisAsyncClientBuilder +// .endpointOverride(new URI(getEndpoint())); +// } + + kinesisAsyncClientBuilder.httpClient( sdkAsyncHttpClient ); + + + if ( getAsyncAwsCredentials() != null ) { + kinesisAsyncClientBuilder.credentialsProvider( getAsyncAwsCredentials() ); + } else if ( getProfile() != null ) { + kinesisAsyncClientBuilder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() ); + } else { + kinesisAsyncClientBuilder.credentialsProvider( DefaultCredentialsProvider.create() ); + } + + return kinesisAsyncClientBuilder.build(); + } + + default DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { + final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region( getRegion() ); + + if ( getAsyncAwsCredentials() != null ) { + builder.credentialsProvider( getAsyncAwsCredentials() ); + } else if ( getProfile() != null ) { + builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() ); + } else { + builder.credentialsProvider( DefaultCredentialsProvider.create() ); + } + + return builder.build(); + } + + default CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException { + final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region( getRegion() ); + + if ( getAsyncAwsCredentials() != null ) { + 
builder.credentialsProvider( getAsyncAwsCredentials() ); + } else if ( getProfile() != null ) { + builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() ); + } else { + builder.credentialsProvider( DefaultCredentialsProvider.create() ); + } + + return builder.build(); + } + + default String getWorkerId() throws UnknownHostException { + return Inet4Address.getLocalHost().getHostName(); + } + + default RecordValidatorQueue getRecordValidator() { + return new RecordValidatorQueue(); + } + + default ShardRecordProcessorFactory getShardRecordProcessorFactory() { + if (getKCLVersion() == KCLVersion.KCL2X) { + return new TestRecordProcessorFactoryV2( getRecordValidator() ); + } else { + return null; + } + } + + default ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { + return getConfigsBuilder( "" ); + } + + default ConfigsBuilder getConfigsBuilder( String workerIdSuffix ) throws IOException, URISyntaxException { + final String workerId = getWorkerId() + workerIdSuffix; + if ( getStreamArn() == null ) { + return new ConfigsBuilder( getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(), + buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() ); + } else { + return new ConfigsBuilder( Arn.fromString( getStreamArn() ), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(), + buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() ); + } + } + + RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException; + + @Data + @Builder + class ProducerConfig { + private boolean isBatchPut; + private int batchSize; + private int recordSizeKB; + private long callPeriodMills; + } + + @Data + @Builder + class ReshardConfig { + private Double[] reshardingFactorCycle; + private int numReshardCycles; + private long reshardFrequencyMillis; + } + + @Data + @Builder + class MultiStreamRotatorConfig { + private int 
totalStreams; + private int maxStreamsToProcess; + private long streamsRotationMillis; + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java new file mode 100644 index 00000000..bdf543c4 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java @@ -0,0 +1,105 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.retrieval.polling.PollingConfig; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; + +public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig { + @Override + public String getStreamName() { + return "KCLReleaseCanary2XPollingH1TestConfig"; + } + + @Override + public int getShardCount() { + return 20; + } + + @Override + public String getApplicationName() { + return "KCLReleaseCanary2XPollingH1TestConfigApplication"; + } + + @Override + public String getEndpoint() { + return ""; + } + + @Override + public Region getRegion() { + return Region.US_WEST_2; + } + + @Override + public boolean isProd() { + return true; + } + + @Override + public String getProfile() { + String iamUser = System.getProperty( "credentials" ); + return iamUser; + } + + @Override + public long getProcessingDelayMillis() { + return -1; + } + + @Override + public InitialPositionInStream getKclInitialPosition() { + return InitialPositionInStream.TRIM_HORIZON; + } + + @Override + 
public Protocol getConsumerProtocol() { + return Protocol.HTTP1_1; + } + + @Override + public Protocol getProducerProtocol() { + return Protocol.HTTP1_1; + } + + @Override + public ProducerConfig getProducerConfig() { + return ProducerConfig.builder() + .isBatchPut( false ) + .batchSize( 1 ) + .recordSizeKB( 60 ) + .callPeriodMills( 100 ) + .build(); + } + + @Override + public ReshardConfig getReshardConfig() { + return null; + } + + @Override + public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { + LocalDateTime d = LocalDateTime.now(); + d = d.minusMinutes( 5 ); + Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant(); + Date startStreamTime = Date.from( instant ); + + InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp( startStreamTime ); + + RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + config.initialPositionInStreamExtended( initialPosition ); + config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) ); + + return config; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java new file mode 100644 index 00000000..6d840f3c --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java @@ -0,0 +1,106 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.retrieval.polling.PollingConfig; + +import java.io.IOException; +import 
java.net.URISyntaxException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; + +public class KCLReleaseCanary2XPollingH2TestConfig implements KCLAppConfig { + @Override + public String getStreamName() { + return "KCLTest3"; + } + + @Override + public int getShardCount() { + return 20; + } + + @Override + public String getApplicationName() { + return "KCLReleaseCanary2XPollingH2TestApplication"; + } + + @Override + public String getEndpoint() { + return ""; + } + + @Override + public Region getRegion() { + return Region.US_WEST_2; + } + + @Override + public boolean isProd() { + return true; + } + + @Override + public String getProfile() { + String iamUser = System.getProperty( "credentials" ); + return iamUser; + } + + @Override + public long getProcessingDelayMillis() { + return -1; + } + + @Override + public InitialPositionInStream getKclInitialPosition() { + return InitialPositionInStream.TRIM_HORIZON; + } + + @Override + public Protocol getConsumerProtocol() { + return Protocol.HTTP2; + } + + @Override + public Protocol getProducerProtocol() { + return Protocol.HTTP1_1; + } + + @Override + public ProducerConfig getProducerConfig() { + return ProducerConfig.builder() + .isBatchPut( false ) + .batchSize( 1 ) + .recordSizeKB( 60 ) + .callPeriodMills( 100 ) + .build(); + } + + @Override + public ReshardConfig getReshardConfig() { + return null; + } + + @Override + public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { + LocalDateTime d = LocalDateTime.now(); + d = d.minusMinutes( 5 ); + Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant(); + Date startStreamTime = Date.from( instant ); + + InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp( startStreamTime ); + + RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + config.initialPositionInStreamExtended( initialPosition ); + 
config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) ); + + return config; + } +} + diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java new file mode 100644 index 00000000..cb809125 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java @@ -0,0 +1,104 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.retrieval.RetrievalConfig; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; + +public class KCLReleaseCanary2XStreamingTestConfig implements KCLAppConfig { + @Override + public String getStreamName() { + return "KCLReleaseCanary2XStreamingTestStream"; + } + + @Override + public int getShardCount() { + return 10; + } + + @Override + public String getApplicationName() { + return "KCLReleaseCanary2XStreamingTestApplication"; + } + + @Override + public String getEndpoint() { + return ""; + } + + @Override + public Region getRegion() { + return Region.US_WEST_2; + } + + @Override + public boolean isProd() { + return true; + } + + @Override + public String getProfile() { + String iamUser = System.getProperty( "credentials" ); + return iamUser; + } + + @Override + public long getProcessingDelayMillis() { + return 50; + } + + @Override + public InitialPositionInStream getKclInitialPosition() { + return InitialPositionInStream.TRIM_HORIZON; + } + + @Override + public Protocol getConsumerProtocol() { + return 
Protocol.HTTP2; + } + + @Override + public Protocol getProducerProtocol() { + return Protocol.HTTP1_1; + } + + @Override + public ProducerConfig getProducerConfig() { + return ProducerConfig.builder() + .isBatchPut( false ) + .batchSize( 1 ) + .recordSizeKB( 60 ) + .callPeriodMills( 100 ) + .build(); + } + + @Override + public ReshardConfig getReshardConfig() { + return null; + } + + @Override + public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { + LocalDateTime d = LocalDateTime.now(); + d = d.minusMinutes( 5 ); + Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant(); + Date startStreamTime = Date.from( instant ); + + InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp( startStreamTime ); + + RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + config.initialPositionInStreamExtended( initialPosition ); + + return config; + } +} + diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java new file mode 100644 index 00000000..69e48112 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java @@ -0,0 +1,35 @@ +package software.amazon.kinesis.integration_tests; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import software.amazon.kinesis.config.KCLAppConfig; + +@Slf4j +public class KCL2XIntegrationTest { + + private static final String CONFIG_PACKAGE = "software.amazon.kinesis.config"; + + @Test + public void KCLReleaseCanary2XPollingH2Test() throws Exception { + String[] configName = { "KCLReleaseCanary2XPollingH2TestConfig" }; + KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." 
+ configName[0]).newInstance(); + TestConsumerV2 consumer = new TestConsumerV2( consumerConfig ); + consumer.run(); + } + + @Test + public void KCLReleaseCanary2XPollingH1Test() throws Exception { + String[] configName = { "KCLReleaseCanary2XPollingH1TestConfig" }; + KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance(); + TestConsumerV2 consumer = new TestConsumerV2( consumerConfig ); + consumer.run(); + } + + @Test + public void KCLReleaseCanary2XStreamingTest() throws Exception { + String[] configName = { "KCLReleaseCanary2XStreamingTestConfig" }; + KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance(); + TestConsumerV2 consumer = new TestConsumerV2( consumerConfig ); + consumer.run(); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java new file mode 100644 index 00000000..c827b439 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java @@ -0,0 +1,101 @@ +package software.amazon.kinesis.integration_tests; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.StreamStatus; +import software.amazon.kinesis.config.KCLAppConfig; + +import java.io.IOException; +import java.net.URISyntaxException; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; + +@Data +@Slf4j +public class StreamExistenceManager { + private final KinesisAsyncClient client; + private final KCLAppConfig testConfig; + + public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException { + this.testConfig = config; + this.client = config.buildAsyncKinesisClient(Protocol.HTTP1_1); + } + + public static StreamExistenceManager newManager(KCLAppConfig config) throws URISyntaxException, IOException { + return new StreamExistenceManager(config); + } + + private boolean isStreamActive(String streamName) { + + DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); + + final CompletableFuture describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request); + + try { + final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS); + boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE); + if (!isActive) { + throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus()); + } + return true; + } catch (ExecutionException e) { + if (e.getCause() instanceof ResourceNotFoundException) { + return false; + } + else { + throw new RuntimeException(e); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void createStream(String streamName, int shardCount) { + CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build(); + try { + client.createStream(request).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Failed to create stream with name " + streamName, e); + } + + int i = 0; + while (true) { + i++; + if (i > 100) { 
+ throw new RuntimeException("Failed stream creation, did not transition into active"); + } + try { + boolean isActive = isStreamActive(streamName); + if (isActive) { + log.info("Succesfully created the stream " + streamName); + return; + } + } catch (Exception e) { + try { + sleep(10_000); // 10 secs backoff. + } catch (InterruptedException e1) { + log.error("Failed to sleep"); + } + log.info("Stream {} is not active yet, exception: ", streamName, e); + } + } + } + + public void checkStreamAndCreateIfNecessary(String streamName) { + + if (!isStreamActive(streamName)) { + createStream(streamName, testConfig.getShardCount()); + } + log.info("Using stream " + streamName + " in endpoint " + testConfig.getEndpoint() + " with region " + testConfig.getRegion()); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java new file mode 100644 index 00000000..6f03333f --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java @@ -0,0 +1,70 @@ +package software.amazon.kinesis.integration_tests; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.kinesis.common.KinesisClientUtil; +import software.amazon.kinesis.config.KCLAppConfig; +import org.apache.commons.lang3.RandomStringUtils; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; + +public class TestConsumer { + + private static final Logger log = LoggerFactory.getLogger( TestConsumer.class ); + public final KCLAppConfig consumerConfig; + 
public final Region region; + public final String streamName; + public final KinesisAsyncClient kinesisClient; + public int successfulPutRecords = 0; + public BigInteger payloadCounter = new BigInteger( "0" ); + + + public TestConsumer( KCLAppConfig consumerConfig ) { + this.consumerConfig = consumerConfig; + this.region = consumerConfig.getRegion(); + this.streamName = consumerConfig.getStreamName(); + this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient( KinesisAsyncClient.builder().region( this.region ) ); + } + + public void publishRecord() { + PutRecordRequest request; + try { + request = PutRecordRequest.builder() + .partitionKey( RandomStringUtils.randomAlphabetic( 5, 20 ) ) + .streamName( streamName ) + .data( SdkBytes.fromByteBuffer( wrapWithCounter( 5, payloadCounter ) ) ) // 1024 is 1 KB + .build(); + kinesisClient.putRecord( request ).get(); + // Increment the payload counter if the putRecord call was successful + payloadCounter = payloadCounter.add( new BigInteger( "1" ) ); + successfulPutRecords += 1; + } catch ( InterruptedException e ) { + log.info( "Interrupted, assuming shutdown." ); + } catch ( ExecutionException e ) { + log.error( "Error during publishRecord. 
Will try again next cycle", e ); + } catch ( RuntimeException e ) { + log.error( "Error while creating request", e ); + } + } + + private ByteBuffer wrapWithCounter( int payloadSize, BigInteger payloadCounter ) throws RuntimeException { + byte[] returnData; + log.info( "--------------Putting record with data: {}", payloadCounter ); + ObjectMapper mapper = new ObjectMapper(); + try { + returnData = mapper.writeValueAsBytes( payloadCounter ); + } catch ( Exception e ) { + log.error( "Error creating payload data for {}", payloadCounter.toString() ); + throw new RuntimeException( "Error converting object to bytes: ", e ); + } + return ByteBuffer.wrap( returnData ); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java new file mode 100644 index 00000000..418064a4 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java @@ -0,0 +1,142 @@ +package software.amazon.kinesis.integration_tests; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.kinesis.checkpoint.CheckpointConfig; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.coordinator.CoordinatorConfig; +import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.leases.LeaseManagementConfig; +import software.amazon.kinesis.lifecycle.LifecycleConfig; +import software.amazon.kinesis.metrics.MetricsConfig; +import software.amazon.kinesis.processor.ProcessorConfig; +import software.amazon.kinesis.retrieval.RetrievalConfig; +import 
software.amazon.kinesis.utils.RecordValidatorQueue; + +import java.util.UUID; +import java.util.concurrent.*; + +public class TestConsumerV2 extends TestConsumer { + private static final Logger log = LoggerFactory.getLogger( TestConsumerV2.class ); + private final int outOfOrderError = -1; + private final int missingRecordError = -2; + private MetricsConfig metricsConfig; + private RetrievalConfig retrievalConfig; + private CheckpointConfig checkpointConfig; + private CoordinatorConfig coordinatorConfig; + private LeaseManagementConfig leaseManagementConfig; + private LifecycleConfig lifecycleConfig; + private ProcessorConfig processorConfig; + + public TestConsumerV2( KCLAppConfig consumerConfig ) { + super( consumerConfig ); + } + + public void run() throws Exception { + + /** + * Check if stream is created. If not, create it + */ + StreamExistenceManager.newManager( this.consumerConfig ).checkStreamAndCreateIfNecessary( this.streamName ); + + /** + * Send dummy data to stream + */ + ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); + ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate( this::publishRecord, 10, 1, TimeUnit.SECONDS ); + + RecordValidatorQueue recordValidator = new RecordValidatorQueue(); + + /** + * Setup configuration of KCL (including DynamoDB and CloudWatch) + */ + DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region( region ).build(); + CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region( region ).build(); + ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactoryV2( recordValidator ) ); + + + retrievalConfig = consumerConfig.getRetrievalConfig(); + checkpointConfig = configsBuilder.checkpointConfig(); + coordinatorConfig = configsBuilder.coordinatorConfig(); + leaseManagementConfig = 
configsBuilder.leaseManagementConfig() + .initialPositionInStream( InitialPositionInStreamExtended.newInitialPosition( consumerConfig.getKclInitialPosition() ) ) + .initialLeaseTableReadCapacity( 50 ).initialLeaseTableWriteCapacity( 50 ); + lifecycleConfig = configsBuilder.lifecycleConfig(); + processorConfig = configsBuilder.processorConfig(); + metricsConfig = configsBuilder.metricsConfig(); + + /** + * Create Scheduler + */ + Scheduler scheduler = new Scheduler( + checkpointConfig, + coordinatorConfig, + leaseManagementConfig, + lifecycleConfig, + metricsConfig, + processorConfig, + retrievalConfig + ); + + /** + * Start record processing of dummy data + */ + Thread schedulerThread = new Thread( scheduler ); + schedulerThread.setDaemon( true ); + schedulerThread.start(); + + + /** + * Sleep for two minutes to allow the producer/consumer to run and then end the test case. + */ + try { + Thread.sleep( TimeUnit.SECONDS.toMillis( 60 * 2 ) ); // 60 * 2 + } catch ( InterruptedException e ) { + throw new RuntimeException( e ); + } + + /** + * Stops sending dummy data. + */ + log.info( "Cancelling producer and shutting down executor." ); + producerFuture.cancel( true ); + producerExecutor.shutdownNow(); + + /** + * Wait a few seconds for the last few records to be processed + */ + Thread.sleep( TimeUnit.SECONDS.toMillis( 10 ) ); + + /** + * Stops consuming data. Finishes processing the current batch of data already received from Kinesis + * before shutting down. + */ + Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); + log.info( "Waiting up to 20 seconds for shutdown to complete." ); + try { + gracefulShutdownFuture.get( 20, TimeUnit.SECONDS ); + } catch ( InterruptedException e ) { + log.info( "Interrupted while waiting for graceful shutdown. Continuing." ); + } catch ( ExecutionException e ) { + log.error( "Exception while executing graceful shutdown.", e ); + } catch ( TimeoutException e ) { + log.error( "Timeout while waiting for shutdown. 
Scheduler may not have exited." ); + } + log.info( "Completed, shutting down now." ); + + /** + * Validate processed data + */ + int errorVal = recordValidator.validateRecords( successfulPutRecords ); + if ( errorVal == outOfOrderError ) { + throw new RuntimeException( "There was an error validating the records that were processed. The records were out of order" ); + } else if ( errorVal == missingRecordError ) { + throw new RuntimeException( "There was an error validating the records that were processed. Some records were missing." ); + } + log.info( "--------------Completed validation of processed records.--------------" ); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java new file mode 100644 index 00000000..167709bb --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java @@ -0,0 +1,20 @@ +package software.amazon.kinesis.integration_tests; + +import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.utils.RecordValidatorQueue; + +public class TestRecordProcessorFactoryV2 implements ShardRecordProcessorFactory { + + RecordValidatorQueue recordValidator; + + public TestRecordProcessorFactoryV2( RecordValidatorQueue recordValidator ) { + this.recordValidator = recordValidator; + } + + @Override + public ShardRecordProcessor shardRecordProcessor() { + return new TestRecordProcessorV2( this.recordValidator ); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java new file mode 100644 index 00000000..62e9b44d --- 
/dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java @@ -0,0 +1,107 @@ +package software.amazon.kinesis.integration_tests;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.utils.RecordValidatorQueue;

import java.nio.ByteBuffer;
// FIX: added for explicit UTF-8 decoding of record payloads.
import java.nio.charset.StandardCharsets;

/**
 * Shard record processor that feeds every record's payload into the shared
 * RecordValidatorQueue so the test harness can verify ordering and completeness.
 */
public class TestRecordProcessorV2 implements ShardRecordProcessor {

    private static final String SHARD_ID_MDC_KEY = "ShardId";

    private static final Logger log = LoggerFactory.getLogger( TestRecordProcessorV2.class );

    private String shardId;

    // FIX: was package-private and mutable.
    private final RecordValidatorQueue recordValidator;

    public TestRecordProcessorV2( RecordValidatorQueue recordValidator ) {
        this.recordValidator = recordValidator;
    }

    @Override
    public void initialize( InitializationInput initializationInput ) {
        shardId = initializationInput.shardId();
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber() );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }

    @Override
    public void processRecords( ProcessRecordsInput processRecordsInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Processing {} record(s)", processRecordsInput.records().size() );

            for ( KinesisClientRecord r : processRecordsInput.records() ) {
                // FIX: decode with an explicit charset instead of the platform default.
                String data = new String( asByteArray( r.data() ), StandardCharsets.UTF_8 );
                log.info( "Processing record pk: {}", data );
                recordValidator.add( shardId, data );
            }

        } catch ( Throwable t ) {
            // Deliberately hard-exits the JVM: any processing failure invalidates the canary run.
            log.error( "Caught throwable while processing records. Aborting.", t );
            Runtime.getRuntime().halt( 1 );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }

    /** Copies the remaining bytes of the buffer into a fresh array. */
    public static byte[] asByteArray( ByteBuffer buf ) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get( bytes );
        return bytes;
    }

    @Override
    public void leaseLost( LeaseLostInput leaseLostInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Lost lease, so terminating." );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }

    @Override
    public void shardEnded( ShardEndedInput shardEndedInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Reached shard end checkpointing." );
            shardEndedInput.checkpointer().checkpoint();
        } catch ( ShutdownException | InvalidStateException e ) {
            log.error( "Exception while checkpointing at shard end. Giving up.", e );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }

    @Override
    public void shutdownRequested( ShutdownRequestedInput shutdownRequestedInput ) {
        MDC.put( SHARD_ID_MDC_KEY, shardId );
        try {
            log.info( "Scheduler is shutting down, checkpointing." );
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch ( ShutdownException | InvalidStateException e ) {
            log.error( "Exception while checkpointing at requested shutdown. Giving up.", e );
        } finally {
            MDC.remove( SHARD_ID_MDC_KEY );
        }
    }
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java new file mode 100644 index 00000000..65b75172 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java @@ -0,0 +1,6 @@ +package software.amazon.kinesis.utils;

/** KCL major versions exercised by the integration tests. */
public enum KCLVersion {
    KCL1X,
    KCL2X
}
\ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java new file mode 100644 index 00000000..f35830ce --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java @@ -0,0 +1,114 @@ +package software.amazon.kinesis.utils; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.google.common.io.CharStreams; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +/** + * Helper class to hold odin credentials because odin is not available externally and this package doesn't use brazil.
+ */ +@Slf4j +public class OdinCredentialsHelper { + + private final static String PRINCIPAL = "Principal"; + private final static String CREDENTIAL = "Credential"; + private final static String ODIN_COMMAND = "/apollo/env/envImprovement/bin/odin-get -t"; + + private static String getMaterial(String materialName, String materialType) throws IOException { + final InputStream inputStream = Runtime.getRuntime().exec(String.format("%s %s %s", ODIN_COMMAND, materialType, materialName)).getInputStream(); + return CharStreams.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).trim(); + } + private static String getPrincipal(String materialName) throws IOException { + return getMaterial(materialName, PRINCIPAL); + } + + private static String getCredential(String materialName) throws IOException { + return getMaterial(materialName, CREDENTIAL); + } + + /** + * Helper method to pull credentials from odin for testing for AWS SDK sync clients (1.x). + * + * @param materialName name of the material set to fetch. + * @return access/secret key pair from Odin if specified for testing. + * @throws IOException + */ + public static AWSCredentialsProvider getSyncAwsCredentialsFromMaterialSet(String materialName) throws IOException { + if (materialName == null) { + log.debug("No material name found."); + return null; + } + + log.debug("Fetching credentials for material - {}.", materialName); + + final String principal = getPrincipal(materialName); + final String credential = getCredential(materialName); + + final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() { + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return principal; + } + + @Override + public String getAWSSecretKey() { + return credential; + } + }; + } + @Override + public void refresh() { + } + }; + + log.debug("Successfully retrieved credentials from odin. 
Access key - {}.", principal); + + return awsCredentialsProvider; + } + + /** + * Helper method to pull credentials from odin for testing for AWS SDK async clients (2.x). + * + * @param materialName name of the material set to fetch. + * @return access/secret key pair from Odin if specified for testing. + * @throws IOException + */ + public static AwsCredentialsProvider getAsyncAwsCredentialsFromMaterialSet(String materialName) throws IOException { + if (materialName == null) { + log.debug("No material name found."); + return null; + } + + log.debug("Fetching credentials for material - {}.", materialName); + + final String principal = getPrincipal(materialName); + final String credential = getCredential(materialName); + + final AwsCredentialsProvider awsCredentialsProvider = () -> new AwsCredentials() { + @Override + public String accessKeyId() { + return principal; + } + + @Override + public String secretAccessKey() { + return credential; + } + }; + + log.debug("Successfully retrieved credentials from odin. 
Access key - {}.", principal); + + return awsCredentialsProvider; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java new file mode 100644 index 00000000..b28cf6c2 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java @@ -0,0 +1,66 @@ +package software.amazon.kinesis.utils;

import lombok.extern.slf4j.Slf4j;
// FIX: replaced the wildcard "import java.util.*" with explicit imports.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Collects, per shard, the record payloads seen by the test record processors and
 * validates after the run that records arrived in increasing order and none went missing.
 */
@Slf4j
public class RecordValidatorQueue {

    // FIX: the type parameters were lost in the original (raw "HashMap>"); restored to
    // shardId -> ordered list of record payloads, typed against the Map interface.
    final Map<String, List<String>> dict = new HashMap<>();

    /** Appends a record payload to the ordered list for the given shard. */
    public void add( String shardId, String data ) {
        // FIX: replaced the containsKey/get/put dance with computeIfAbsent.
        dict.computeIfAbsent( shardId, k -> new ArrayList<>() ).add( data );
    }

    /**
     * Validates the recorded data. Each payload is the decimal form of a monotonically
     * increasing counter, so within a shard the parsed values must be non-decreasing.
     *
     * @param expectedRecordCount number of records the producer successfully published
     *        (named "trueTotalShardCount" before, but it counts records, not shards).
     * @return 0 on success, -1 if any shard saw records out of order, -2 if the total
     *         de-duplicated record count does not match expectedRecordCount.
     */
    public int validateRecords( int expectedRecordCount ) {
        // Check that each shard's list is in non-decreasing order.
        boolean incOrder = true;
        for ( Map.Entry<String, List<String>> entry : dict.entrySet() ) {
            int prevVal = -1;
            for ( String record : entry.getValue() ) {
                int nextVal = Integer.parseInt( record );
                if ( prevVal > nextVal ) {
                    log.error( "The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal );
                    incOrder = false;
                }
                prevVal = nextVal;
            }
        }

        // If this is true, then there was some record that was processed out of order.
        if ( !incOrder ) {
            return -1;
        }

        // Validate that no records are missing over all shards; retries can produce
        // duplicates, so count distinct payloads per shard.
        int totalRecordCount = 0;
        for ( List<String> recordsPerShard : dict.values() ) {
            Set<String> noDupRecords = new HashSet<>( recordsPerShard );
            totalRecordCount += noDupRecords.size();
        }

        // If this is true, then there was some record that was missed during processing.
        if ( totalRecordCount != expectedRecordCount ) {
            log.error( "Failed to get correct number of records processed. Should be {} but was {}", expectedRecordCount, totalRecordCount );
            return -2;
        }
        return 0;
    }
}