Adding testing architecture and KCL 2.x basic polling/streaming tests (#1136)

* Adding testing architecture and KCL 2.x basic polling and streaming tests

This commit is contained in: parent f1ef0e820d, commit 53dbb4ea79
17 changed files with 1093 additions and 1 deletions
@@ -34,7 +34,13 @@ Please open an issue if you have any questions.

After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use
this command: `mvn clean install -Dgpg.skip=true`. Note: this command runs integration tests, which in turn create AWS
resources (and those require manual cleanup). Integration tests require valid AWS credentials that can be discovered at
runtime. To skip running integration tests, add the `-DskipITs` option to the build command.

## Running Integration Tests

To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user/role to run the tests with by passing it as a string: `mvn -Dit.test=*IntegrationTest -DawsProfile="<PROFILE_NAME>" verify`.
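
For reference, the default profile is read from the standard AWS shared credentials file. A minimal `~/.aws/credentials` entry looks like the following (the key values are placeholders):

```
[default]
aws_access_key_id = <YOUR_ACCESS_KEY_ID>
aws_secret_access_key = <YOUR_SECRET_ACCESS_KEY>
```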

## Integration with the Kinesis Producer Library

For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.

@@ -207,6 +207,10 @@
              <name>sqlite4java.library.path</name>
              <value>${sqlite4java.libpath}</value>
            </property>
            <property>
              <name>awsProfile</name>
              <value>${awsProfile}</value>
            </property>
          </systemProperties>
        </configuration>
      </plugin>

@@ -0,0 +1,192 @@
package software.amazon.kinesis.config;

import lombok.Builder;
import lombok.Value;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;

import java.io.IOException;
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;

/**
 * Default configuration for a producer or consumer used in integration tests.
 * Producer: puts records of size 60 KB at an interval of 100 ms.
 * Consumer: streaming configuration (vs. polling) that starts processing records at the shard horizon.
 */
public abstract class KCLAppConfig {

    private KinesisAsyncClient kinesisAsyncClient;
    private DynamoDbAsyncClient dynamoDbAsyncClient;
    private CloudWatchAsyncClient cloudWatchAsyncClient;
    private RecordValidatorQueue recordValidator;

    /**
     * Name used for the test stream and the lease tracker table.
     */
    public abstract String getStreamName();

    public int getShardCount() { return 4; }

    public Region getRegion() { return Region.US_WEST_2; }

    /**
     * The "default" profile; should match one of the profiles listed in "cat ~/.aws/config".
     */
    private AwsCredentialsProvider getCredentialsProvider() {
        final String awsProfile = System.getProperty("awsProfile");
        return (awsProfile != null)
                ? ProfileCredentialsProvider.builder().profileName(awsProfile).build()
                : DefaultCredentialsProvider.create();
    }

    public InitialPositionInStream getInitialPosition() {
        return InitialPositionInStream.TRIM_HORIZON;
    }

    public abstract Protocol getKinesisClientProtocol();

    public ProducerConfig getProducerConfig() {
        return ProducerConfig.builder()
                .isBatchPut(false)
                .batchSize(1)
                .recordSizeKB(60)
                .callPeriodMills(100)
                .build();
    }

    public ReshardConfig getReshardConfig() {
        return null;
    }

    public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
        if (kinesisAsyncClient == null) {
            // Set up the HTTP client; the protocol (HTTP/1.1 or HTTP/2) comes from the test config.
            final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
                    .maxConcurrency(Integer.MAX_VALUE);
            builder.protocol(getKinesisClientProtocol());

            final SdkAsyncHttpClient sdkAsyncHttpClient =
                    builder.buildWithDefaults(AttributeMap.builder().build());

            // Set up the client builder with default values.
            final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
            kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
            kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider());

            this.kinesisAsyncClient = kinesisAsyncClientBuilder.build();
        }
        return this.kinesisAsyncClient;
    }

    public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
        if (this.dynamoDbAsyncClient == null) {
            final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion());
            builder.credentialsProvider(getCredentialsProvider());
            this.dynamoDbAsyncClient = builder.build();
        }
        return this.dynamoDbAsyncClient;
    }

    public final CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
        if (this.cloudWatchAsyncClient == null) {
            final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion());
            builder.credentialsProvider(getCredentialsProvider());
            this.cloudWatchAsyncClient = builder.build();
        }
        return this.cloudWatchAsyncClient;
    }

    public final String getWorkerId() throws UnknownHostException {
        return Inet4Address.getLocalHost().getHostName();
    }

    public final RecordValidatorQueue getRecordValidator() {
        if (recordValidator == null) {
            this.recordValidator = new RecordValidatorQueue();
        }
        return this.recordValidator;
    }

    public ShardRecordProcessorFactory getShardRecordProcessorFactory() {
        return new TestRecordProcessorFactory(getRecordValidator());
    }

    public final ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
        final String workerId = getWorkerId();
        return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(),
                buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
    }

    public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
        final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
                .newInitialPosition(getInitialPosition());

        // The default is a streaming consumer.
        final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
        config.initialPositionInStreamExtended(initialPosition);
        return config;
    }

    /**
     * Configures the ingress load (batch size, record size, and calling interval).
     */
    @Value
    @Builder
    static class ProducerConfig {
        private boolean isBatchPut;
        private int batchSize;
        private int recordSizeKB;
        private long callPeriodMills;
    }

    /**
     * Describes the method of resharding for a test case.
     */
    @Value
    @Builder
    static class ReshardConfig {
        /**
         * reshardingFactorCycle: the order of reshards to perform during one reshard cycle,
         * e.g. {SPLIT, MERGE} means the number of shards is first doubled, then halved.
         */
        private ReshardOptions[] reshardingFactorCycle;

        /**
         * numReshardCycles: the number of resharding cycles to execute in a test.
         */
        private int numReshardCycles;

        /**
         * reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds).
         */
        private long reshardFrequencyMillis;
    }

}
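
No test in this commit overrides `getReshardConfig()`, so resharding stays disabled by default. As a hedged sketch only (the override below is hypothetical and assumes `java.util.concurrent.TimeUnit` is imported), a subclass could enable it with the builder shown above:

```java
// Hypothetical override (not part of this commit): run two SPLIT-then-MERGE
// cycles, five minutes apart, against the test stream.
@Override
public ReshardConfig getReshardConfig() {
    return ReshardConfig.builder()
            .reshardingFactorCycle(new ReshardOptions[] {ReshardOptions.SPLIT, ReshardOptions.MERGE})
            .numReshardCycles(2)
            .reshardFrequencyMillis(TimeUnit.MINUTES.toMillis(5))
            .build();
}
```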

@@ -0,0 +1,41 @@
package software.amazon.kinesis.config;

import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Config for a polling consumer with an HTTP protocol of HTTP/1.1.
 */
public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {

    private final UUID uniqueId = UUID.randomUUID();

    @Override
    public String getStreamName() {
        return "KCLReleaseCanary2XPollingH1TestStream_" + uniqueId;
    }

    @Override
    public Protocol getKinesisClientProtocol() {
        return Protocol.HTTP1_1;
    }

    @Override
    public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
        final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
                .newInitialPosition(getInitialPosition());

        final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
        config.initialPositionInStreamExtended(initialPosition);
        config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));

        return config;
    }
}

@@ -0,0 +1,41 @@
package software.amazon.kinesis.config;

import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Config for a polling consumer with an HTTP protocol of HTTP/2.
 */
public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {

    private final UUID uniqueId = UUID.randomUUID();

    @Override
    public String getStreamName() {
        return "KCLReleaseCanary2XPollingH2TestStream_" + uniqueId;
    }

    @Override
    public Protocol getKinesisClientProtocol() {
        return Protocol.HTTP2;
    }

    @Override
    public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
        final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
                .newInitialPosition(getInitialPosition());

        final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
        config.initialPositionInStreamExtended(initialPosition);
        config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));

        return config;
    }
}

@@ -0,0 +1,24 @@
package software.amazon.kinesis.config;

import software.amazon.awssdk.http.Protocol;

import java.util.UUID;

/**
 * Config for a streaming consumer with an HTTP protocol of HTTP/2.
 */
public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {

    private final UUID uniqueId = UUID.randomUUID();

    @Override
    public String getStreamName() {
        return "KCLReleaseCanary2XStreamingTestStream_" + uniqueId;
    }

    @Override
    public Protocol getKinesisClientProtocol() {
        return Protocol.HTTP2;
    }

}
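
This config intentionally does not override `getRetrievalConfig()`, so it inherits the base `KCLAppConfig` behavior. With no `retrievalSpecificConfig` set, the KCL 2.x default retrieval mode is enhanced fan-out (`FanOutConfig`), i.e. a push-based SubscribeToShard consumer over the HTTP/2 connection configured here, which is what makes this the streaming (rather than polling) test case.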

@@ -0,0 +1,44 @@
package software.amazon.kinesis.lifecycle;

import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer;

public class BasicStreamConsumerIntegrationTest {

    /**
     * Test with a polling consumer using the HTTP/2 protocol.
     * In the polling case, the consumer repeatedly calls the Kinesis service to request the next batch of records to process.
     */
    @Test
    public void kclReleaseCanaryPollingH2Test() throws Exception {
        KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig();
        TestConsumer consumer = new TestConsumer(consumerConfig);
        consumer.run();
    }

    /**
     * Test with a polling consumer using the HTTP/1.1 protocol.
     * In the polling case, the consumer repeatedly calls the Kinesis service to request the next batch of records to process.
     */
    @Test
    public void kclReleaseCanaryPollingH1Test() throws Exception {
        KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig();
        TestConsumer consumer = new TestConsumer(consumerConfig);
        consumer.run();
    }

    /**
     * Test with a streaming consumer.
     * In the streaming configuration, the consumer subscribes to the stream once and the service continuously pushes new records to be processed.
     */
    @Test
    public void kclReleaseCanaryStreamingTest() throws Exception {
        KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig();
        TestConsumer consumer = new TestConsumer(consumerConfig);
        consumer.run();
    }
}
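
Based on the README command pattern above, this suite alone should be runnable with `mvn -Dit.test=BasicStreamConsumerIntegrationTest verify`, optionally adding `-DawsProfile="<PROFILE_NAME>"`; the exact invocation is an assumption from the README, not documented separately in this commit.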

@@ -0,0 +1,77 @@
package software.amazon.kinesis.utils;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
@NoArgsConstructor
public abstract class AWSResourceManager {

    /**
     * Make the delete-resource API call for the specific resource type.
     */
    public abstract void deleteResourceCall(String resourceName) throws Exception;

    /**
     * Check whether the resource with the given name is in an active state.
     */
    public abstract boolean isResourceActive(String name);

    /**
     * Get a list of the names of all resources of the specified type.
     */
    public abstract List<String> getAllResourceNames() throws Exception;

    /**
     * Delete the resource with the specified resource name.
     */
    public void deleteResource(String resourceName) throws Exception {
        try {
            deleteResourceCall(resourceName);
        } catch (Exception e) {
            throw new Exception("Could not delete resource " + resourceName, e);
        }

        // Wait until the resource is deleted before returning.
        int i = 0;
        while (true) {
            i++;
            if (i > 100) {
                throw new RuntimeException("Failed resource deletion");
            }
            try {
                if (!isResourceActive(resourceName)) {
                    log.info("Successfully deleted the resource {}", resourceName);
                    return;
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
                } catch (InterruptedException e1) {
                    // Ignore and retry the status check.
                }
                log.info("Resource {} is not deleted yet, exception: ", resourceName, e);
            }
        }
    }

    /**
     * Delete all instances of a particular resource type.
     */
    public void deleteAllResource() throws Exception {
        final List<String> resourceNames = getAllResourceNames();
        for (String resourceName : resourceNames) {
            deleteResource(resourceName);
        }
    }
}

@@ -0,0 +1,68 @@
package software.amazon.kinesis.utils;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.kinesis.common.FutureUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Slf4j
@AllArgsConstructor
public class LeaseTableManager extends AWSResourceManager {

    private final DynamoDbAsyncClient dynamoClient;

    @Override
    public boolean isResourceActive(String tableName) {
        final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
        final CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFuture =
                dynamoClient.describeTable(request);

        try {
            final DescribeTableResponse response = describeTableResponseCompletableFuture.get(30, TimeUnit.SECONDS);
            boolean isActive = response.table().tableStatus().equals(TableStatus.ACTIVE);
            if (!isActive) {
                throw new RuntimeException("Table is not active, instead in status: " + response.table().tableStatus());
            }
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ResourceNotFoundException) {
                return false;
            } else {
                throw new RuntimeException(e);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteResourceCall(String tableName) throws Exception {
        final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
        FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
    }

    @Override
    public List<String> getAllResourceNames() throws Exception {
        ListTablesRequest listTableRequest = ListTablesRequest.builder().build();
        List<String> allTableNames = new ArrayList<>();
        ListTablesResponse result = null;
        do {
            result = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(listTableRequest), Duration.ofSeconds(60));
            allTableNames.addAll(result.tableNames());
            listTableRequest = ListTablesRequest.builder()
                    .exclusiveStartTableName(result.lastEvaluatedTableName())
                    .build();
        } while (result.lastEvaluatedTableName() != null);
        return allTableNames;
    }
}

@@ -0,0 +1,10 @@
package software.amazon.kinesis.utils;

/**
 * Possible outcomes for record validation in RecordValidatorQueue.
 */
public enum RecordValidationStatus {
    OUT_OF_ORDER,
    MISSING_RECORD,
    NO_ERROR
}

@@ -0,0 +1,63 @@
package software.amazon.kinesis.utils;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Maintains a dictionary that maps shard IDs to the list of records processed by that shard.
 * Validation ensures that:
 * 1. The records processed by each shard are in increasing order (duplicates allowed).
 * 2. The total number of unique records processed equals the number of records put on the stream.
 */
@Slf4j
public class RecordValidatorQueue {

    private final ConcurrentHashMap<String, List<String>> dict = new ConcurrentHashMap<>();

    public void add(String shardId, String data) {
        final List<String> values = dict.computeIfAbsent(shardId, key -> new ArrayList<>());
        values.add(data);
    }

    public RecordValidationStatus validateRecords(int expectedRecordCount) {
        // Validate that each list in the map holds record data in increasing order.
        for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
            List<String> recordsPerShard = entry.getValue();
            int prevVal = -1;
            for (String record : recordsPerShard) {
                int nextVal = Integer.parseInt(record);
                if (prevVal > nextVal) {
                    log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal);
                    return RecordValidationStatus.OUT_OF_ORDER;
                }
                prevVal = nextVal;
            }
        }

        // Validate that no records are missing across all shards.
        int actualRecordCount = 0;
        for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
            List<String> recordsPerShard = entry.getValue();
            Set<String> noDupRecords = new HashSet<>(recordsPerShard);
            actualRecordCount += noDupRecords.size();
        }

        // If the counts differ, some record was missed during processing.
        if (actualRecordCount != expectedRecordCount) {
            log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedRecordCount, actualRecordCount);
            return RecordValidationStatus.MISSING_RECORD;
        }

        // Record validation succeeded.
        return RecordValidationStatus.NO_ERROR;
    }

}

@@ -0,0 +1,44 @@
package software.amazon.kinesis.utils;

import org.junit.Assert;
import org.junit.Test;

public class RecordValidatorQueueTest {

    private final RecordValidatorQueue recordValidator = new RecordValidatorQueue();

    private static final String SHARD_ID = "ABC";

    @Test
    public void testValidationFailedRecordOutOfOrder() {
        recordValidator.add(SHARD_ID, "0");
        recordValidator.add(SHARD_ID, "1");
        recordValidator.add(SHARD_ID, "3");
        recordValidator.add(SHARD_ID, "2");

        RecordValidationStatus error = recordValidator.validateRecords(4);
        Assert.assertEquals(RecordValidationStatus.OUT_OF_ORDER, error);
    }

    @Test
    public void testValidationFailedMissingRecord() {
        recordValidator.add(SHARD_ID, "0");
        recordValidator.add(SHARD_ID, "1");
        recordValidator.add(SHARD_ID, "2");
        recordValidator.add(SHARD_ID, "3");

        RecordValidationStatus error = recordValidator.validateRecords(5);
        Assert.assertEquals(RecordValidationStatus.MISSING_RECORD, error);
    }

    @Test
    public void testValidRecords() {
        recordValidator.add(SHARD_ID, "0");
        recordValidator.add(SHARD_ID, "1");
        recordValidator.add(SHARD_ID, "2");
        recordValidator.add(SHARD_ID, "3");

        RecordValidationStatus error = recordValidator.validateRecords(4);
        Assert.assertEquals(RecordValidationStatus.NO_ERROR, error);
    }
}

@@ -0,0 +1,11 @@
package software.amazon.kinesis.utils;

/**
 * Specifies the types of resharding possible in integration tests.
 * Split doubles the number of shards.
 * Merge halves the number of shards.
 */
public enum ReshardOptions {
    SPLIT,
    MERGE
}
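
Nothing in this commit yet consumes `ReshardOptions` beyond the `ReshardConfig` holder in `KCLAppConfig`. As a hedged sketch of where it could lead (the helper class, method name, and 30-second timeouts are assumptions, not part of this commit), a harness might map each option onto the Kinesis `UpdateShardCount` API:

```java
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;

import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of this commit.
class ReshardHelper {
    static void applyReshard(KinesisAsyncClient client, String streamName, ReshardOptions option)
            throws Exception {
        // Look up the current number of open shards.
        final DescribeStreamSummaryRequest describe =
                DescribeStreamSummaryRequest.builder().streamName(streamName).build();
        final int openShards = client.describeStreamSummary(describe)
                .get(30, TimeUnit.SECONDS)
                .streamDescriptionSummary()
                .openShardCount();

        // Per the enum javadoc: SPLIT doubles the shard count, MERGE halves it.
        final int target = (option == ReshardOptions.SPLIT) ? openShards * 2 : openShards / 2;

        final UpdateShardCountRequest update = UpdateShardCountRequest.builder()
                .streamName(streamName)
                .targetShardCount(target)
                .scalingType(ScalingType.UNIFORM_SCALING)
                .build();
        client.updateShardCount(update).get(30, TimeUnit.SECONDS);
    }
}
```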

@@ -0,0 +1,117 @@
package software.amazon.kinesis.utils;

import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.config.KCLAppConfig;

import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Value
@Slf4j
public class StreamExistenceManager extends AWSResourceManager {

    private final KinesisAsyncClient client;
    private final KCLAppConfig testConfig;

    public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
        this.testConfig = config;
        this.client = config.buildAsyncKinesisClient();
    }

    @Override
    public boolean isResourceActive(String streamName) {
        final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
        final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture =
                client.describeStreamSummary(request);

        try {
            final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS);
            boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE);
            if (!isActive) {
                throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus());
            }
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ResourceNotFoundException) {
                return false;
            } else {
                throw new RuntimeException(e);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteResourceCall(String streamName) throws Exception {
        final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build();
        client.deleteStream(request).get(30, TimeUnit.SECONDS);
    }

    @Override
    public List<String> getAllResourceNames() throws Exception {
        ListStreamsRequest listStreamRequest = ListStreamsRequest.builder().build();
        List<String> allStreamNames = new ArrayList<>();
        ListStreamsResponse result = null;
        do {
            result = FutureUtils.resolveOrCancelFuture(client.listStreams(listStreamRequest), Duration.ofSeconds(60));
            allStreamNames.addAll(result.streamNames());
            // Paginate with the name of the last stream seen; exclusiveStartStreamName
            // expects a stream name, not a pagination token.
            final String lastName = allStreamNames.isEmpty() ? null : allStreamNames.get(allStreamNames.size() - 1);
            listStreamRequest = ListStreamsRequest.builder().exclusiveStartStreamName(lastName).build();
        } while (result.hasMoreStreams());
        return allStreamNames;
    }

    public void checkStreamAndCreateIfNecessary(String streamName) {
        if (!isResourceActive(streamName)) {
            createStream(streamName, testConfig.getShardCount());
        }
        log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
    }

    private void createStream(String streamName, int shardCount) {
        final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
        try {
            client.createStream(request).get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create stream with name " + streamName, e);
        }

        // Wait until the stream transitions into the active state.
        int i = 0;
        while (true) {
            i++;
            if (i > 100) {
                throw new RuntimeException("Failed stream creation, did not transition into active");
            }
            try {
                boolean isActive = isResourceActive(streamName);
                if (isActive) {
                    log.info("Successfully created the stream {}", streamName);
                    return;
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
                } catch (InterruptedException e1) {
                    log.error("Failed to sleep");
                }
                log.info("Stream {} is not active yet, exception: ", streamName, e);
            }
        }
    }

}

@@ -0,0 +1,223 @@
package software.amazon.kinesis.utils;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class TestConsumer {
    public final KCLAppConfig consumerConfig;
    public final Region region;
    public final String streamName;
    public final KinesisAsyncClient kinesisClient;
    private MetricsConfig metricsConfig;
    private RetrievalConfig retrievalConfig;
    private CheckpointConfig checkpointConfig;
    private CoordinatorConfig coordinatorConfig;
    private LeaseManagementConfig leaseManagementConfig;
    private LifecycleConfig lifecycleConfig;
    private ProcessorConfig processorConfig;
    private Scheduler scheduler;
    private ScheduledExecutorService producerExecutor;
    private ScheduledFuture<?> producerFuture;
    private ScheduledExecutorService consumerExecutor;
    private ScheduledFuture<?> consumerFuture;
    private DynamoDbAsyncClient dynamoClient;
    private final ObjectMapper mapper = new ObjectMapper();
    public int successfulPutRecords = 0;
    public BigInteger payloadCounter = new BigInteger("0");

    public TestConsumer(KCLAppConfig consumerConfig) throws Exception {
        this.consumerConfig = consumerConfig;
        this.region = consumerConfig.getRegion();
        this.streamName = consumerConfig.getStreamName();
        this.kinesisClient = consumerConfig.buildAsyncKinesisClient();
        this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
    }

    public void run() throws Exception {
        final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
        final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);

        // Clean up any old streams or lease tables left in the test environment.
        cleanTestResources(streamExistenceManager, leaseTableManager);

        // Check if the stream exists. If not, create it.
        streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);

        startProducer();
        setUpConsumerResources();

        try {
            startConsumer();

            // Sleep for three minutes to allow the producer/consumer to run, then end the test case.
            Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));

            // Stop sending dummy data.
            stopProducer();

            // Wait a few seconds for the last few records to be processed.
            Thread.sleep(TimeUnit.SECONDS.toMillis(10));

            // Finish processing the current batch of data already received from Kinesis before shutting down.
            awaitConsumerFinish();

            // Validate the processed data.
            validateRecordProcessor();
        } catch (Exception e) {
            // Test failed. Clean up resources and then rethrow.
            log.info("----------Test Failed: Cleaning up resources------------");
            throw e;
        } finally {
            // Clean up the resources created for this test.
            deleteResources(streamExistenceManager, leaseTableManager);
        }
    }

    private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
        log.info("----------Before starting, cleaning test environment----------");
        log.info("----------Deleting all lease tables in account----------");
        leaseTableManager.deleteAllResource();
        log.info("----------Finished deleting all lease tables-------------");

        log.info("----------Deleting all streams in account----------");
        streamExistenceManager.deleteAllResource();
        log.info("----------Finished deleting all streams-------------");
    }

    private void startProducer() {
        // Send dummy data to the stream: one record per second, starting after a 60-second delay.
        this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
        this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
    }

    private void setUpConsumerResources() throws Exception {
        // Set up the KCL configuration (including DynamoDB and CloudWatch).
        final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder();

        retrievalConfig = consumerConfig.getRetrievalConfig();
        checkpointConfig = configsBuilder.checkpointConfig();
        coordinatorConfig = configsBuilder.coordinatorConfig();
        leaseManagementConfig = configsBuilder.leaseManagementConfig()
                .initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()))
                .initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50);
        lifecycleConfig = configsBuilder.lifecycleConfig();
        processorConfig = configsBuilder.processorConfig();
        metricsConfig = configsBuilder.metricsConfig();

        // Create the Scheduler.
        this.scheduler = new Scheduler(
                checkpointConfig,
                coordinatorConfig,
                leaseManagementConfig,
                lifecycleConfig,
                metricsConfig,
                processorConfig,
                retrievalConfig
        );
    }

    private void startConsumer() {
        // Start record processing of the dummy data.
        this.consumerExecutor = Executors.newSingleThreadScheduledExecutor();
        this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS);
    }

    public void publishRecord() {
        final PutRecordRequest request;
        try {
            request = PutRecordRequest.builder()
                    .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                    .streamName(this.streamName)
                    .data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter)))
                    .build();
            kinesisClient.putRecord(request).get();

            // Increment the payload counter if the putRecord call was successful.
            payloadCounter = payloadCounter.add(new BigInteger("1"));
            successfulPutRecords += 1;
            log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown. ", e);
        } catch (ExecutionException | RuntimeException e) {
            log.error("Error during publish records", e);
        }
    }

    private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
        // Note: payloadSize is currently unused; the payload is just the serialized counter.
        final byte[] returnData;
        log.info("--------------Putting record with data: {}", payloadCounter);
        try {
            returnData = mapper.writeValueAsBytes(payloadCounter);
        } catch (Exception e) {
            throw new RuntimeException("Error converting object to bytes: ", e);
        }
        return ByteBuffer.wrap(returnData);
    }

    private void stopProducer() {
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(false);
        producerExecutor.shutdown();
    }

    private void awaitConsumerFinish() throws Exception {
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        }
        log.info("Completed, shutting down now.");
    }

    private void validateRecordProcessor() throws Exception {
        log.info("The number of expected records is: {}", successfulPutRecords);
        final RecordValidationStatus errorVal = consumerConfig.getRecordValidator().validateRecords(successfulPutRecords);
        if (errorVal != RecordValidationStatus.NO_ERROR) {
            throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString());
        }
        log.info("--------------Completed validation of processed records.--------------");
    }

    private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
        log.info("-------------Start deleting stream.----------------");
        streamExistenceManager.deleteResource(this.streamName);
        log.info("-------------Start deleting lease table.----------------");
        leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
        log.info("-------------Finished deleting resources.----------------");
    }

}

@@ -0,0 +1,108 @@
package software.amazon.kinesis.utils;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.nio.ByteBuffer;

/**
 * Implements shard initialization, shard lifecycle handling, and shard record processing for the tests.
 */
@Slf4j
public class TestRecordProcessor implements ShardRecordProcessor {

    private static final String SHARD_ID_MDC_KEY = "ShardId";

    private String shardId;

    private final RecordValidatorQueue recordValidator;

    public TestRecordProcessor(RecordValidatorQueue recordValidator) {
        this.recordValidator = recordValidator;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());

            for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) {
                final String data = new String(asByteArray(kinesisRecord.data()));
                log.info("Processing record pk: {}", data);
                recordValidator.add(shardId, data);
            }
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
            Runtime.getRuntime().halt(1);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    public static byte[] asByteArray(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return bytes;
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end, checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

}

@@ -0,0 +1,19 @@
package software.amazon.kinesis.utils;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {

    private final RecordValidatorQueue recordValidator;

    public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) {
        this.recordValidator = recordValidator;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor(this.recordValidator);
    }

}