Revisions from review

This commit is contained in:
Meher Mankikar 2023-06-20 10:25:11 -07:00
parent ad26ac5149
commit 23edc1b934
9 changed files with 279 additions and 166 deletions

View file

@ -208,8 +208,8 @@
<value>${sqlite4java.libpath}</value> <value>${sqlite4java.libpath}</value>
</property> </property>
<property> <property>
<name>credentials</name> <name>awsProfile</name>
<value>${credentials}</value> <value>${awsProfile}</value>
</property> </property>
</systemProperties> </systemProperties>
</configuration> </configuration>

View file

@ -7,7 +7,6 @@ import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions; import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory; import software.amazon.kinesis.utils.TestRecordProcessorFactory;
import lombok.Builder; import lombok.Builder;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.Protocol;
@ -39,27 +38,27 @@ import java.util.Optional;
*/ */
public abstract class KCLAppConfig { public abstract class KCLAppConfig {
private KinesisAsyncClient kinesisAsyncClient;
private DynamoDbAsyncClient dynamoDbAsyncClient;
private CloudWatchAsyncClient cloudWatchAsyncClient;
private RecordValidatorQueue recordValidator;
/** /**
* Name used for test stream and lease tracker table * Name used for test stream and lease tracker table
*/ */
public abstract String getStreamName(); public abstract String getStreamName();
public String getStreamArn() {
return null;
}
public int getShardCount() { return 4; } public int getShardCount() { return 4; }
public String getEndpoint() { return ""; }
public Region getRegion() { return Region.US_WEST_2; } public Region getRegion() { return Region.US_WEST_2; }
/** /**
* "default" profile, should match with profiles listed in "cat ~/.aws/config" * "default" profile, should match with profiles listed in "cat ~/.aws/config"
*/ */
public String getProfile() { private AwsCredentialsProvider getCredentialsProvider() {
String iamUser = System.getProperty("credentials"); final String awsProfile = System.getProperty("credentials");
return iamUser; return (awsProfile != null) ?
ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create();
} }
public InitialPositionInStream getInitialPosition() { public InitialPositionInStream getInitialPosition() {
@ -87,24 +86,14 @@ public abstract class KCLAppConfig {
return null; return null;
} }
public KinesisAsyncClient buildConsumerClient() throws URISyntaxException, IOException { public final KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
return buildAsyncKinesisClient(getConsumerProtocol()); if (kinesisAsyncClient == null) {
this.kinesisAsyncClient = buildAsyncKinesisClient(Optional.ofNullable(protocol));
}
return this.kinesisAsyncClient;
} }
public KinesisAsyncClient buildProducerClient() throws URISyntaxException, IOException { public final KinesisAsyncClient buildAsyncKinesisClient(Optional<Protocol> protocol) throws URISyntaxException, IOException {
return buildAsyncKinesisClient(getProducerProtocol());
}
public KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
return buildAsyncKinesisClient(Optional.ofNullable(protocol));
}
private AwsCredentialsProvider getCredentialsProvider() {
return (getProfile() != null) ?
ProfileCredentialsProvider.builder().profileName(getProfile()).build() : DefaultCredentialsProvider.create();
}
public KinesisAsyncClient buildAsyncKinesisClient(Optional<Protocol> protocol) throws URISyntaxException, IOException {
// Setup H2 client config. // Setup H2 client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
@ -128,26 +117,33 @@ public abstract class KCLAppConfig {
return kinesisAsyncClientBuilder.build(); return kinesisAsyncClientBuilder.build();
} }
public DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion()); if (this.dynamoDbAsyncClient == null) {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion());
builder.credentialsProvider(getCredentialsProvider()); builder.credentialsProvider(getCredentialsProvider());
return builder.build(); this.dynamoDbAsyncClient = builder.build();
}
return this.dynamoDbAsyncClient;
} }
public CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException { public final CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion()); if (this.cloudWatchAsyncClient == null) {
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion());
builder.credentialsProvider(getCredentialsProvider()); builder.credentialsProvider(getCredentialsProvider());
return builder.build(); this.cloudWatchAsyncClient = builder.build();
}
return this.cloudWatchAsyncClient;
} }
public String getWorkerId() throws UnknownHostException { public String getWorkerId() throws UnknownHostException {
return Inet4Address.getLocalHost().getHostName(); return Inet4Address.getLocalHost().getHostName();
} }
public RecordValidatorQueue getRecordValidator() { public final RecordValidatorQueue getRecordValidator() {
return new RecordValidatorQueue(); if (recordValidator == null) {
this.recordValidator = new RecordValidatorQueue();
}
return this.recordValidator;
} }
public ShardRecordProcessorFactory getShardRecordProcessorFactory() { public ShardRecordProcessorFactory getShardRecordProcessorFactory() {
@ -156,21 +152,16 @@ public abstract class KCLAppConfig {
public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
final String workerId = getWorkerId(); final String workerId = getWorkerId();
if (getStreamArn() == null) { return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(getConsumerProtocol()), buildAsyncDynamoDbClient(),
return new ConfigsBuilder(getStreamName(), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
} else {
return new ConfigsBuilder(Arn.fromString(getStreamArn()), getStreamName(), buildConsumerClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
}
} }
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition()); .newInitialPosition(getInitialPosition());
// Default is a streaming consumer // Default is a streaming consumer
RetrievalConfig config = getConfigsBuilder().retrievalConfig(); final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition); config.initialPositionInStreamExtended(initialPosition);
return config; return config;
} }

View file

@ -7,14 +7,18 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.UUID;
/** /**
* Config for a polling consumer with HTTP protocol of HTTP1 * Config for a polling consumer with HTTP protocol of HTTP1
*/ */
public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override @Override
public String getStreamName() { public String getStreamName() {
return "KCLReleaseCanary2XPollingH1TestStream"; return "KCLReleaseCanary2XPollingH1TestStream_" + uniqueId;
} }
@Override @Override
@ -25,12 +29,12 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
@Override @Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition()); .newInitialPosition(getInitialPosition());
RetrievalConfig config = getConfigsBuilder().retrievalConfig(); final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition); config.initialPositionInStreamExtended(initialPosition);
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), buildConsumerClient())); config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));
return config; return config;
} }

View file

@ -7,14 +7,17 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.UUID;
/** /**
* Config for a polling consumer with HTTP protocol of HTTP2 * Config for a polling consumer with HTTP protocol of HTTP2
*/ */
public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig { public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override @Override
public String getStreamName() { public String getStreamName() {
return "KCLReleaseCanary2XPollingH2TestStream"; return "KCLReleaseCanary2XPollingH2TestStream_" + uniqueId;
} }
@Override @Override
@ -25,12 +28,12 @@ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
@Override @Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition()); .newInitialPosition(getInitialPosition());
RetrievalConfig config = getConfigsBuilder().retrievalConfig(); final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition); config.initialPositionInStreamExtended(initialPosition);
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), buildConsumerClient())); config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));
return config; return config;
} }

View file

@ -2,13 +2,17 @@ package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.Protocol;
import java.util.UUID;
/** /**
* Config for a streaming consumer with HTTP protocol of HTTP2 * Config for a streaming consumer with HTTP protocol of HTTP2
*/ */
public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig { public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override @Override
public String getStreamName() { public String getStreamName() {
return "KCLReleaseCanary2XStreamingTestStream"; return "KCLReleaseCanary2XStreamingTestStream_" + uniqueId;
} }
@Override @Override

View file

@ -2,29 +2,29 @@ package software.amazon.kinesis.lifecycle;
import org.junit.Test; import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer; import software.amazon.kinesis.utils.TestConsumer;
public class BasicStreamingPollingIntegrationTest { public class BasicStreamConsumerIntegrationTest {
@Test @Test
public void KCLReleaseCanaryPollingH2Test() throws Exception { public void kclReleaseCanaryPollingH2Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig); TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run(); consumer.run();
} }
@Test @Test
public void KCLReleaseCanaryPollingH1Test() throws Exception { public void kclReleaseCanaryPollingH1Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig); TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run(); consumer.run();
} }
@Test @Test
public void KCLReleaseCanaryStreamingTest() throws Exception { public void kclReleaseCanaryStreamingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig(); KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig); TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run(); consumer.run();

View file

@ -0,0 +1,74 @@
package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.kinesis.common.FutureUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class LeaseTableManager {
private final DynamoDbAsyncClient dynamoClient;
public LeaseTableManager(DynamoDbAsyncClient dynamoClient) throws URISyntaxException, IOException {
this.dynamoClient = dynamoClient;
}
private List<String> listAllLeaseTables() throws Exception {
final ListTablesRequest request = ListTablesRequest.builder().build();
final ListTablesResponse response;
try {
response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
throw new Exception("Error listing all lease tables");
}
return response.tableNames();
}
public void deleteLeaseTable(String tableName) throws Exception {
final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
try {
FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
throw new Exception("Could not delete lease table: {}", e);
}
// Wait till table is deleted to return
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed lease table deletion");
}
try {
if (!listAllLeaseTables().contains(tableName)) {
log.info("Succesfully deleted the lease table {}", tableName);
return;
}
} catch (Exception e) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) { }
log.info("Lease table {} is not deleted yet, exception: ", tableName, e);
}
}
}
public void deleteAllLeaseTables() throws Exception {
final List<String> tableNames = listAllLeaseTables();
for (String tableName : tableNames) {
deleteLeaseTable(tableName);
}
}
}

View file

@ -7,18 +7,19 @@ import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.StreamStatus; import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.config.KCLAppConfig;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
@Value @Value
@Slf4j @Slf4j
public class StreamExistenceManager { public class StreamExistenceManager {
@ -32,8 +33,7 @@ public class StreamExistenceManager {
private boolean isStreamActive(String streamName) { private boolean isStreamActive(String streamName) {
DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request); final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request);
try { try {
@ -55,7 +55,7 @@ public class StreamExistenceManager {
} }
private void createStream(String streamName, int shardCount) { private void createStream(String streamName, int shardCount) {
CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build(); final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
try { try {
client.createStream(request).get(30, TimeUnit.SECONDS); client.createStream(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
@ -71,12 +71,12 @@ public class StreamExistenceManager {
try { try {
boolean isActive = isStreamActive(streamName); boolean isActive = isStreamActive(streamName);
if (isActive) { if (isActive) {
log.info("Succesfully created the stream " + streamName); log.info("Succesfully created the stream {}", streamName);
return; return;
} }
} catch (Exception e) { } catch (Exception e) {
try { try {
sleep(10_000); // 10 secs backoff. Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
log.error("Failed to sleep"); log.error("Failed to sleep");
} }
@ -86,8 +86,8 @@ public class StreamExistenceManager {
} }
public void deleteStream(String streamName) { public void deleteStream(String streamName) {
DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build(); final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build();
try{ try {
client.deleteStream(request).get(30, TimeUnit.SECONDS); client.deleteStream(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Failed to delete stream with name " + streamName, e); throw new RuntimeException("Failed to delete stream with name " + streamName, e);
@ -102,13 +102,13 @@ public class StreamExistenceManager {
try { try {
boolean isActive = isStreamActive(streamName); boolean isActive = isStreamActive(streamName);
if (!isActive) { if (!isActive) {
log.info("Succesfully deleted the stream " + streamName); log.info("Succesfully deleted the stream {}", streamName);
return; return;
} }
} catch (Exception e) { } catch (Exception e) {
try { try {
sleep(10_000); // 10 secs backoff. Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) {} } catch (InterruptedException e1) { }
log.info("Stream {} is not deleted yet, exception: ", streamName, e); log.info("Stream {} is not deleted yet, exception: ", streamName, e);
} }
} }
@ -119,7 +119,25 @@ public class StreamExistenceManager {
if (!isStreamActive(streamName)) { if (!isStreamActive(streamName)) {
createStream(streamName, testConfig.getShardCount()); createStream(streamName, testConfig.getShardCount());
} }
log.info("Using stream {} in endpoint {} with region {}", streamName, testConfig.getEndpoint(), testConfig.getRegion()); log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
}
private List<String> getAllStreamNames() {
final ListStreamsRequest request = ListStreamsRequest.builder().build();
ListStreamsResponse response;
try {
response = client.listStreams(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to list all streams", e);
}
return response.streamNames();
}
public void deleteAllStreams() {
final List<String> streamNames = getAllStreamNames();
for (String streamName : streamNames) {
deleteStream(streamName);
}
} }
} }

View file

@ -5,16 +5,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.config.KCLAppConfig; import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.coordinator.Scheduler;
@ -26,8 +22,6 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -35,7 +29,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@Slf4j @Slf4j
public class TestConsumer { public class TestConsumer {
@ -50,33 +43,87 @@ public class TestConsumer {
private LeaseManagementConfig leaseManagementConfig; private LeaseManagementConfig leaseManagementConfig;
private LifecycleConfig lifecycleConfig; private LifecycleConfig lifecycleConfig;
private ProcessorConfig processorConfig; private ProcessorConfig processorConfig;
private Scheduler scheduler;
private ScheduledExecutorService producerExecutor;
private ScheduledFuture<?> producerFuture;
private DynamoDbAsyncClient dynamoClient;
public int successfulPutRecords = 0; public int successfulPutRecords = 0;
public BigInteger payloadCounter = new BigInteger("0"); public BigInteger payloadCounter = new BigInteger("0");
public TestConsumer(KCLAppConfig consumerConfig) { public TestConsumer(KCLAppConfig consumerConfig) throws Exception {
this.consumerConfig = consumerConfig; this.consumerConfig = consumerConfig;
this.region = consumerConfig.getRegion(); this.region = consumerConfig.getRegion();
this.streamName = consumerConfig.getStreamName(); this.streamName = consumerConfig.getStreamName();
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); this.kinesisClient = consumerConfig.buildAsyncKinesisClient(consumerConfig.getConsumerProtocol());
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
} }
public void run() throws Exception { public void run() throws Exception {
final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);
// Clean up any old streams or lease tables left in test environment
cleanTestEnvironment(streamExistenceManager, leaseTableManager);
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
// Check if stream is created. If not, create it // Check if stream is created. If not, create it
StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName); streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
startProducer();
setUpTestResources();
try {
startConsumer();
// Sleep for two minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
// Stops sending dummy data.
stopProducer();
// Wait a few seconds for the last few records to be processed
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
// Finishes processing current batch of data already received from Kinesis before shutting down.
awaitConsumerFinish();
// Validate processed data
validateRecordProcessor();
// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager);
} catch (Exception e) {
// Test Failed. Clean up resources and then throw exception.
log.info("----------Test Failed: Cleaning up resources------------");
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager);
throw e;
}
}
private void cleanTestEnvironment(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("----------Before starting, Cleaning test environment----------");
log.info("----------Deleting all lease tables in account----------");
leaseTableManager.deleteAllLeaseTables();
log.info("----------Finished deleting all lease tables-------------");
log.info("----------Deleting all streams in account----------");
streamExistenceManager.deleteAllStreams();
log.info("----------Finished deleting all streams-------------");
}
private void startProducer() {
// Send dummy data to stream // Send dummy data to stream
ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
}
RecordValidatorQueue recordValidator = new RecordValidatorQueue();
private void setUpTestResources() throws Exception {
// Setup configuration of KCL (including DynamoDB and CloudWatch) // Setup configuration of KCL (including DynamoDB and CloudWatch)
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory(recordValidator));
retrievalConfig = consumerConfig.getRetrievalConfig(); retrievalConfig = consumerConfig.getRetrievalConfig();
checkpointConfig = configsBuilder.checkpointConfig(); checkpointConfig = configsBuilder.checkpointConfig();
@ -89,7 +136,7 @@ public class TestConsumer {
metricsConfig = configsBuilder.metricsConfig(); metricsConfig = configsBuilder.metricsConfig();
// Create Scheduler // Create Scheduler
Scheduler scheduler = new Scheduler( this.scheduler = new Scheduler(
checkpointConfig, checkpointConfig,
coordinatorConfig, coordinatorConfig,
leaseManagementConfig, leaseManagementConfig,
@ -98,66 +145,21 @@ public class TestConsumer {
processorConfig, processorConfig,
retrievalConfig retrievalConfig
); );
}
try { private void startConsumer() {
// Start record processing of dummy data // Start record processing of dummy data
Thread schedulerThread = new Thread(scheduler); final Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true); schedulerThread.setDaemon(true);
schedulerThread.start(); schedulerThread.start();
// Sleep for two minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
// Stops sending dummy data.
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
// Wait a few seconds for the last few records to be processed
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
// Finishes processing current batch of data already received from Kinesis before shutting down.
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete.");
try {
gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException e) {
throw new ExecutionException("Exception while executing graceful shutdown. {}", e);
} catch (TimeoutException e) {
throw new TimeoutException("Timeout while waiting for shutdown. Scheduler may not have exited. {}" + e);
}
log.info("Completed, shutting down now.");
// Validate processed data
log.info("The number of expected records is: {}", successfulPutRecords);
RecordValidationStatus errorVal = recordValidator.validateRecords(successfulPutRecords);
if (errorVal == RecordValidationStatus.OUT_OF_ORDER) {
throw new RuntimeException("There was an error validating the records that were processed. The records were out of order");
} else if (errorVal == RecordValidationStatus.MISSING_RECORD) {
throw new RuntimeException("There was an error validating the records that were processed. Some records were missing.");
}
log.info("--------------Completed validation of processed records.--------------");
// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, dynamoClient);
} catch (Exception e) {
// Test Failed. Clean up resources and then throw exception.
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, dynamoClient);
throw e;
}
} }
public void publishRecord() { public void publishRecord() {
PutRecordRequest request; final PutRecordRequest request;
try { try {
request = PutRecordRequest.builder() request = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName) .streamName(this.streamName)
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB .data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB
.build(); .build();
kinesisClient.putRecord(request).get(); kinesisClient.putRecord(request).get();
@ -168,15 +170,13 @@ public class TestConsumer {
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown."); log.info("Interrupted, assuming shutdown.");
} catch (ExecutionException e) { } catch (ExecutionException | RuntimeException e) {
log.error("Error during publishRecord. Will try again next cycle", e); log.error("Error during publish records");
} catch (RuntimeException e) {
log.error("Error while creating request", e);
} }
} }
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
byte[] returnData; final byte[] returnData;
log.info("--------------Putting record with data: {}", payloadCounter); log.info("--------------Putting record with data: {}", payloadCounter);
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
try { try {
@ -188,23 +188,42 @@ public class TestConsumer {
return ByteBuffer.wrap(returnData); return ByteBuffer.wrap(returnData);
} }
private void deleteResources(StreamExistenceManager streamExistenceManager, DynamoDbAsyncClient dynamoDBClient) throws Exception { private void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
}
private void awaitConsumerFinish() throws Exception {
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete.");
try {
gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException | TimeoutException e) {
throw new Exception("Exception while executing graceful shutdown. {}", e);
}
log.info("Completed, shutting down now.");
}
private void validateRecordProcessor() throws Exception {
log.info("The number of expected records is: {}", successfulPutRecords);
final RecordValidationStatus errorVal = consumerConfig.getRecordValidator().validateRecords(successfulPutRecords);
if (errorVal == RecordValidationStatus.OUT_OF_ORDER) {
throw new RuntimeException("There was an error validating the records that were processed. The records were out of order");
} else if (errorVal == RecordValidationStatus.MISSING_RECORD) {
throw new RuntimeException("There was an error validating the records that were processed. Some records were missing.");
}
log.info("--------------Completed validation of processed records.--------------");
}
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("-------------Start deleting stream.----------------"); log.info("-------------Start deleting stream.----------------");
streamExistenceManager.deleteStream(this.streamName); streamExistenceManager.deleteStream(this.streamName);
log.info("-------------Start deleting lease table.----------------"); log.info("-------------Start deleting lease table.----------------");
deleteLeaseTable(dynamoDBClient, consumerConfig.getStreamName()); leaseTableManager.deleteLeaseTable(this.consumerConfig.getStreamName());
log.info("-------------Finished deleting resources.----------------"); log.info("-------------Finished deleting resources.----------------");
} }
private void deleteLeaseTable(DynamoDbAsyncClient dynamoClient, String tableName) throws Exception {
DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
try {
FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
} catch (ExecutionException e) {
throw new Exception("Could not delete lease table: {}", e);
} catch (InterruptedException e) {
throw new Exception("Deleting lease table interrupted: {}", e);
}
}
} }