diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index a05c70c8..3eb8ad94 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -208,8 +208,8 @@ ${sqlite4java.libpath} - credentials - ${credentials} + awsProfile + ${awsProfile} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index 081c7ef4..7afda529 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -7,7 +7,6 @@ import software.amazon.kinesis.utils.RecordValidatorQueue; import software.amazon.kinesis.utils.ReshardOptions; import software.amazon.kinesis.utils.TestRecordProcessorFactory; import lombok.Builder; -import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.http.Protocol; @@ -39,27 +38,27 @@ import java.util.Optional; */ public abstract class KCLAppConfig { + private KinesisAsyncClient kinesisAsyncClient; + private DynamoDbAsyncClient dynamoDbAsyncClient; + private CloudWatchAsyncClient cloudWatchAsyncClient; + private RecordValidatorQueue recordValidator; + /** * Name used for test stream and lease tracker table */ public abstract String getStreamName(); - public String getStreamArn() { - return null; - } - public int getShardCount() { return 4; } - public String getEndpoint() { return ""; } - public Region getRegion() { return Region.US_WEST_2; } /** * "default" profile, should match with profiles listed in "cat ~/.aws/config" */ - public String getProfile() { - String iamUser = System.getProperty("credentials"); - return iamUser; + private AwsCredentialsProvider getCredentialsProvider() { + final String awsProfile = System.getProperty("credentials"); + return (awsProfile != null) ? + ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create(); } public InitialPositionInStream getInitialPosition() { @@ -87,24 +86,14 @@ public abstract class KCLAppConfig { return null; } - public KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException { - return buildAsyncKinesisClient(getConsumerProtocol()); + public final KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException { + if (kinesisAsyncClient == null) { + this.kinesisAsyncClient = buildAsyncKinesisClient(Optional.ofNullable(protocol)); + } + return this.kinesisAsyncClient; } - public KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException { - return buildAsyncKinesisClient(getProducerProtocol()); - } - - public KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException { - return buildAsyncKinesisClient(Optional.ofNullable(protocol)); - } - - private AwsCredentialsProvider getCredentialsProvider() { - return (getProfile() != null) ? - ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create(); - } - - public KinesisAsyncClient buildAsyncKinesisClient(Optional protocol) throws URISyntaxException, IOException { + public final KinesisAsyncClient buildAsyncKinesisClient(Optional protocol) throws URISyntaxException, IOException { // Setup H2 client config. final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() @@ -128,26 +117,33 @@ public abstract class KCLAppConfig { return kinesisAsyncClientBuilder.build(); } - public DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { - final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion()); - - builder.credentialsProvider(getCredentialsProvider()); - return builder.build(); + public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { + if (this.dynamoDbAsyncClient == null) { + final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion()); + builder.credentialsProvider(getCredentialsProvider()); + this.dynamoDbAsyncClient = builder.build(); + } + return this.dynamoDbAsyncClient; } - public CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException { - final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion()); - - builder.credentialsProvider(getCredentialsProvider()); - return builder.build(); + public final CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException { + if (this.cloudWatchAsyncClient == null) { + final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion()); + builder.credentialsProvider(getCredentialsProvider()); + this.cloudWatchAsyncClient = builder.build(); + } + return this.cloudWatchAsyncClient; } public String getWorkerId() throws UnknownHostException { return Inet4Address.getLocalHost().getHostName(); } - public RecordValidatorQueue getRecordValidator() { - return new RecordValidatorQueue(); + public final RecordValidatorQueue getRecordValidator() { + if (recordValidator == null) { + this.recordValidator = new RecordValidatorQueue(); + } + return this.recordValidator; } public ShardRecordProcessorFactory getShardRecordProcessorFactory() { @@ -156,21 +152,16 @@ public abstract class KCLAppConfig { public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { final String workerId = getWorkerId(); - if (getStreamArn() == null) { - return new ConfigsBuilder(getStreamName(), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(), - buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); - } else { - return new ConfigsBuilder(Arn.fromString(getStreamArn()), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(), - buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); - } + return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(getConsumerProtocol()), buildAsyncDynamoDbClient(), + buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); } public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended .newInitialPosition(getInitialPosition()); // Default is a streaming consumer - RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + final RetrievalConfig config = getConfigsBuilder().retrievalConfig(); config.initialPositionInStreamExtended(initialPosition); return config; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java index 074284ba..7fdd910d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java @@ -7,14 +7,18 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.io.IOException; import java.net.URISyntaxException; +import java.util.UUID; /** * Config for a polling consumer with HTTP protocol of HTTP1 */ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { + + private final UUID uniqueId = UUID.randomUUID(); + @Override public String getStreamName() { - return "KCLReleaseCanary2XPollingH1TestStream"; + return "KCLReleaseCanary2XPollingH1TestStream_" + uniqueId; } @Override @@ -25,12 +29,12 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { @Override public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended .newInitialPosition(getInitialPosition()); - RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + final RetrievalConfig config = getConfigsBuilder().retrievalConfig(); config.initialPositionInStreamExtended(initialPosition); - config.retrievalSpecificConfig(new PollingConfig(getStreamName(), buildConsumerClient())); + config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient())); return config; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java index 7939df5b..b2089519 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java @@ -7,14 +7,17 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.io.IOException; import java.net.URISyntaxException; +import java.util.UUID; /** * Config for a polling consumer with HTTP protocol of HTTP2 */ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + @Override public String getStreamName() { - return "KCLReleaseCanary2XPollingH2TestStream"; + return "KCLReleaseCanary2XPollingH2TestStream_" + uniqueId; } @Override @@ -25,12 +28,12 @@ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig { @Override public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended .newInitialPosition(getInitialPosition()); - RetrievalConfig config = getConfigsBuilder().retrievalConfig(); + final RetrievalConfig config = getConfigsBuilder().retrievalConfig(); config.initialPositionInStreamExtended(initialPosition); - config.retrievalSpecificConfig(new PollingConfig(getStreamName(), buildConsumerClient())); + config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient())); return config; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java index c92ab087..cbe02440 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java @@ -2,13 +2,17 @@ package software.amazon.kinesis.config; import software.amazon.awssdk.http.Protocol; +import java.util.UUID; + /** * Config for a streaming consumer with HTTP protocol of HTTP2 */ public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + @Override public String getStreamName() { - return "KCLReleaseCanary2XStreamingTestStream"; + return "KCLReleaseCanary2XStreamingTestStream_" + uniqueId; } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java similarity index 79% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java index e19501b2..457c4dbf 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamingPollingIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java @@ -2,29 +2,29 @@ package software.amazon.kinesis.lifecycle; import org.junit.Test; import software.amazon.kinesis.config.KCLAppConfig; -import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; +import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; import software.amazon.kinesis.utils.TestConsumer; -public class BasicStreamingPollingIntegrationTest { +public class BasicStreamConsumerIntegrationTest { @Test - public void KCLReleaseCanaryPollingH2Test() throws Exception { + public void kclReleaseCanaryPollingH2Test() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig(); TestConsumer consumer = new TestConsumer(consumerConfig); consumer.run(); } @Test - public void KCLReleaseCanaryPollingH1Test() throws Exception { + public void kclReleaseCanaryPollingH1Test() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig(); TestConsumer consumer = new TestConsumer(consumerConfig); consumer.run(); } @Test - public void KCLReleaseCanaryStreamingTest() throws Exception { + public void kclReleaseCanaryStreamingTest() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig(); TestConsumer consumer = new TestConsumer(consumerConfig); consumer.run(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java new file mode 100644 index 00000000..df7a4c3e --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java @@ -0,0 +1,74 @@ +package software.amazon.kinesis.utils; + +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; +import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; +import software.amazon.kinesis.common.FutureUtils; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class LeaseTableManager { + + private final DynamoDbAsyncClient dynamoClient; + + public LeaseTableManager(DynamoDbAsyncClient dynamoClient) throws URISyntaxException, IOException { + this.dynamoClient = dynamoClient; + } + + private List listAllLeaseTables() throws Exception { + final ListTablesRequest request = ListTablesRequest.builder().build(); + final ListTablesResponse response; + try { + response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60)); + } catch (ExecutionException | InterruptedException e) { + throw new Exception("Error listing all lease tables"); + } + return response.tableNames(); + } + + public void deleteLeaseTable(String tableName) throws Exception { + final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build(); + try { + FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60)); + } catch (ExecutionException | InterruptedException e) { + throw new Exception("Could not delete lease table: {}", e); + } + + // Wait till table is deleted to return + int i = 0; + while (true) { + i++; + if (i > 100) { + throw new RuntimeException("Failed lease table deletion"); + } + try { + if (!listAllLeaseTables().contains(tableName)) { + log.info("Succesfully deleted the lease table {}", tableName); + return; + } + } catch (Exception e) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e1) { } + log.info("Lease table {} is not deleted yet, exception: ", tableName, e); + } + } + } + + public void deleteAllLeaseTables() throws Exception { + + final List tableNames = listAllLeaseTables(); + for (String tableName : tableNames) { + deleteLeaseTable(tableName); + } + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index ee0450a0..3943cf6a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -7,18 +7,19 @@ import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.StreamStatus; import software.amazon.kinesis.config.KCLAppConfig; import java.io.IOException; import java.net.URISyntaxException; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static java.lang.Thread.sleep; - @Value @Slf4j public class StreamExistenceManager { @@ -32,8 +33,7 @@ public class StreamExistenceManager { private boolean isStreamActive(String streamName) { - DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); - + final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); final CompletableFuture describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request); try { @@ -55,7 +55,7 @@ public class StreamExistenceManager { } private void createStream(String streamName, int shardCount) { - CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build(); + final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build(); try { client.createStream(request).get(30, TimeUnit.SECONDS); } catch (Exception e) { @@ -71,12 +71,12 @@ public class StreamExistenceManager { try { boolean isActive = isStreamActive(streamName); if (isActive) { - log.info("Succesfully created the stream " + streamName); + log.info("Succesfully created the stream {}", streamName); return; } } catch (Exception e) { try { - sleep(10_000); // 10 secs backoff. + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); } catch (InterruptedException e1) { log.error("Failed to sleep"); } @@ -86,8 +86,8 @@ public class StreamExistenceManager { } public void deleteStream(String streamName) { - DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build(); - try{ + final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build(); + try { client.deleteStream(request).get(30, TimeUnit.SECONDS); } catch (Exception e) { throw new RuntimeException("Failed to delete stream with name " + streamName, e); @@ -102,13 +102,13 @@ public class StreamExistenceManager { try { boolean isActive = isStreamActive(streamName); if (!isActive) { - log.info("Succesfully deleted the stream " + streamName); + log.info("Succesfully deleted the stream {}", streamName); return; } } catch (Exception e) { try { - sleep(10_000); // 10 secs backoff. - } catch (InterruptedException e1) {} + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e1) { } log.info("Stream {} is not deleted yet, exception: ", streamName, e); } } @@ -119,7 +119,25 @@ public class StreamExistenceManager { if (!isStreamActive(streamName)) { createStream(streamName, testConfig.getShardCount()); } - log.info("Using stream {} in endpoint {} with region {}", streamName, testConfig.getEndpoint(), testConfig.getRegion()); + log.info("Using stream {} with region {}", streamName, testConfig.getRegion()); + } + + private List getAllStreamNames() { + final ListStreamsRequest request = ListStreamsRequest.builder().build(); + ListStreamsResponse response; + try { + response = client.listStreams(request).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Failed to list all streams", e); + } + return response.streamNames(); + } + + public void deleteAllStreams() { + final List streamNames = getAllStreamNames(); + for (String streamName : streamNames) { + deleteStream(streamName); + } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java index ece63922..b0eb7d9d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java @@ -5,16 +5,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.common.ConfigsBuilder; -import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; @@ -26,8 +22,6 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -35,7 +29,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; @Slf4j public class TestConsumer { @@ -50,33 +43,87 @@ public class TestConsumer { private LeaseManagementConfig leaseManagementConfig; private LifecycleConfig lifecycleConfig; private ProcessorConfig processorConfig; + private Scheduler scheduler; + private ScheduledExecutorService producerExecutor; + private ScheduledFuture producerFuture; + private DynamoDbAsyncClient dynamoClient; public int successfulPutRecords = 0; public BigInteger payloadCounter = new BigInteger("0"); - public TestConsumer(KCLAppConfig consumerConfig) { + public TestConsumer(KCLAppConfig consumerConfig) throws Exception { this.consumerConfig = consumerConfig; this.region = consumerConfig.getRegion(); this.streamName = consumerConfig.getStreamName(); - this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); + this.kinesisClient = consumerConfig.buildAsyncKinesisClient(consumerConfig.getConsumerProtocol()); + this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient(); } public void run() throws Exception { + final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig); + final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient); + + // Clean up any old streams or lease tables left in test environment + cleanTestEnvironment(streamExistenceManager, leaseTableManager); + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + // Check if stream is created. If not, create it - StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig); streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName); + startProducer(); + setUpTestResources(); + + try { + startConsumer(); + + // Sleep for two minutes to allow the producer/consumer to run and then end the test case. + Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); + + // Stops sending dummy data. + stopProducer(); + + // Wait a few seconds for the last few records to be processed + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + // Finishes processing current batch of data already received from Kinesis before shutting down. + awaitConsumerFinish(); + + // Validate processed data + validateRecordProcessor(); + + // Clean up resources created + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + deleteResources(streamExistenceManager, leaseTableManager); + + } catch (Exception e) { + // Test Failed. Clean up resources and then throw exception. + log.info("----------Test Failed: Cleaning up resources------------"); + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + deleteResources(streamExistenceManager, leaseTableManager); + throw e; + } + } + + private void cleanTestEnvironment(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { + log.info("----------Before starting, Cleaning test environment----------"); + log.info("----------Deleting all lease tables in account----------"); + leaseTableManager.deleteAllLeaseTables(); + log.info("----------Finished deleting all lease tables-------------"); + + log.info("----------Deleting all streams in account----------"); + streamExistenceManager.deleteAllStreams(); + log.info("----------Finished deleting all streams-------------"); + } + + private void startProducer() { // Send dummy data to stream - ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); - ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); - - RecordValidatorQueue recordValidator = new RecordValidatorQueue(); + this.producerExecutor = Executors.newSingleThreadScheduledExecutor(); + this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); + } + private void setUpTestResources() throws Exception { // Setup configuration of KCL (including DynamoDB and CloudWatch) - DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); - CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); - ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory(recordValidator)); - + final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(); retrievalConfig = consumerConfig.getRetrievalConfig(); checkpointConfig = configsBuilder.checkpointConfig(); @@ -89,7 +136,7 @@ public class TestConsumer { metricsConfig = configsBuilder.metricsConfig(); // Create Scheduler - Scheduler scheduler = new Scheduler( + this.scheduler = new Scheduler( checkpointConfig, coordinatorConfig, leaseManagementConfig, @@ -98,66 +145,21 @@ public class TestConsumer { processorConfig, retrievalConfig ); + } - try { - // Start record processing of dummy data - Thread schedulerThread = new Thread(scheduler); - schedulerThread.setDaemon(true); - schedulerThread.start(); - - // Sleep for two minutes to allow the producer/consumer to run and then end the test case. - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); - - // Stops sending dummy data. - log.info("Cancelling producer and shutting down executor."); - producerFuture.cancel(false); - producerExecutor.shutdown(); - - // Wait a few seconds for the last few records to be processed - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - - // Finishes processing current batch of data already received from Kinesis before shutting down. - Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); - log.info("Waiting up to 20 seconds for shutdown to complete."); - try { - gracefulShutdownFuture.get(20, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.info("Interrupted while waiting for graceful shutdown. Continuing."); - } catch (ExecutionException e) { - throw new ExecutionException("Exception while executing graceful shutdown. {}", e); - } catch (TimeoutException e) { - throw new TimeoutException("Timeout while waiting for shutdown. Scheduler may not have exited. {}" + e); - } - log.info("Completed, shutting down now."); - - // Validate processed data - log.info("The number of expected records is: {}", successfulPutRecords); - RecordValidationStatus errorVal = recordValidator.validateRecords(successfulPutRecords); - if (errorVal == RecordValidationStatus.OUT_OF_ORDER) { - throw new RuntimeException("There was an error validating the records that were processed. The records were out of order"); - } else if (errorVal == RecordValidationStatus.MISSING_RECORD) { - throw new RuntimeException("There was an error validating the records that were processed. Some records were missing."); - } - log.info("--------------Completed validation of processed records.--------------"); - - // Clean up resources created - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); - deleteResources(streamExistenceManager, dynamoClient); - - } catch (Exception e) { - // Test Failed. Clean up resources and then throw exception. - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); - deleteResources(streamExistenceManager, dynamoClient); - throw e; - } + private void startConsumer() { + // Start record processing of dummy data + final Thread schedulerThread = new Thread(scheduler); + schedulerThread.setDaemon(true); + schedulerThread.start(); } public void publishRecord() { - PutRecordRequest request; + final PutRecordRequest request; try { request = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName) + .streamName(this.streamName) .data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB .build(); kinesisClient.putRecord(request).get(); @@ -168,15 +170,13 @@ public class TestConsumer { log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); - } catch (ExecutionException e) { - log.error("Error during publishRecord. Will try again next cycle", e); - } catch (RuntimeException e) { - log.error("Error while creating request", e); + } catch (ExecutionException | RuntimeException e) { + log.error("Error during publish records"); } } private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { - byte[] returnData; + final byte[] returnData; log.info("--------------Putting record with data: {}", payloadCounter); ObjectMapper mapper = new ObjectMapper(); try { @@ -188,23 +188,42 @@ public class TestConsumer { return ByteBuffer.wrap(returnData); } - private void deleteResources(StreamExistenceManager streamExistenceManager, DynamoDbAsyncClient dynamoDBClient) throws Exception { + private void stopProducer() { + log.info("Cancelling producer and shutting down executor."); + producerFuture.cancel(false); + producerExecutor.shutdown(); + } + + private void awaitConsumerFinish() throws Exception { + Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); + log.info("Waiting up to 20 seconds for shutdown to complete."); + try { + gracefulShutdownFuture.get(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.info("Interrupted while waiting for graceful shutdown. Continuing."); + } catch (ExecutionException | TimeoutException e) { + throw new Exception("Exception while executing graceful shutdown. {}", e); + } + log.info("Completed, shutting down now."); + } + + private void validateRecordProcessor() throws Exception { + log.info("The number of expected records is: {}", successfulPutRecords); + final RecordValidationStatus errorVal = consumerConfig.getRecordValidator().validateRecords(successfulPutRecords); + if (errorVal == RecordValidationStatus.OUT_OF_ORDER) { + throw new RuntimeException("There was an error validating the records that were processed. The records were out of order"); + } else if (errorVal == RecordValidationStatus.MISSING_RECORD) { + throw new RuntimeException("There was an error validating the records that were processed. Some records were missing."); + } + log.info("--------------Completed validation of processed records.--------------"); + } + + private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { log.info("-------------Start deleting stream.----------------"); streamExistenceManager.deleteStream(this.streamName); log.info("-------------Start deleting lease table.----------------"); - deleteLeaseTable(dynamoDBClient, consumerConfig.getStreamName()); + leaseTableManager.deleteLeaseTable(this.consumerConfig.getStreamName()); log.info("-------------Finished deleting resources.----------------"); } - private void deleteLeaseTable(DynamoDbAsyncClient dynamoClient, String tableName) throws Exception { - DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build(); - try { - FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60)); - } catch (ExecutionException e) { - throw new Exception("Could not delete lease table: {}", e); - } catch (InterruptedException e) { - throw new Exception("Deleting lease table interrupted: {}", e); - } - - } }