Updating paginated call and cleaning up style

This commit is contained in:
Meher Mankikar 2023-06-21 11:32:59 -07:00
parent c9659ce11b
commit 7816727b3a
8 changed files with 77 additions and 33 deletions

View file

@ -40,7 +40,7 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This will look for a default AWS profile specified in your local `.aws/credentials`. This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -Dcredentials="<IAM_USER>" verify`. Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="<PROFILE_NAME>" verify`.
## Integration with the Kinesis Producer Library ## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

View file

@ -56,7 +56,7 @@ public abstract class KCLAppConfig {
* "default" profile, should match with profiles listed in "cat ~/.aws/config" * "default" profile, should match with profiles listed in "cat ~/.aws/config"
*/ */
private AwsCredentialsProvider getCredentialsProvider() { private AwsCredentialsProvider getCredentialsProvider() {
final String awsProfile = System.getProperty("credentials"); final String awsProfile = System.getProperty("awsProfile");
return (awsProfile != null) ? return (awsProfile != null) ?
ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create(); ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create();
} }

View file

@ -9,6 +9,10 @@ import software.amazon.kinesis.utils.TestConsumer;
public class BasicStreamConsumerIntegrationTest { public class BasicStreamConsumerIntegrationTest {
/**
* Test with a polling consumer using HTTP2 protocol.
* In the polling case, consumer makes calls to the producer each time to request records to process.
*/
@Test @Test
public void kclReleaseCanaryPollingH2Test() throws Exception { public void kclReleaseCanaryPollingH2Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig();
@ -16,6 +20,10 @@ public class BasicStreamConsumerIntegrationTest {
consumer.run(); consumer.run();
} }
/**
* Test with a polling consumer using HTTP1 protocol.
* In the polling case, consumer makes calls to the producer each time to request records to process.
*/
@Test @Test
public void kclReleaseCanaryPollingH1Test() throws Exception { public void kclReleaseCanaryPollingH1Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig();
@ -23,6 +31,10 @@ public class BasicStreamConsumerIntegrationTest {
consumer.run(); consumer.run();
} }
/**
* Test with a streaming consumer.
* In the streaming configuration, connection is made once between consumer and producer and producer continuously sends data to be processed.
*/
@Test @Test
public void kclReleaseCanaryStreamingTest() throws Exception { public void kclReleaseCanaryStreamingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig();

View file

@ -1,22 +1,40 @@
package software.amazon.kinesis.utils; package software.amazon.kinesis.utils;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.kinesis.common.FutureUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@NoArgsConstructor
public abstract class AWSResourceManager { public abstract class AWSResourceManager {
public AWSResourceManager() {} /**
* Make delete resource API call for specific resource type
*/
public abstract void _deleteResource(String resourceName) throws Exception; public abstract void _deleteResource(String resourceName) throws Exception;
/**
* Check if resource with given name is in active state
*/
public abstract boolean _isResourceActive(String name); public abstract boolean _isResourceActive(String name);
/**
* Get a list of all the names of resources of a specified type
* @return
* @throws Exception
*/
public abstract List<String> _getAllResourceNames() throws Exception; public abstract List<String> _getAllResourceNames() throws Exception;
/** /**
* Deletes resource with specified resource name * Delete resource with specified resource name
* @param resourceName
* @throws Exception
*/ */
public void deleteResource(String resourceName) throws Exception { public void deleteResource(String resourceName) throws Exception {
@ -51,9 +69,9 @@ public abstract class AWSResourceManager {
* Delete all instances of a particular resource type * Delete all instances of a particular resource type
*/ */
public void deleteAllResource() throws Exception { public void deleteAllResource() throws Exception {
final List<String> streamNames = _getAllResourceNames(); final List<String> resourceNames = _getAllResourceNames();
for (String streamName : streamNames) { for (String resourceName : resourceNames) {
deleteResource(streamName); deleteResource(resourceName);
} }
} }
} }

View file

@ -1,5 +1,6 @@
package software.amazon.kinesis.utils; package software.amazon.kinesis.utils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
@ -21,14 +22,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@AllArgsConstructor
public class LeaseTableManager extends AWSResourceManager { public class LeaseTableManager extends AWSResourceManager {
private final DynamoDbAsyncClient dynamoClient; private final DynamoDbAsyncClient dynamoClient;
public LeaseTableManager(DynamoDbAsyncClient dynamoClient) throws URISyntaxException, IOException {
this.dynamoClient = dynamoClient;
}
public boolean _isResourceActive(String tableName) { public boolean _isResourceActive(String tableName) {
final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build(); final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
final CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFuture = dynamoClient.describeTable(request); final CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFuture = dynamoClient.describeTable(request);
@ -58,15 +56,24 @@ public class LeaseTableManager extends AWSResourceManager {
public List<String> _getAllResourceNames() throws Exception { public List<String> _getAllResourceNames() throws Exception {
List<String> tableNames = new ArrayList<>(); List<String> tableNames = new ArrayList<>();
final ListTablesRequest request = ListTablesRequest.builder().build(); ListTablesRequest request = ListTablesRequest.builder().build();
ListTablesResponse response = null; ListTablesResponse response = null;
String startTableName = null;
// Continue while paginated call is still returning table names
while(response == null || response.lastEvaluatedTableName() != null) { while(response == null || response.lastEvaluatedTableName() != null) {
if (startTableName != null) {
request = ListTablesRequest.builder().exclusiveStartTableName(startTableName).build();
}
try { try {
response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60)); response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) { } catch (ExecutionException | InterruptedException e) {
throw new Exception("Error listing all lease tables"); throw new Exception("Error listing all lease tables");
} }
// Add all table names to list
tableNames.addAll(response.tableNames()); tableNames.addAll(response.tableNames());
// Set startTableName for next call to be the last table name evaluated in current call
startTableName = response.lastEvaluatedTableName();
} }
return tableNames; return tableNames;
} }

View file

@ -26,10 +26,9 @@ public class RecordValidatorQueue {
values.add(data); values.add(data);
} }
public RecordValidationStatus validateRecords(int expectedShardCount) { public RecordValidationStatus validateRecords(int expectedRecordCount) {
// Validate that each List in the HashMap has data records in increasing order // Validate that each List in the HashMap has data records in increasing order
boolean incOrder = true;
for (Map.Entry<String, List<String>> entry : dict.entrySet()) { for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue(); List<String> recordsPerShard = entry.getValue();
int prevVal = -1; int prevVal = -1;
@ -44,16 +43,16 @@ public class RecordValidatorQueue {
} }
// Validate that no records are missing over all shards // Validate that no records are missing over all shards
int actualShardCount = 0; int actualRecordCount = 0;
for (Map.Entry<String, List<String>> entry : dict.entrySet()) { for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue(); List<String> recordsPerShard = entry.getValue();
Set<String> noDupRecords = new HashSet<String>(recordsPerShard); Set<String> noDupRecords = new HashSet<String>(recordsPerShard);
actualShardCount += noDupRecords.size(); actualRecordCount += noDupRecords.size();
} }
// If this is true, then there was some record that was missed during processing. // If this is true, then there was some record that was missed during processing.
if (actualShardCount != expectedShardCount) { if (actualRecordCount != expectedRecordCount) {
log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedShardCount, actualShardCount); log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedRecordCount, actualRecordCount);
return RecordValidationStatus.MISSING_RECORD; return RecordValidationStatus.MISSING_RECORD;
} }

View file

@ -2,6 +2,7 @@ package software.amazon.kinesis.utils;
import lombok.Value; import lombok.Value;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
@ -63,15 +64,24 @@ public class StreamExistenceManager extends AWSResourceManager {
public List<String> _getAllResourceNames() throws Exception { public List<String> _getAllResourceNames() throws Exception {
List<String> streamNames = new ArrayList<>(); List<String> streamNames = new ArrayList<>();
final ListStreamsRequest request = ListStreamsRequest.builder().build(); ListStreamsRequest request = ListStreamsRequest.builder().build();
ListStreamsResponse response = null; ListStreamsResponse response = null;
String startStreamName = null;
// Continue while paginated call is still returning stream names
while(response == null || response.hasMoreStreams()) { while(response == null || response.hasMoreStreams()) {
if (startStreamName != null) {
request = ListStreamsRequest.builder().exclusiveStartStreamName(startStreamName).build();
}
try { try {
response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60)); response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60));
} catch (Exception e) { } catch (Exception e) {
throw new Exception("Error listing all lease tables"); throw new Exception("Error listing all lease tables");
} }
// Add all stream names to list
streamNames.addAll(response.streamNames()); streamNames.addAll(response.streamNames());
// Set startTableName for next call to be the last table name evaluated in current call
startStreamName = response.nextToken();
} }
return streamNames; return streamNames;
} }

View file

@ -49,6 +49,7 @@ public class TestConsumer {
private ScheduledExecutorService consumerExecutor; private ScheduledExecutorService consumerExecutor;
private ScheduledFuture<?> consumerFuture; private ScheduledFuture<?> consumerFuture;
private DynamoDbAsyncClient dynamoClient; private DynamoDbAsyncClient dynamoClient;
private final ObjectMapper mapper = new ObjectMapper();
public int successfulPutRecords = 0; public int successfulPutRecords = 0;
public BigInteger payloadCounter = new BigInteger("0"); public BigInteger payloadCounter = new BigInteger("0");
@ -77,7 +78,7 @@ public class TestConsumer {
try { try {
startConsumer(); startConsumer();
// Sleep for two minutes to allow the producer/consumer to run and then end the test case. // Sleep for three minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
// Stops sending dummy data. // Stops sending dummy data.
@ -92,15 +93,14 @@ public class TestConsumer {
// Validate processed data // Validate processed data
validateRecordProcessor(); validateRecordProcessor();
// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager);
} catch (Exception e) { } catch (Exception e) {
// Test Failed. Clean up resources and then throw exception. // Test Failed. Clean up resources and then throw exception.
log.info("----------Test Failed: Cleaning up resources------------"); log.info("----------Test Failed: Cleaning up resources------------");
deleteResources(streamExistenceManager, leaseTableManager);
throw e; throw e;
} finally {
// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager);
} }
} }
@ -168,20 +168,18 @@ public class TestConsumer {
successfulPutRecords += 1; successfulPutRecords += 1;
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown."); log.info("Interrupted, assuming shutdown. ", e);
} catch (ExecutionException | RuntimeException e) { } catch (ExecutionException | RuntimeException e) {
log.error("Error during publish records"); log.error("Error during publish records", e);
} }
} }
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
final byte[] returnData; final byte[] returnData;
log.info("--------------Putting record with data: {}", payloadCounter); log.info("--------------Putting record with data: {}", payloadCounter);
ObjectMapper mapper = new ObjectMapper();
try { try {
returnData = mapper.writeValueAsBytes(payloadCounter); returnData = mapper.writeValueAsBytes(payloadCounter);
} catch (Exception e) { } catch (Exception e) {
log.error("Error creating payload data for {}", payloadCounter.toString());
throw new RuntimeException("Error converting object to bytes: ", e); throw new RuntimeException("Error converting object to bytes: ", e);
} }
return ByteBuffer.wrap(returnData); return ByteBuffer.wrap(returnData);
@ -201,7 +199,7 @@ public class TestConsumer {
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing."); log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException | TimeoutException e) { } catch (ExecutionException | TimeoutException e) {
throw new Exception("Exception while executing graceful shutdown. {}", e); throw e;
} }
log.info("Completed, shutting down now."); log.info("Completed, shutting down now.");
} }