From 7816727b3a3647e7da0b5b5127517f9419d8a37f Mon Sep 17 00:00:00 2001 From: Meher Mankikar Date: Wed, 21 Jun 2023 11:32:59 -0700 Subject: [PATCH] Updating paginated call and cleaning up style --- README.md | 2 +- .../amazon/kinesis/config/KCLAppConfig.java | 2 +- .../BasicStreamConsumerIntegrationTest.java | 12 +++++++ .../kinesis/utils/AWSResourceManager.java | 34 ++++++++++++++----- .../kinesis/utils/LeaseTableManager.java | 17 +++++++--- .../kinesis/utils/RecordValidatorQueue.java | 11 +++--- .../kinesis/utils/StreamExistenceManager.java | 12 ++++++- .../amazon/kinesis/utils/TestConsumer.java | 20 +++++------ 8 files changed, 77 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 84f4d2f6..6328e115 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. This will look for a default AWS profile specified in your local `.aws/credentials`. -Optionally, you can provide the name of an IAM user to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="" verify`. +Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="" verify`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index 22b575fc..b67efa10 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -56,7 +56,7 @@ public abstract class KCLAppConfig { * "default" profile, should match with profiles listed in "cat ~/.aws/config" */ private AwsCredentialsProvider getCredentialsProvider() { - final String awsProfile = System.getProperty("credentials"); + final String awsProfile = System.getProperty("awsProfile"); return (awsProfile != null) ? ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java index 457c4dbf..e2e44687 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java @@ -9,6 +9,10 @@ import software.amazon.kinesis.utils.TestConsumer; public class BasicStreamConsumerIntegrationTest { + /** + * Test with a polling consumer using HTTP2 protocol. + * In the polling case, consumer makes calls to the producer each time to request records to process. + */ @Test public void kclReleaseCanaryPollingH2Test() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig(); @@ -16,6 +20,10 @@ public class BasicStreamConsumerIntegrationTest { consumer.run(); } + /** + * Test with a polling consumer using HTTP1 protocol. + * In the polling case, consumer makes calls to the producer each time to request records to process. + */ @Test public void kclReleaseCanaryPollingH1Test() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig(); @@ -23,6 +31,10 @@ public class BasicStreamConsumerIntegrationTest { consumer.run(); } + /** + * Test with a streaming consumer. + * In the streaming configuration, connection is made once between consumer and producer and producer continuously sends data to be processed. + */ @Test public void kclReleaseCanaryStreamingTest() throws Exception { KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java index fbe602a6..f88f7406 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java @@ -1,22 +1,40 @@ package software.amazon.kinesis.utils; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; +import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; +import software.amazon.kinesis.common.FutureUtils; + +import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @Slf4j +@NoArgsConstructor public abstract class AWSResourceManager { - public AWSResourceManager() {} - + /** + * Make delete resource API call for specific resource type + */ public abstract void _deleteResource(String resourceName) throws Exception; + + /** + * Check if resource with given name is in active state + */ public abstract boolean _isResourceActive(String name); + + /** + * Get a list of all the names of resources of a specified type + * @return + * @throws Exception + */ public abstract List _getAllResourceNames() throws Exception; /** - * Deletes resource with specified resource name - * @param resourceName - * @throws Exception + * Delete resource with specified resource name */ public void deleteResource(String resourceName) throws Exception { @@ -51,9 +69,9 @@ public abstract class AWSResourceManager { * Delete all instances of a particular resource type */ public void deleteAllResource() throws Exception { - final List streamNames = _getAllResourceNames(); - for (String streamName : streamNames) { - deleteResource(streamName); + final List resourceNames = _getAllResourceNames(); + for (String resourceName : resourceNames) { + deleteResource(resourceName); } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java index dcf930d3..d7dd31a1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java @@ -1,5 +1,6 @@ package software.amazon.kinesis.utils; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; @@ -21,14 +22,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @Slf4j +@AllArgsConstructor public class LeaseTableManager extends AWSResourceManager { private final DynamoDbAsyncClient dynamoClient; - public LeaseTableManager(DynamoDbAsyncClient dynamoClient) throws URISyntaxException, IOException { - this.dynamoClient = dynamoClient; - } - public boolean _isResourceActive(String tableName) { final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build(); final CompletableFuture describeTableResponseCompletableFuture = dynamoClient.describeTable(request); @@ -58,15 +56,24 @@ public class LeaseTableManager extends AWSResourceManager { public List _getAllResourceNames() throws Exception { List tableNames = new ArrayList<>(); - final ListTablesRequest request = ListTablesRequest.builder().build(); + ListTablesRequest request = ListTablesRequest.builder().build(); ListTablesResponse response = null; + String startTableName = null; + + // Continue while paginated call is still returning table names while(response == null || response.lastEvaluatedTableName() != null) { + if (startTableName != null) { + request = ListTablesRequest.builder().exclusiveStartTableName(startTableName).build(); + } try { response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60)); } catch (ExecutionException | InterruptedException e) { throw new Exception("Error listing all lease tables"); } + // Add all table names to list tableNames.addAll(response.tableNames()); + // Set startTableName for next call to be the last table name evaluated in current call + startTableName = response.lastEvaluatedTableName(); } return tableNames; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java index 79671e49..c6b9d6a6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java @@ -26,10 +26,9 @@ public class RecordValidatorQueue { values.add(data); } - public RecordValidationStatus validateRecords(int expectedShardCount) { + public RecordValidationStatus validateRecords(int expectedRecordCount) { // Validate that each List in the HashMap has data records in increasing order - boolean incOrder = true; for (Map.Entry> entry : dict.entrySet()) { List recordsPerShard = entry.getValue(); int prevVal = -1; @@ -44,16 +43,16 @@ public class RecordValidatorQueue { } // Validate that no records are missing over all shards - int actualShardCount = 0; + int actualRecordCount = 0; for (Map.Entry> entry : dict.entrySet()) { List recordsPerShard = entry.getValue(); Set noDupRecords = new HashSet(recordsPerShard); - actualShardCount += noDupRecords.size(); + actualRecordCount += noDupRecords.size(); } // If this is true, then there was some record that was missed during processing. - if (actualShardCount != expectedShardCount) { - log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedShardCount, actualShardCount); + if (actualRecordCount != expectedRecordCount) { + log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedRecordCount, actualRecordCount); return RecordValidationStatus.MISSING_RECORD; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index cf089fd7..321a36b5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -2,6 +2,7 @@ package software.amazon.kinesis.utils; import lombok.Value; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; @@ -63,15 +64,24 @@ public class StreamExistenceManager extends AWSResourceManager { public List _getAllResourceNames() throws Exception { List streamNames = new ArrayList<>(); - final ListStreamsRequest request = ListStreamsRequest.builder().build(); + ListStreamsRequest request = ListStreamsRequest.builder().build(); ListStreamsResponse response = null; + String startStreamName = null; + + // Continue while paginated call is still returning stream names while(response == null || response.hasMoreStreams()) { + if (startStreamName != null) { + request = ListStreamsRequest.builder().exclusiveStartStreamName(startStreamName).build(); + } try { response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60)); } catch (Exception e) { throw new Exception("Error listing all lease tables"); } + // Add all stream names to list streamNames.addAll(response.streamNames()); + // Set startTableName for next call to be the last table name evaluated in current call + startStreamName = response.nextToken(); } return streamNames; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java index d3649952..394c2838 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java @@ -49,6 +49,7 @@ public class TestConsumer { private ScheduledExecutorService consumerExecutor; private ScheduledFuture consumerFuture; private DynamoDbAsyncClient dynamoClient; + private final ObjectMapper mapper = new ObjectMapper(); public int successfulPutRecords = 0; public BigInteger payloadCounter = new BigInteger("0"); @@ -77,7 +78,7 @@ public class TestConsumer { try { startConsumer(); - // Sleep for two minutes to allow the producer/consumer to run and then end the test case. + // Sleep for three minutes to allow the producer/consumer to run and then end the test case. Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); // Stops sending dummy data. @@ -92,15 +93,14 @@ public class TestConsumer { // Validate processed data validateRecordProcessor(); - // Clean up resources created - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); - deleteResources(streamExistenceManager, leaseTableManager); - } catch (Exception e) { // Test Failed. Clean up resources and then throw exception. log.info("----------Test Failed: Cleaning up resources------------"); - deleteResources(streamExistenceManager, leaseTableManager); throw e; + } finally { + // Clean up resources created + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + deleteResources(streamExistenceManager, leaseTableManager); } } @@ -168,20 +168,18 @@ public class TestConsumer { successfulPutRecords += 1; log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); } catch (InterruptedException e) { - log.info("Interrupted, assuming shutdown."); + log.info("Interrupted, assuming shutdown. ", e); } catch (ExecutionException | RuntimeException e) { - log.error("Error during publish records"); + log.error("Error during publish records", e); } } private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { final byte[] returnData; log.info("--------------Putting record with data: {}", payloadCounter); - ObjectMapper mapper = new ObjectMapper(); try { returnData = mapper.writeValueAsBytes(payloadCounter); } catch (Exception e) { - log.error("Error creating payload data for {}", payloadCounter.toString()); throw new RuntimeException("Error converting object to bytes: ", e); } return ByteBuffer.wrap(returnData); @@ -201,7 +199,7 @@ public class TestConsumer { } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException | TimeoutException e) { - throw new Exception("Exception while executing graceful shutdown. {}", e); + throw e; } log.info("Completed, shutting down now."); }