diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 67c22703..a05c70c8 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -207,6 +207,10 @@
sqlite4java.library.path
${sqlite4java.libpath}
+
+ credentials
+ ${credentials}
+
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java
new file mode 100644
index 00000000..87e730d1
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/RecordValidatorQueueTest.java
@@ -0,0 +1,55 @@
+package software.amazon.kinesis.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+import software.amazon.kinesis.utils.RecordValidatorQueue;
+
+public class RecordValidatorQueueTest {
+
+ @Mock
+ private RecordValidatorQueue recordValidator;
+
+ private static final String shardId = "ABC";
+
+ private final int outOfOrderError = -1;
+ private final int missingRecordError = -2;
+
+ private final int noError = 0;
+
+ @Test
+ public void validationFailedRecordOutOfOrderTest() {
+ recordValidator = new RecordValidatorQueue();
+ recordValidator.add(shardId, "0");
+ recordValidator.add(shardId, "1");
+ recordValidator.add(shardId, "3");
+ recordValidator.add(shardId, "2");
+
+ int error = recordValidator.validateRecords( 4 );
+ Assert.assertEquals(outOfOrderError, error);
+ }
+
+ @Test
+ public void validationFailedMissingRecordTest() {
+ recordValidator = new RecordValidatorQueue();
+ recordValidator.add(shardId, "0");
+ recordValidator.add(shardId, "1");
+ recordValidator.add(shardId, "2");
+ recordValidator.add(shardId, "3");
+
+ int error = recordValidator.validateRecords( 5 );
+ Assert.assertEquals(missingRecordError, error);
+ }
+
+ @Test
+ public void validRecordsTest() {
+ recordValidator = new RecordValidatorQueue();
+ recordValidator.add(shardId, "0");
+ recordValidator.add(shardId, "1");
+ recordValidator.add(shardId, "2");
+ recordValidator.add(shardId, "3");
+
+ int error = recordValidator.validateRecords( 4 );
+ Assert.assertEquals(noError, error);
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java
new file mode 100644
index 00000000..a9fbf2a0
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java
@@ -0,0 +1,236 @@
+package software.amazon.kinesis.config;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import software.amazon.kinesis.utils.RecordValidatorQueue;
+import software.amazon.kinesis.integration_tests.TestRecordProcessorFactoryV2;
+import software.amazon.kinesis.utils.KCLVersion;
+import lombok.Builder;
+import lombok.Data;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.kinesis.common.ConfigsBuilder;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
+import software.amazon.kinesis.retrieval.RetrievalConfig;
+import software.amazon.kinesis.utils.OdinCredentialsHelper;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Optional;
+
+public interface KCLAppConfig {
+
+ String getStreamName();
+
+ default String getStreamArn() {
+ return null;
+ }
+
+ int getShardCount();
+
+ String getApplicationName();
+
+ default KCLVersion getKCLVersion() {
+ return KCLVersion.KCL2X;
+ }
+
+ default boolean canaryMonitorEnabled() {
+ return false;
+ }
+
+ String getEndpoint();
+
+ Region getRegion();
+
+ boolean isProd();
+
+ default boolean durabilityCheck() {
+ return true;
+ }
+
+ // "default" profile, should match with profiles listed in "cat ~/.aws/config"
+ String getProfile();
+
+ default String odinMaterialName() {
+ return null;
+ }
+
+ default AWSCredentialsProvider getSyncAwsCredentials() throws IOException {
+ return OdinCredentialsHelper.getSyncAwsCredentialsFromMaterialSet( odinMaterialName() );
+ }
+
+ default AwsCredentialsProvider getAsyncAwsCredentials() throws IOException {
+ return OdinCredentialsHelper.getAsyncAwsCredentialsFromMaterialSet( odinMaterialName() );
+ }
+
+ // '-1' means round robin across 0, 5_000, 15_000, 30_000 milliseconds delay.
+ // The delay period is picked according to current time, so expected to be unpredictable across different KCL runs.
+ // '0' means PassThroughRecordProcessor
+ // Any other constant will delay according to the specified value.
+ long getProcessingDelayMillis();
+
+ InitialPositionInStream getKclInitialPosition();
+
+ Protocol getConsumerProtocol();
+
+ Protocol getProducerProtocol();
+
+ ProducerConfig getProducerConfig();
+
+ ReshardConfig getReshardConfig();
+
+ default MultiStreamRotatorConfig getMultiStreamRotatorConfig() {
+ throw new UnsupportedOperationException();
+ }
+
+ default KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException {
+ return buildAsyncKinesisClient( getConsumerProtocol() );
+ }
+
+ default KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException {
+ return buildAsyncKinesisClient( getProducerProtocol() );
+ }
+
+ default KinesisAsyncClient buildAsyncKinesisClient( Protocol protocol ) throws URISyntaxException, IOException {
+ return buildAsyncKinesisClient( Optional.ofNullable( protocol ) );
+ }
+
+ default KinesisAsyncClient buildAsyncKinesisClient( Optional< Protocol > protocol ) throws URISyntaxException, IOException {
+
+ // Setup H2 client config.
+ final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
+ .maxConcurrency( Integer.MAX_VALUE );
+
+ if ( protocol.isPresent() ) {
+ builder.protocol( protocol.get() );
+ }
+
+ final SdkAsyncHttpClient sdkAsyncHttpClient =
+ builder.buildWithDefaults( AttributeMap.builder().put( SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true ).build() );
+
+ // Setup client builder by default values
+ final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region( getRegion() );
+
+ // Override endpoint if not one of the Prod stacks.
+// if (!isProd()) {
+// kinesisAsyncClientBuilder
+// .endpointOverride(new URI(getEndpoint()));
+// }
+
+ kinesisAsyncClientBuilder.httpClient( sdkAsyncHttpClient );
+
+
+ if ( getAsyncAwsCredentials() != null ) {
+ kinesisAsyncClientBuilder.credentialsProvider( getAsyncAwsCredentials() );
+ } else if ( getProfile() != null ) {
+ kinesisAsyncClientBuilder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
+ } else {
+ kinesisAsyncClientBuilder.credentialsProvider( DefaultCredentialsProvider.create() );
+ }
+
+ return kinesisAsyncClientBuilder.build();
+ }
+
+ default DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
+ final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region( getRegion() );
+
+ if ( getAsyncAwsCredentials() != null ) {
+ builder.credentialsProvider( getAsyncAwsCredentials() );
+ } else if ( getProfile() != null ) {
+ builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
+ } else {
+ builder.credentialsProvider( DefaultCredentialsProvider.create() );
+ }
+
+ return builder.build();
+ }
+
+ default CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
+ final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region( getRegion() );
+
+ if ( getAsyncAwsCredentials() != null ) {
+ builder.credentialsProvider( getAsyncAwsCredentials() );
+ } else if ( getProfile() != null ) {
+ builder.credentialsProvider( ProfileCredentialsProvider.builder().profileName( getProfile() ).build() );
+ } else {
+ builder.credentialsProvider( DefaultCredentialsProvider.create() );
+ }
+
+ return builder.build();
+ }
+
+ default String getWorkerId() throws UnknownHostException {
+ return Inet4Address.getLocalHost().getHostName();
+ }
+
+ default RecordValidatorQueue getRecordValidator() {
+ return new RecordValidatorQueue();
+ }
+
+ default ShardRecordProcessorFactory getShardRecordProcessorFactory() {
+ if (getKCLVersion() == KCLVersion.KCL2X) {
+ return new TestRecordProcessorFactoryV2( getRecordValidator() );
+ } else {
+ return null;
+ }
+ }
+
+ default ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
+ return getConfigsBuilder( "" );
+ }
+
+ default ConfigsBuilder getConfigsBuilder( String workerIdSuffix ) throws IOException, URISyntaxException {
+ final String workerId = getWorkerId() + workerIdSuffix;
+ if ( getStreamArn() == null ) {
+ return new ConfigsBuilder( getStreamName(), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
+ buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() );
+ } else {
+ return new ConfigsBuilder( Arn.fromString( getStreamArn() ), getApplicationName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
+ buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory() );
+ }
+ }
+
+ RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException;
+
+ @Data
+ @Builder
+ class ProducerConfig {
+ private boolean isBatchPut;
+ private int batchSize;
+ private int recordSizeKB;
+ private long callPeriodMills;
+ }
+
+ @Data
+ @Builder
+ class ReshardConfig {
+ private Double[] reshardingFactorCycle;
+ private int numReshardCycles;
+ private long reshardFrequencyMillis;
+ }
+
+ @Data
+ @Builder
+ class MultiStreamRotatorConfig {
+ private int totalStreams;
+ private int maxStreamsToProcess;
+ private long streamsRotationMillis;
+ }
+
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java
new file mode 100644
index 00000000..bdf543c4
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH1TestConfig.java
@@ -0,0 +1,105 @@
+package software.amazon.kinesis.config;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.retrieval.RetrievalConfig;
+import software.amazon.kinesis.retrieval.polling.PollingConfig;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+public class KCLReleaseCanary2XPollingH1TestConfig implements KCLAppConfig {
+ @Override
+ public String getStreamName() {
+ return "KCLReleaseCanary2XPollingH1TestConfig";
+ }
+
+ @Override
+ public int getShardCount() {
+ return 20;
+ }
+
+ @Override
+ public String getApplicationName() {
+ return "KCLReleaseCanary2XPollingH1TestConfigApplication";
+ }
+
+ @Override
+ public String getEndpoint() {
+ return "";
+ }
+
+ @Override
+ public Region getRegion() {
+ return Region.US_WEST_2;
+ }
+
+ @Override
+ public boolean isProd() {
+ return true;
+ }
+
+ @Override
+ public String getProfile() {
+ String iamUser = System.getProperty( "credentials" );
+ return iamUser;
+ }
+
+ @Override
+ public long getProcessingDelayMillis() {
+ return -1;
+ }
+
+ @Override
+ public InitialPositionInStream getKclInitialPosition() {
+ return InitialPositionInStream.TRIM_HORIZON;
+ }
+
+ @Override
+ public Protocol getConsumerProtocol() {
+ return Protocol.HTTP1_1;
+ }
+
+ @Override
+ public Protocol getProducerProtocol() {
+ return Protocol.HTTP1_1;
+ }
+
+ @Override
+ public ProducerConfig getProducerConfig() {
+ return ProducerConfig.builder()
+ .isBatchPut( false )
+ .batchSize( 1 )
+ .recordSizeKB( 60 )
+ .callPeriodMills( 100 )
+ .build();
+ }
+
+ @Override
+ public ReshardConfig getReshardConfig() {
+ return null;
+ }
+
+ @Override
+ public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
+ LocalDateTime d = LocalDateTime.now();
+ d = d.minusMinutes( 5 );
+ Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
+ Date startStreamTime = Date.from( instant );
+
+ InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
+ .newInitialPositionAtTimestamp( startStreamTime );
+
+ RetrievalConfig config = getConfigsBuilder().retrievalConfig();
+ config.initialPositionInStreamExtended( initialPosition );
+ config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) );
+
+ return config;
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java
new file mode 100644
index 00000000..6d840f3c
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XPollingH2TestConfig.java
@@ -0,0 +1,106 @@
+package software.amazon.kinesis.config;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.retrieval.RetrievalConfig;
+import software.amazon.kinesis.retrieval.polling.PollingConfig;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+public class KCLReleaseCanary2XPollingH2TestConfig implements KCLAppConfig {
+ @Override
+ public String getStreamName() {
+ return "KCLTest3";
+ }
+
+ @Override
+ public int getShardCount() {
+ return 20;
+ }
+
+ @Override
+ public String getApplicationName() {
+ return "KCLReleaseCanary2XPollingH2TestApplication";
+ }
+
+ @Override
+ public String getEndpoint() {
+ return "";
+ }
+
+ @Override
+ public Region getRegion() {
+ return Region.US_WEST_2;
+ }
+
+ @Override
+ public boolean isProd() {
+ return true;
+ }
+
+ @Override
+ public String getProfile() {
+ String iamUser = System.getProperty( "credentials" );
+ return iamUser;
+ }
+
+ @Override
+ public long getProcessingDelayMillis() {
+ return -1;
+ }
+
+ @Override
+ public InitialPositionInStream getKclInitialPosition() {
+ return InitialPositionInStream.TRIM_HORIZON;
+ }
+
+ @Override
+ public Protocol getConsumerProtocol() {
+ return Protocol.HTTP2;
+ }
+
+ @Override
+ public Protocol getProducerProtocol() {
+ return Protocol.HTTP1_1;
+ }
+
+ @Override
+ public ProducerConfig getProducerConfig() {
+ return ProducerConfig.builder()
+ .isBatchPut( false )
+ .batchSize( 1 )
+ .recordSizeKB( 60 )
+ .callPeriodMills( 100 )
+ .build();
+ }
+
+ @Override
+ public ReshardConfig getReshardConfig() {
+ return null;
+ }
+
+ @Override
+ public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
+ LocalDateTime d = LocalDateTime.now();
+ d = d.minusMinutes( 5 );
+ Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
+ Date startStreamTime = Date.from( instant );
+
+ InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
+ .newInitialPositionAtTimestamp( startStreamTime );
+
+ RetrievalConfig config = getConfigsBuilder().retrievalConfig();
+ config.initialPositionInStreamExtended( initialPosition );
+ config.retrievalSpecificConfig( new PollingConfig( getStreamName(), buildConsumerClient() ) );
+
+ return config;
+ }
+}
+
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java
new file mode 100644
index 00000000..cb809125
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLReleaseCanary2XStreamingTestConfig.java
@@ -0,0 +1,104 @@
+package software.amazon.kinesis.config;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.retrieval.RetrievalConfig;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+public class KCLReleaseCanary2XStreamingTestConfig implements KCLAppConfig {
+ @Override
+ public String getStreamName() {
+ return "KCLReleaseCanary2XStreamingTestStream";
+ }
+
+ @Override
+ public int getShardCount() {
+ return 10;
+ }
+
+ @Override
+ public String getApplicationName() {
+ return "KCLReleaseCanary2XStreamingTestApplication";
+ }
+
+ @Override
+ public String getEndpoint() {
+ return "";
+ }
+
+ @Override
+ public Region getRegion() {
+ return Region.US_WEST_2;
+ }
+
+ @Override
+ public boolean isProd() {
+ return true;
+ }
+
+ @Override
+ public String getProfile() {
+ String iamUser = System.getProperty( "credentials" );
+ return iamUser;
+ }
+
+ @Override
+ public long getProcessingDelayMillis() {
+ return 50;
+ }
+
+ @Override
+ public InitialPositionInStream getKclInitialPosition() {
+ return InitialPositionInStream.TRIM_HORIZON;
+ }
+
+ @Override
+ public Protocol getConsumerProtocol() {
+ return Protocol.HTTP2;
+ }
+
+ @Override
+ public Protocol getProducerProtocol() {
+ return Protocol.HTTP1_1;
+ }
+
+ @Override
+ public ProducerConfig getProducerConfig() {
+ return ProducerConfig.builder()
+ .isBatchPut( false )
+ .batchSize( 1 )
+ .recordSizeKB( 60 )
+ .callPeriodMills( 100 )
+ .build();
+ }
+
+ @Override
+ public ReshardConfig getReshardConfig() {
+ return null;
+ }
+
+ @Override
+ public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
+ LocalDateTime d = LocalDateTime.now();
+ d = d.minusMinutes( 5 );
+ Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
+ Date startStreamTime = Date.from( instant );
+
+ InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
+ .newInitialPositionAtTimestamp( startStreamTime );
+
+ RetrievalConfig config = getConfigsBuilder().retrievalConfig();
+ config.initialPositionInStreamExtended( initialPosition );
+
+ return config;
+ }
+}
+
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java
new file mode 100644
index 00000000..69e48112
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/KCL2XIntegrationTest.java
@@ -0,0 +1,35 @@
+package software.amazon.kinesis.integration_tests;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+import software.amazon.kinesis.config.KCLAppConfig;
+
+@Slf4j
+public class KCL2XIntegrationTest {
+
+ private static final String CONFIG_PACKAGE = "software.amazon.kinesis.config";
+
+ @Test
+ public void KCLReleaseCanary2XPollingH2Test() throws Exception {
+ String[] configName = { "KCLReleaseCanary2XPollingH2TestConfig" };
+ KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance();
+ TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
+ consumer.run();
+ }
+
+ @Test
+ public void KCLReleaseCanary2XPollingH1Test() throws Exception {
+ String[] configName = { "KCLReleaseCanary2XPollingH1TestConfig" };
+ KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance();
+ TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
+ consumer.run();
+ }
+
+ @Test
+ public void KCLReleaseCanary2XStreamingTest() throws Exception {
+ String[] configName = { "KCLReleaseCanary2XStreamingTestConfig" };
+ KCLAppConfig consumerConfig = (KCLAppConfig) Class.forName(CONFIG_PACKAGE + "." + configName[0]).newInstance();
+ TestConsumerV2 consumer = new TestConsumerV2( consumerConfig );
+ consumer.run();
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java
new file mode 100644
index 00000000..c827b439
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/StreamExistenceManager.java
@@ -0,0 +1,101 @@
+package software.amazon.kinesis.integration_tests;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+import software.amazon.kinesis.config.KCLAppConfig;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.sleep;
+
+@Data
+@Slf4j
+public class StreamExistenceManager {
+ private final KinesisAsyncClient client;
+ private final KCLAppConfig testConfig;
+
+ public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
+ this.testConfig = config;
+ this.client = config.buildAsyncKinesisClient(Protocol.HTTP1_1);
+ }
+
+ public static StreamExistenceManager newManager(KCLAppConfig config) throws URISyntaxException, IOException {
+ return new StreamExistenceManager(config);
+ }
+
+ private boolean isStreamActive(String streamName) {
+
+ DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
+
+ final CompletableFuture describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request);
+
+ try {
+ final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS);
+ boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE);
+ if (!isActive) {
+ throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus());
+ }
+ return true;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ResourceNotFoundException) {
+ return false;
+ }
+ else {
+ throw new RuntimeException(e);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void createStream(String streamName, int shardCount) {
+ CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
+ try {
+ client.createStream(request).get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create stream with name " + streamName, e);
+ }
+
+ int i = 0;
+ while (true) {
+ i++;
+ if (i > 100) {
+ throw new RuntimeException("Failed stream creation, did not transition into active");
+ }
+ try {
+ boolean isActive = isStreamActive(streamName);
+ if (isActive) {
+ log.info("Succesfully created the stream " + streamName);
+ return;
+ }
+ } catch (Exception e) {
+ try {
+ sleep(10_000); // 10 secs backoff.
+ } catch (InterruptedException e1) {
+ log.error("Failed to sleep");
+ }
+ log.info("Stream {} is not active yet, exception: ", streamName, e);
+ }
+ }
+ }
+
+ public void checkStreamAndCreateIfNecessary(String streamName) {
+
+ if (!isStreamActive(streamName)) {
+ createStream(streamName, testConfig.getShardCount());
+ }
+ log.info("Using stream " + streamName + " in endpoint " + testConfig.getEndpoint() + " with region " + testConfig.getRegion());
+ }
+
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java
new file mode 100644
index 00000000..6f03333f
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumer.java
@@ -0,0 +1,70 @@
+package software.amazon.kinesis.integration_tests;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.kinesis.common.KinesisClientUtil;
+import software.amazon.kinesis.config.KCLAppConfig;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+
+public class TestConsumer {
+
+ private static final Logger log = LoggerFactory.getLogger( TestConsumer.class );
+ public final KCLAppConfig consumerConfig;
+ public final Region region;
+ public final String streamName;
+ public final KinesisAsyncClient kinesisClient;
+ public int successfulPutRecords = 0;
+ public BigInteger payloadCounter = new BigInteger( "0" );
+
+
+ public TestConsumer( KCLAppConfig consumerConfig ) {
+ this.consumerConfig = consumerConfig;
+ this.region = consumerConfig.getRegion();
+ this.streamName = consumerConfig.getStreamName();
+ this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient( KinesisAsyncClient.builder().region( this.region ) );
+ }
+
+ public void publishRecord() {
+ PutRecordRequest request;
+ try {
+ request = PutRecordRequest.builder()
+ .partitionKey( RandomStringUtils.randomAlphabetic( 5, 20 ) )
+ .streamName( streamName )
+ .data( SdkBytes.fromByteBuffer( wrapWithCounter( 5, payloadCounter ) ) ) // 1024 is 1 KB
+ .build();
+ kinesisClient.putRecord( request ).get();
+ // Increment the payload counter if the putRecord call was successful
+ payloadCounter = payloadCounter.add( new BigInteger( "1" ) );
+ successfulPutRecords += 1;
+ } catch ( InterruptedException e ) {
+ log.info( "Interrupted, assuming shutdown." );
+ } catch ( ExecutionException e ) {
+ log.error( "Error during publishRecord. Will try again next cycle", e );
+ } catch ( RuntimeException e ) {
+ log.error( "Error while creating request", e );
+ }
+ }
+
+ private ByteBuffer wrapWithCounter( int payloadSize, BigInteger payloadCounter ) throws RuntimeException {
+ byte[] returnData;
+ log.info( "--------------Putting record with data: {}", payloadCounter );
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ returnData = mapper.writeValueAsBytes( payloadCounter );
+ } catch ( Exception e ) {
+ log.error( "Error creating payload data for {}", payloadCounter.toString() );
+ throw new RuntimeException( "Error converting object to bytes: ", e );
+ }
+ return ByteBuffer.wrap( returnData );
+ }
+
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java
new file mode 100644
index 00000000..418064a4
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestConsumerV2.java
@@ -0,0 +1,142 @@
+package software.amazon.kinesis.integration_tests;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.kinesis.checkpoint.CheckpointConfig;
+import software.amazon.kinesis.common.ConfigsBuilder;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.config.KCLAppConfig;
+import software.amazon.kinesis.coordinator.CoordinatorConfig;
+import software.amazon.kinesis.coordinator.Scheduler;
+import software.amazon.kinesis.leases.LeaseManagementConfig;
+import software.amazon.kinesis.lifecycle.LifecycleConfig;
+import software.amazon.kinesis.metrics.MetricsConfig;
+import software.amazon.kinesis.processor.ProcessorConfig;
+import software.amazon.kinesis.retrieval.RetrievalConfig;
+import software.amazon.kinesis.utils.RecordValidatorQueue;
+
+import java.util.UUID;
+import java.util.concurrent.*;
+
+public class TestConsumerV2 extends TestConsumer {
+ private static final Logger log = LoggerFactory.getLogger( TestConsumerV2.class );
+ private final int outOfOrderError = -1;
+ private final int missingRecordError = -2;
+ private MetricsConfig metricsConfig;
+ private RetrievalConfig retrievalConfig;
+ private CheckpointConfig checkpointConfig;
+ private CoordinatorConfig coordinatorConfig;
+ private LeaseManagementConfig leaseManagementConfig;
+ private LifecycleConfig lifecycleConfig;
+ private ProcessorConfig processorConfig;
+
+ public TestConsumerV2( KCLAppConfig consumerConfig ) {
+ super( consumerConfig );
+ }
+
+ public void run() throws Exception {
+
+ /**
+ * Check if stream is created. If not, create it
+ */
+ StreamExistenceManager.newManager( this.consumerConfig ).checkStreamAndCreateIfNecessary( this.streamName );
+
+ /**
+ * Send dummy data to stream
+ */
+ ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
+ ScheduledFuture> producerFuture = producerExecutor.scheduleAtFixedRate( this::publishRecord, 10, 1, TimeUnit.SECONDS );
+
+ RecordValidatorQueue recordValidator = new RecordValidatorQueue();
+
+ /**
+ * Setup configuration of KCL (including DynamoDB and CloudWatch)
+ */
+ DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region( region ).build();
+ CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region( region ).build();
+ ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactoryV2( recordValidator ) );
+
+
+ retrievalConfig = consumerConfig.getRetrievalConfig();
+ checkpointConfig = configsBuilder.checkpointConfig();
+ coordinatorConfig = configsBuilder.coordinatorConfig();
+ leaseManagementConfig = configsBuilder.leaseManagementConfig()
+ .initialPositionInStream( InitialPositionInStreamExtended.newInitialPosition( consumerConfig.getKclInitialPosition() ) )
+ .initialLeaseTableReadCapacity( 50 ).initialLeaseTableWriteCapacity( 50 );
+ lifecycleConfig = configsBuilder.lifecycleConfig();
+ processorConfig = configsBuilder.processorConfig();
+ metricsConfig = configsBuilder.metricsConfig();
+
+ /**
+ * Create Scheduler
+ */
+ Scheduler scheduler = new Scheduler(
+ checkpointConfig,
+ coordinatorConfig,
+ leaseManagementConfig,
+ lifecycleConfig,
+ metricsConfig,
+ processorConfig,
+ retrievalConfig
+ );
+
+ /**
+ * Start record processing of dummy data
+ */
+ Thread schedulerThread = new Thread( scheduler );
+ schedulerThread.setDaemon( true );
+ schedulerThread.start();
+
+
+ /**
+ * Sleep for two minutes to allow the producer/consumer to run and then end the test case.
+ */
+ try {
+ Thread.sleep( TimeUnit.SECONDS.toMillis( 60 * 2 ) ); // 60 * 2
+ } catch ( InterruptedException e ) {
+ throw new RuntimeException( e );
+ }
+
+ /**
+ * Stops sending dummy data.
+ */
+ log.info( "Cancelling producer and shutting down executor." );
+ producerFuture.cancel( true );
+ producerExecutor.shutdownNow();
+
+ /**
+ * Wait a few seconds for the last few records to be processed
+ */
+ Thread.sleep( TimeUnit.SECONDS.toMillis( 10 ) );
+
+ /**
+ * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
+ * before shutting down.
+ */
+ Future gracefulShutdownFuture = scheduler.startGracefulShutdown();
+ log.info( "Waiting up to 20 seconds for shutdown to complete." );
+ try {
+ gracefulShutdownFuture.get( 20, TimeUnit.SECONDS );
+ } catch ( InterruptedException e ) {
+ log.info( "Interrupted while waiting for graceful shutdown. Continuing." );
+ } catch ( ExecutionException e ) {
+ log.error( "Exception while executing graceful shutdown.", e );
+ } catch ( TimeoutException e ) {
+ log.error( "Timeout while waiting for shutdown. Scheduler may not have exited." );
+ }
+ log.info( "Completed, shutting down now." );
+
+ /**
+ * Validate processed data
+ */
+ int errorVal = recordValidator.validateRecords( successfulPutRecords );
+ if ( errorVal == outOfOrderError ) {
+ throw new RuntimeException( "There was an error validating the records that were processed. The records were out of order" );
+ } else if ( errorVal == missingRecordError ) {
+ throw new RuntimeException( "There was an error validating the records that were processed. Some records were missing." );
+ }
+ log.info( "--------------Completed validation of processed records.--------------" );
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java
new file mode 100644
index 00000000..167709bb
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorFactoryV2.java
@@ -0,0 +1,20 @@
+package software.amazon.kinesis.integration_tests;
+
+import software.amazon.kinesis.processor.ShardRecordProcessor;
+import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
+import software.amazon.kinesis.utils.RecordValidatorQueue;
+
+public class TestRecordProcessorFactoryV2 implements ShardRecordProcessorFactory {
+
+ RecordValidatorQueue recordValidator;
+
+ public TestRecordProcessorFactoryV2( RecordValidatorQueue recordValidator ) {
+ this.recordValidator = recordValidator;
+ }
+
+ @Override
+ public ShardRecordProcessor shardRecordProcessor() {
+ return new TestRecordProcessorV2( this.recordValidator );
+ }
+
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java
new file mode 100644
index 00000000..62e9b44d
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/integration_tests/TestRecordProcessorV2.java
@@ -0,0 +1,107 @@
+package software.amazon.kinesis.integration_tests;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import software.amazon.kinesis.exceptions.InvalidStateException;
+import software.amazon.kinesis.exceptions.ShutdownException;
+import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
+import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
+import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
+import software.amazon.kinesis.processor.ShardRecordProcessor;
+import software.amazon.kinesis.lifecycle.events.InitializationInput;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+import software.amazon.kinesis.utils.RecordValidatorQueue;
+
+import java.nio.ByteBuffer;
+
+public class TestRecordProcessorV2 implements ShardRecordProcessor {
+
+ private static final String SHARD_ID_MDC_KEY = "ShardId";
+
+ private static final Logger log = LoggerFactory.getLogger( TestRecordProcessorV2.class );
+
+ private String shardId;
+
+ RecordValidatorQueue recordValidator;
+
+ public TestRecordProcessorV2( RecordValidatorQueue recordValidator ) {
+ this.recordValidator = recordValidator;
+ }
+
+ @Override
+ public void initialize( InitializationInput initializationInput ) {
+ shardId = initializationInput.shardId();
+ MDC.put( SHARD_ID_MDC_KEY, shardId );
+ try {
+ log.info( "Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber() );
+ } finally {
+ MDC.remove( SHARD_ID_MDC_KEY );
+ }
+ }
+
+
+ @Override
+ public void processRecords( software.amazon.kinesis.lifecycle.events.ProcessRecordsInput processRecordsInput ) {
+ MDC.put( SHARD_ID_MDC_KEY, shardId );
+ try {
+ log.info( "Processing {} record(s)", processRecordsInput.records().size() );
+
+ for ( KinesisClientRecord r : processRecordsInput.records() ) {
+ String data = new String( asByteArray( r.data() ) );
+ log.info( "Processing record pk: {}", data );
+ recordValidator.add( shardId, data );
+ }
+
+ } catch ( Throwable t ) {
+ log.error( "Caught throwable while processing records. Aborting.", t );
+ Runtime.getRuntime().halt( 1 );
+ } finally {
+ MDC.remove( SHARD_ID_MDC_KEY );
+ }
+ }
+
+ public static byte[] asByteArray( ByteBuffer buf ) {
+ byte[] bytes = new byte[buf.remaining()];
+ buf.get( bytes );
+ return bytes;
+ }
+
+ @Override
+ public void leaseLost( LeaseLostInput leaseLostInput ) {
+ MDC.put( SHARD_ID_MDC_KEY, shardId );
+ try {
+ log.info( "Lost lease, so terminating." );
+ } finally {
+ MDC.remove( SHARD_ID_MDC_KEY );
+ }
+ }
+
+ @Override
+ public void shardEnded( ShardEndedInput shardEndedInput ) {
+ MDC.put( SHARD_ID_MDC_KEY, shardId );
+ try {
+ log.info( "Reached shard end checkpointing." );
+ shardEndedInput.checkpointer().checkpoint();
+ } catch ( ShutdownException | InvalidStateException e ) {
+ log.error( "Exception while checkpointing at shard end. Giving up.", e );
+ } finally {
+ MDC.remove( SHARD_ID_MDC_KEY );
+ }
+ }
+
+ @Override
+ public void shutdownRequested( ShutdownRequestedInput shutdownRequestedInput ) {
+ MDC.put( SHARD_ID_MDC_KEY, shardId );
+ try {
+ log.info( "Scheduler is shutting down, checkpointing." );
+ shutdownRequestedInput.checkpointer().checkpoint();
+ } catch ( ShutdownException | InvalidStateException e ) {
+ log.error( "Exception while checkpointing at requested shutdown. Giving up.", e );
+ } finally {
+ MDC.remove( SHARD_ID_MDC_KEY );
+ }
+ }
+
+
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java
new file mode 100644
index 00000000..65b75172
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/KCLVersion.java
@@ -0,0 +1,6 @@
+package software.amazon.kinesis.utils;
+
+public enum KCLVersion {
+ KCL1X,
+ KCL2X
+}
\ No newline at end of file
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java
new file mode 100644
index 00000000..f35830ce
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/OdinCredentialsHelper.java
@@ -0,0 +1,114 @@
+package software.amazon.kinesis.utils;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.google.common.io.CharStreams;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Helper class to hold odin credentials because odin is not available externally and this package doesn't use brazil.
+ */
+@Slf4j
+public class OdinCredentialsHelper {
+
+ private final static String PRINCIPAL = "Principal";
+ private final static String CREDENTIAL = "Credential";
+ private final static String ODIN_COMMAND = "/apollo/env/envImprovement/bin/odin-get -t";
+
+ private static String getMaterial(String materialName, String materialType) throws IOException {
+ final InputStream inputStream = Runtime.getRuntime().exec(String.format("%s %s %s", ODIN_COMMAND, materialType, materialName)).getInputStream();
+ return CharStreams.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).trim();
+ }
+ private static String getPrincipal(String materialName) throws IOException {
+ return getMaterial(materialName, PRINCIPAL);
+ }
+
+ private static String getCredential(String materialName) throws IOException {
+ return getMaterial(materialName, CREDENTIAL);
+ }
+
+ /**
+ * Helper method to pull credentials from odin for testing for AWS SDK sync clients (1.x).
+ *
+ * @param materialName name of the material set to fetch.
+ * @return access/secret key pair from Odin if specified for testing.
+ * @throws IOException
+ */
+ public static AWSCredentialsProvider getSyncAwsCredentialsFromMaterialSet(String materialName) throws IOException {
+ if (materialName == null) {
+ log.debug("No material name found.");
+ return null;
+ }
+
+ log.debug("Fetching credentials for material - {}.", materialName);
+
+ final String principal = getPrincipal(materialName);
+ final String credential = getCredential(materialName);
+
+ final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
+ @Override
+ public AWSCredentials getCredentials() {
+ return new AWSCredentials() {
+ @Override
+ public String getAWSAccessKeyId() {
+ return principal;
+ }
+
+ @Override
+ public String getAWSSecretKey() {
+ return credential;
+ }
+ };
+ }
+ @Override
+ public void refresh() {
+ }
+ };
+
+ log.debug("Successfully retrieved credentials from odin. Access key - {}.", principal);
+
+ return awsCredentialsProvider;
+ }
+
+ /**
+ * Helper method to pull credentials from odin for testing for AWS SDK async clients (2.x).
+ *
+ * @param materialName name of the material set to fetch.
+ * @return access/secret key pair from Odin if specified for testing.
+ * @throws IOException
+ */
+ public static AwsCredentialsProvider getAsyncAwsCredentialsFromMaterialSet(String materialName) throws IOException {
+ if (materialName == null) {
+ log.debug("No material name found.");
+ return null;
+ }
+
+ log.debug("Fetching credentials for material - {}.", materialName);
+
+ final String principal = getPrincipal(materialName);
+ final String credential = getCredential(materialName);
+
+ final AwsCredentialsProvider awsCredentialsProvider = () -> new AwsCredentials() {
+ @Override
+ public String accessKeyId() {
+ return principal;
+ }
+
+ @Override
+ public String secretAccessKey() {
+ return credential;
+ }
+ };
+
+ log.debug("Successfully retrieved credentials from odin. Access key - {}.", principal);
+
+ return awsCredentialsProvider;
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java
new file mode 100644
index 00000000..b28cf6c2
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java
@@ -0,0 +1,66 @@
+package software.amazon.kinesis.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import java.util.*;
+
+@Slf4j
+public class RecordValidatorQueue {
+
+ HashMap> dict = new HashMap<>();
+
+ public void add( String shardId, String data ) {
+ if ( dict.containsKey( shardId ) ) {
+ // Just add the data to this item
+ List oldVal = dict.get( shardId );
+ oldVal.add( data );
+ dict.put( shardId, oldVal );
+ } else {
+ List newVal = new ArrayList<>();
+ newVal.add( data );
+ dict.put( shardId, newVal );
+ }
+ }
+
+ public int validateRecords( int trueTotalShardCount ) {
+ // Validate that each List in the HashMap has data records in increasing order
+
+ boolean incOrder = true;
+ for ( Map.Entry> entry : dict.entrySet() ) {
+ List recordsPerShard = entry.getValue();
+ int prevVal = -1;
+ boolean shardIncOrder = true;
+ for ( String record : recordsPerShard ) {
+ int nextVal = Integer.parseInt( record );
+ if ( prevVal > nextVal ) {
+ log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal );
+ shardIncOrder = false;
+ }
+ prevVal = nextVal;
+ }
+ if ( !shardIncOrder ) {
+ incOrder = false;
+ }
+ }
+
+ // If this is true, then there was some record that was processed out of order
+ if ( !incOrder ) {
+ return -1;
+ }
+
+ // Validate that no records are missing over all shards
+ int totalShardCount = 0;
+ for ( Map.Entry> entry : dict.entrySet() ) {
+ List recordsPerShard = entry.getValue();
+ Set noDupRecords = new HashSet( recordsPerShard );
+ totalShardCount += noDupRecords.size();
+ }
+
+ // If this is true, then there was some record that was missed during processing.
+ if ( totalShardCount != trueTotalShardCount ) {
+ log.error( "Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount );
+ return -2;
+ }
+ return 0;
+ }
+
+}