Adding modularity to resource managers and nit changes

This commit is contained in:
Meher Mankikar 2023-06-20 15:57:23 -07:00
parent 2cfb411fea
commit d479e91add
13 changed files with 190 additions and 180 deletions

View file

@ -65,13 +65,7 @@ public abstract class KCLAppConfig {
return InitialPositionInStream.TRIM_HORIZON;
}
public Protocol getConsumerProtocol() {
return Protocol.HTTP1_1;
}
public Protocol getProducerProtocol() {
return Protocol.HTTP1_1;
}
public abstract Protocol getKinesisClientProtocol();
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
@ -86,37 +80,31 @@ public abstract class KCLAppConfig {
return null;
}
public final KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
if (kinesisAsyncClient == null) {
this.kinesisAsyncClient = buildAsyncKinesisClient(Optional.ofNullable(protocol));
// Setup H2 client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
.maxConcurrency(Integer.MAX_VALUE);
builder.protocol(getKinesisClientProtocol());
final SdkAsyncHttpClient sdkAsyncHttpClient =
builder.buildWithDefaults(AttributeMap.builder().build());
// Setup client builder by default values
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider());
this.kinesisAsyncClient = kinesisAsyncClientBuilder.build();
}
return this.kinesisAsyncClient;
}
public final KinesisAsyncClient buildAsyncKinesisClient(Optional<Protocol> protocol) throws URISyntaxException, IOException {
// Setup H2 client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
.maxConcurrency(Integer.MAX_VALUE);
// If not present, defaults to HTTP1_1
if (protocol.isPresent()) {
builder.protocol(protocol.get());
}
final SdkAsyncHttpClient sdkAsyncHttpClient =
builder.buildWithDefaults(AttributeMap.builder().build());
// Setup client builder by default values
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider());
return kinesisAsyncClientBuilder.build();
}
public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
if (this.dynamoDbAsyncClient == null) {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion());
@ -135,7 +123,7 @@ public abstract class KCLAppConfig {
return this.cloudWatchAsyncClient;
}
public String getWorkerId() throws UnknownHostException {
public final String getWorkerId() throws UnknownHostException {
return Inet4Address.getLocalHost().getHostName();
}
@ -150,9 +138,9 @@ public abstract class KCLAppConfig {
return new TestRecordProcessorFactory(getRecordValidator());
}
public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
public final ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
final String workerId = getWorkerId();
return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(getConsumerProtocol()), buildAsyncDynamoDbClient(),
return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
}

View file

@ -22,7 +22,7 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
}
@Override
public Protocol getConsumerProtocol() {
public Protocol getKinesisClientProtocol() {
return Protocol.HTTP1_1;
}

View file

@ -21,7 +21,7 @@ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
}
@Override
public Protocol getConsumerProtocol() {
public Protocol getKinesisClientProtocol() {
return Protocol.HTTP2;
}

View file

@ -16,7 +16,7 @@ public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
}
@Override
public Protocol getConsumerProtocol() {
public Protocol getKinesisClientProtocol() {
return Protocol.HTTP2;
}

View file

@ -16,17 +16,17 @@ public class BasicStreamConsumerIntegrationTest {
consumer.run();
}
@Test
public void kclReleaseCanaryPollingH1Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
@Test
public void kclReleaseCanaryStreamingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
// @Test
// public void kclReleaseCanaryPollingH1Test() throws Exception {
// KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig();
// TestConsumer consumer = new TestConsumer(consumerConfig);
// consumer.run();
// }
//
// @Test
// public void kclReleaseCanaryStreamingTest() throws Exception {
// KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig();
// TestConsumer consumer = new TestConsumer(consumerConfig);
// consumer.run();
// }
}

View file

@ -0,0 +1,59 @@
package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AWSResourceManager {
public AWSResourceManager() {}
public abstract void _deleteResource(String resourceName) throws Exception;
public abstract boolean _isResourceActive(String name);
public abstract List<String> _getAllResourceNames() throws Exception;
/**
* Deletes resource with specified resource name
* @param resourceName
* @throws Exception
*/
public void deleteResource(String resourceName) throws Exception {
try {
_deleteResource(resourceName);
} catch (Exception e) {
throw new Exception("Could not delete resource: {}", e);
}
// Wait till resource is deleted to return
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed resource deletion");
}
try {
if (!_isResourceActive(resourceName)) {
log.info("Successfully deleted the resource {}", resourceName);
return;
}
} catch (Exception e) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) {}
log.info("Resource {} is not deleted yet, exception: ", resourceName);
}
}
}
/**
* Delete all instances of a particular resource type
*/
public void deleteAllResource() throws Exception {
final List<String> streamNames = _getAllResourceNames();
for (String streamName : streamNames) {
deleteResource(streamName);
}
}
}

View file

@ -3,19 +3,25 @@ package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.kinesis.common.FutureUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class LeaseTableManager {
public class LeaseTableManager extends AWSResourceManager {
private final DynamoDbAsyncClient dynamoClient;
@ -23,53 +29,46 @@ public class LeaseTableManager {
this.dynamoClient = dynamoClient;
}
private List<String> listAllLeaseTables() throws Exception {
final ListTablesRequest request = ListTablesRequest.builder().build();
final ListTablesResponse response;
try {
response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
throw new Exception("Error listing all lease tables");
}
return response.tableNames();
}
public boolean _isResourceActive(String tableName) {
final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
final CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFuture = dynamoClient.describeTable(request);
public void deleteLeaseTable(String tableName) throws Exception {
final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
try {
FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
throw new Exception("Could not delete lease table: {}", e);
}
// Wait till table is deleted to return
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed lease table deletion");
final DescribeTableResponse response = describeTableResponseCompletableFuture.get(30, TimeUnit.SECONDS);
boolean isActive = response.table().tableStatus().equals(TableStatus.ACTIVE);
if (!isActive) {
throw new RuntimeException("Table is not active, instead in status: " + response.table().tableStatus());
}
List<String> leaseTableNames = listAllLeaseTables();
log.info("All lease tables name: {}. Looking for: {}", leaseTableNames, tableName);
if (!listAllLeaseTables().contains(tableName)) {
log.info("Succesfully deleted the lease table {}", tableName);
return;
return true;
} catch (ExecutionException e) {
if (e.getCause() instanceof ResourceNotFoundException) {
return false;
} else {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) {}
log.info("Lease table {} is not deleted yet, exception: ", tableName);
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void deleteAllLeaseTables() throws Exception {
public void _deleteResource(String tableName) throws Exception {
final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
}
final List<String> tableNames = listAllLeaseTables();
for (String tableName : tableNames) {
deleteLeaseTable(tableName);
public List<String> _getAllResourceNames() throws Exception {
List<String> tableNames = new ArrayList<>();
final ListTablesRequest request = ListTablesRequest.builder().build();
ListTablesResponse response = null;
while(response == null || response.lastEvaluatedTableName() != null) {
try {
response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
throw new Exception("Error listing all lease tables");
}
tableNames.addAll(response.tableNames());
}
return tableNames;
}
}

View file

@ -33,24 +33,14 @@ public class RecordValidatorQueue {
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
int prevVal = -1;
boolean shardIncOrder = true;
for (String record : recordsPerShard) {
int nextVal = Integer.parseInt(record);
if (prevVal > nextVal) {
log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal);
shardIncOrder = false;
return RecordValidationStatus.OUT_OF_ORDER;
}
prevVal = nextVal;
}
if (!shardIncOrder) {
incOrder = false;
break;
}
}
// If this is true, then there was some record that was processed out of order
if (!incOrder) {
return RecordValidationStatus.OUT_OF_ORDER;
}
// Validate that no records are missing over all shards

View file

@ -5,7 +5,7 @@ import org.junit.Test;
public class RecordValidatorQueueTest {
private RecordValidatorQueue recordValidator = new RecordValidatorQueue();
private final RecordValidatorQueue recordValidator = new RecordValidatorQueue();
private static final String SHARD_ID = "ABC";

View file

@ -11,10 +11,13 @@ import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.config.KCLAppConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -22,17 +25,16 @@ import java.util.concurrent.TimeUnit;
@Value
@Slf4j
public class StreamExistenceManager {
public class StreamExistenceManager extends AWSResourceManager {
private final KinesisAsyncClient client;
private final KCLAppConfig testConfig;
public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
this.testConfig = config;
this.client = config.buildAsyncKinesisClient(config.getConsumerProtocol());
this.client = config.buildAsyncKinesisClient();
}
private boolean isStreamActive(String streamName) {
public boolean _isResourceActive(String streamName) {
final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request);
@ -54,6 +56,34 @@ public class StreamExistenceManager {
}
}
public void _deleteResource(String streamName) throws Exception {
final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build();
client.deleteStream(request).get(30, TimeUnit.SECONDS);
}
public List<String> _getAllResourceNames() throws Exception {
List<String> streamNames = new ArrayList<>();
final ListStreamsRequest request = ListStreamsRequest.builder().build();
ListStreamsResponse response = null;
while(response == null || response.hasMoreStreams()) {
try {
response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60));
} catch (Exception e) {
throw new Exception("Error listing all lease tables");
}
streamNames.addAll(response.streamNames());
}
return streamNames;
}
public void checkStreamAndCreateIfNecessary(String streamName) {
if (!_isResourceActive(streamName)) {
createStream(streamName, testConfig.getShardCount());
}
log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
}
private void createStream(String streamName, int shardCount) {
final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
try {
@ -69,7 +99,7 @@ public class StreamExistenceManager {
throw new RuntimeException("Failed stream creation, did not transition into active");
}
try {
boolean isActive = isStreamActive(streamName);
boolean isActive = _isResourceActive(streamName);
if (isActive) {
log.info("Succesfully created the stream {}", streamName);
return;
@ -85,59 +115,4 @@ public class StreamExistenceManager {
}
}
public void deleteStream(String streamName) {
final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build();
try {
client.deleteStream(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to delete stream with name " + streamName, e);
}
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed stream deletion");
}
try {
boolean isActive = isStreamActive(streamName);
if (!isActive) {
log.info("Succesfully deleted the stream {}", streamName);
return;
}
} catch (Exception e) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) { }
log.info("Stream {} is not deleted yet, exception: ", streamName, e);
}
}
}
public void checkStreamAndCreateIfNecessary(String streamName) {
if (!isStreamActive(streamName)) {
createStream(streamName, testConfig.getShardCount());
}
log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
}
private List<String> getAllStreamNames() {
final ListStreamsRequest request = ListStreamsRequest.builder().build();
ListStreamsResponse response;
try {
response = client.listStreams(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to list all streams", e);
}
return response.streamNames();
}
public void deleteAllStreams() {
final List<String> streamNames = getAllStreamNames();
for (String streamName : streamNames) {
deleteStream(streamName);
}
}
}

View file

@ -46,6 +46,8 @@ public class TestConsumer {
private Scheduler scheduler;
private ScheduledExecutorService producerExecutor;
private ScheduledFuture<?> producerFuture;
private ScheduledExecutorService consumerExecutor;
private ScheduledFuture<?> consumerFuture;
private DynamoDbAsyncClient dynamoClient;
public int successfulPutRecords = 0;
public BigInteger payloadCounter = new BigInteger("0");
@ -54,7 +56,7 @@ public class TestConsumer {
this.consumerConfig = consumerConfig;
this.region = consumerConfig.getRegion();
this.streamName = consumerConfig.getStreamName();
this.kinesisClient = consumerConfig.buildAsyncKinesisClient(consumerConfig.getConsumerProtocol());
this.kinesisClient = consumerConfig.buildAsyncKinesisClient();
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
}
@ -64,14 +66,13 @@ public class TestConsumer {
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);
// Clean up any old streams or lease tables left in test environment
cleanTestEnvironment(streamExistenceManager, leaseTableManager);
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
cleanTestResources(streamExistenceManager, leaseTableManager);
// Check if stream is created. If not, create it
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
startProducer();
setUpTestResources();
setUpConsumerResources();
try {
startConsumer();
@ -98,20 +99,19 @@ public class TestConsumer {
} catch (Exception e) {
// Test Failed. Clean up resources and then throw exception.
log.info("----------Test Failed: Cleaning up resources------------");
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager);
throw e;
}
}
private void cleanTestEnvironment(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("----------Before starting, Cleaning test environment----------");
log.info("----------Deleting all lease tables in account----------");
leaseTableManager.deleteAllLeaseTables();
leaseTableManager.deleteAllResource();
log.info("----------Finished deleting all lease tables-------------");
log.info("----------Deleting all streams in account----------");
streamExistenceManager.deleteAllStreams();
streamExistenceManager.deleteAllResource();
log.info("----------Finished deleting all streams-------------");
}
@ -121,7 +121,7 @@ public class TestConsumer {
this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
}
private void setUpTestResources() throws Exception {
private void setUpConsumerResources() throws Exception {
// Setup configuration of KCL (including DynamoDB and CloudWatch)
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder();
@ -149,9 +149,8 @@ public class TestConsumer {
private void startConsumer() {
// Start record processing of dummy data
final Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
this.consumerExecutor = Executors.newSingleThreadScheduledExecutor();
this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS);
}
public void publishRecord() {
@ -220,9 +219,9 @@ public class TestConsumer {
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("-------------Start deleting stream.----------------");
streamExistenceManager.deleteStream(this.streamName);
streamExistenceManager.deleteResource(this.streamName);
log.info("-------------Start deleting lease table.----------------");
leaseTableManager.deleteLeaseTable(this.consumerConfig.getStreamName());
leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
log.info("-------------Finished deleting resources.----------------");
}

View file

@ -24,7 +24,7 @@ public class TestRecordProcessor implements ShardRecordProcessor {
private String shardId;
RecordValidatorQueue recordValidator;
private final RecordValidatorQueue recordValidator;
public TestRecordProcessor(RecordValidatorQueue recordValidator) {
this.recordValidator = recordValidator;
@ -48,8 +48,8 @@ public class TestRecordProcessor implements ShardRecordProcessor {
try {
log.info("Processing {} record(s)", processRecordsInput.records().size());
for (KinesisClientRecord r : processRecordsInput.records()) {
String data = new String(asByteArray(r.data()));
for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) {
final String data = new String(asByteArray(kinesisRecord.data()));
log.info("Processing record pk: {}", data);
recordValidator.add(shardId, data);
}

View file

@ -5,7 +5,7 @@ import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
RecordValidatorQueue recordValidator;
private final RecordValidatorQueue recordValidator;
public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) {
this.recordValidator = recordValidator;