Initial architecture for KCL 1.x testing and basic polling test
parent b011206f7b
commit c581a2ce54
15 changed files with 870 additions and 0 deletions
@@ -26,6 +26,12 @@ The **Amazon Kinesis Client Library for Java** (Amazon KCL) enables Java develop

After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true`

## Running Integration Tests

To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user/role to run the tests with: `mvn -Dit.test=*IntegrationTest -DawsProfile="<IAM_USER>" verify`.
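For reference, a named profile in your local `.aws/credentials` follows the standard AWS shared-credentials format (all values below are placeholders):

```
[default]
aws_access_key_id = <YOUR_ACCESS_KEY_ID>
aws_secret_access_key = <YOUR_SECRET_ACCESS_KEY>

[<IAM_USER>]
aws_access_key_id = <YOUR_ACCESS_KEY_ID>
aws_secret_access_key = <YOUR_SECRET_ACCESS_KEY>
```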
## Integration with the Kinesis Producer Library

For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
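As a minimal sketch of what this looks like on the consumer side (the class name and logging are illustrative, not part of this commit), a record processor written against this library's v2 interface simply iterates the records it is handed; any KPL aggregation has already been undone by the KCL:

```java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

import java.nio.charset.StandardCharsets;

public class UserRecordLoggingProcessor implements IRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        // No per-shard state needed for this sketch.
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Each Record here is already an individual user record; aggregated
        // KPL records were deaggregated by the KCL before this call.
        for (Record record : processRecordsInput.getRecords()) {
            String payload = StandardCharsets.UTF_8.decode(record.getData()).toString();
            System.out.println("user record: " + payload);
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Checkpointing omitted in this sketch.
    }
}
```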
pom.xml (16 lines changed)
@@ -29,6 +29,7 @@
    <sqlite4java.version>1.0.392</sqlite4java.version>
    <sqlite4java.native>libsqlite4java</sqlite4java.native>
    <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
    <slf4j.version>2.0.7</slf4j.version>
  </properties>

  <dependencies>
@@ -122,6 +123,17 @@
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
@@ -178,6 +190,10 @@
            <name>sqlite4java.library.path</name>
            <value>${sqlite4java.libpath}</value>
          </property>
          <property>
            <name>awsProfile</name>
            <value>${awsProfile}</value>
          </property>
        </systemProperties>
      </configuration>
    </plugin>
@@ -0,0 +1,155 @@
package com.amazonaws.services.kinesis.clientlibrary.config;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.utils.RecordValidatorQueue;
import com.amazonaws.services.kinesis.clientlibrary.utils.ReshardOptions;

import lombok.Builder;
import lombok.Value;
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.net.Inet4Address;
import java.net.UnknownHostException;

/**
 * Default configuration for a producer or consumer used in integration tests.
 * Producer: puts records of size 60 KB at an interval of 100 ms.
 * Consumer: starts polling for records to process at the shard horizon (TRIM_HORIZON).
 */
public abstract class KCLAppConfig {

    private AmazonKinesis kinesisClient;
    private AmazonDynamoDB dynamoDbClient;
    private AmazonCloudWatch cloudWatchClient;
    private RecordValidatorQueue recordValidator;

    /**
     * Name used for the test stream and the lease tracker table.
     */
    public abstract String getStreamName();

    public int getShardCount() { return 4; }

    public Region getRegion() { return Region.US_WEST_2; }

    /**
     * Uses the "awsProfile" system property if set (it should match a profile
     * listed in "~/.aws/config"), otherwise falls back to the default
     * credentials provider chain.
     */
    private AWSCredentialsProvider getCredentialsProvider() {
        String awsProfile = System.getProperty("awsProfile");
        return (awsProfile != null)
                ? new com.amazonaws.auth.profile.ProfileCredentialsProvider(awsProfile)
                : new DefaultAWSCredentialsProviderChain();
    }

    public InitialPositionInStream getKclInitialPosition() {
        return InitialPositionInStream.TRIM_HORIZON;
    }

    public ProducerConfig getProducerConfig() {
        return ProducerConfig.builder()
                .isBatchPut(false)
                .batchSize(1)
                .recordSizeKB(60)
                .callPeriodMills(100)
                .build();
    }

    public ReshardConfig getReshardConfig() {
        return null;
    }

    public final AmazonKinesis buildSyncKinesisClient() throws IOException {
        if (this.kinesisClient == null) {
            AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
                    .withRegion(getRegion().id())
                    .withCredentials(getCredentialsProvider());
            this.kinesisClient = builder.build();
        }
        return this.kinesisClient;
    }

    public final AmazonDynamoDB buildSyncDynamoDbClient() throws IOException {
        if (this.dynamoDbClient == null) {
            AmazonDynamoDBClientBuilder builder = AmazonDynamoDBClient.builder()
                    .withRegion(getRegion().id())
                    .withCredentials(getCredentialsProvider());
            this.dynamoDbClient = builder.build();
        }
        return this.dynamoDbClient;
    }

    public final AmazonCloudWatch buildSyncCloudWatchClient() throws IOException {
        if (this.cloudWatchClient == null) {
            AmazonCloudWatchClientBuilder builder = AmazonCloudWatchClient.builder()
                    .withRegion(getRegion().id())
                    .withCredentials(getCredentialsProvider());
            this.cloudWatchClient = builder.build();
        }
        return this.cloudWatchClient;
    }

    public final RecordValidatorQueue getRecordValidator() {
        if (this.recordValidator == null) {
            this.recordValidator = new RecordValidatorQueue();
        }
        return this.recordValidator;
    }

    public final String getWorkerId() throws UnknownHostException {
        return Inet4Address.getLocalHost().getHostName();
    }

    public final KinesisClientLibConfiguration getKclConfiguration() throws IOException {
        return new KinesisClientLibConfiguration(getStreamName(),
                getStreamName(),
                getCredentialsProvider(),
                getWorkerId()).withInitialPositionInStream(getKclInitialPosition());
    }

    /**
     * Configures ingress load (batch size, record size, and calling interval).
     */
    @Value
    @Builder
    static class ProducerConfig {
        private boolean isBatchPut;
        private int batchSize;
        private int recordSizeKB;
        private long callPeriodMills;
    }

    /**
     * Describes the method of resharding for a test case.
     */
    @Value
    @Builder
    static class ReshardConfig {
        /**
         * The order of reshards performed during one reshard cycle,
         * e.g. {SPLIT, MERGE} means the number of shards is first doubled, then halved.
         */
        private ReshardOptions[] reshardingFactorCycle;

        /**
         * The number of resharding cycles executed in a test.
         */
        private int numReshardCycles;

        /**
         * The period of time between reshard cycles, in milliseconds.
         */
        private long reshardFrequencyMillis;
    }
}
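To illustrate how these defaults are meant to be overridden, here is a hypothetical test configuration (the class name and all values are illustrative, not part of this commit) that raises the ingress load and adds a split-then-merge reshard cycle:

```java
package com.amazonaws.services.kinesis.clientlibrary.config;

import com.amazonaws.services.kinesis.clientlibrary.utils.ReshardOptions;

public class HeavyLoadReshardTestConfig extends KCLAppConfig {

    @Override
    public String getStreamName() {
        return "KCLHeavyLoadReshardTestStream";
    }

    @Override
    public ProducerConfig getProducerConfig() {
        // Batch puts of 10 records of 100 KB every 200 ms.
        return ProducerConfig.builder()
                .isBatchPut(true)
                .batchSize(10)
                .recordSizeKB(100)
                .callPeriodMills(200)
                .build();
    }

    @Override
    public ReshardConfig getReshardConfig() {
        // Two split-then-merge cycles, one minute apart.
        return ReshardConfig.builder()
                .reshardingFactorCycle(new ReshardOptions[] {ReshardOptions.SPLIT, ReshardOptions.MERGE})
                .numReshardCycles(2)
                .reshardFrequencyMillis(60_000)
                .build();
    }
}
```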
@@ -0,0 +1,15 @@
package com.amazonaws.services.kinesis.clientlibrary.config;

import java.util.UUID;

public class ReleaseCanaryPollingTestConfig extends KCLAppConfig {

    private final UUID uniqueId = UUID.randomUUID();

    @Override
    public String getStreamName() {
        return "KCLReleaseCanary1XPollingTestStream_" + uniqueId;
    }
}
@@ -0,0 +1,20 @@
package com.amazonaws.services.kinesis.clientlibrary.lifecycle;

import com.amazonaws.services.kinesis.clientlibrary.config.ReleaseCanaryPollingTestConfig;
import com.amazonaws.services.kinesis.clientlibrary.utils.TestConsumer;
import org.junit.Test;

public class BasicPollingIntegrationTest {

    /**
     * Test with a polling consumer.
     * In the polling case, the consumer repeatedly calls the stream (GetRecords)
     * to request records to process, rather than having them pushed to it.
     */
    @Test
    public void kclReleaseCanaryPollingTest() throws Exception {
        ReleaseCanaryPollingTestConfig consumerConfig = new ReleaseCanaryPollingTestConfig();
        TestConsumer consumer = new TestConsumer(consumerConfig);
        consumer.run();
    }
}
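To run only this canary from the command line, the failsafe pattern from the README applies (assuming the `pom.xml` configuration shown above): `mvn -Dit.test=BasicPollingIntegrationTest verify`.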
@@ -0,0 +1,59 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

import static java.lang.Thread.sleep;

@Slf4j
@NoArgsConstructor
public abstract class AWSResourceManager {

    public abstract void deleteResourceCall(String resourceName) throws Exception;

    public abstract boolean isResourceActive(String name);

    public abstract List<String> getAllResourceNames() throws Exception;

    /**
     * Deletes the resource with the specified name, then polls (up to 100
     * attempts, with a 10-second backoff) until it is gone.
     *
     * @param resourceName name of the resource to delete
     * @throws Exception if the deletion call fails or the resource never disappears
     */
    public void deleteResource(String resourceName) throws Exception {
        try {
            deleteResourceCall(resourceName);
        } catch (Exception e) {
            throw new RuntimeException("Failed to delete resource with name " + resourceName, e);
        }

        int i = 0;
        while (true) {
            i++;
            if (i > 100) {
                throw new RuntimeException("Failed to delete resource");
            }
            try {
                if (!isResourceActive(resourceName)) {
                    log.info("Successfully deleted the resource " + resourceName);
                    return;
                }
            } catch (Exception e) {
                try {
                    sleep(10_000); // 10 secs backoff.
                } catch (InterruptedException e1) {
                    // Ignore and re-check on the next iteration.
                }
                log.info("Resource {} is not deleted yet, exception: ", resourceName, e);
            }
        }
    }

    /**
     * Deletes all instances of a particular resource type.
     */
    public void deleteAllResource() throws Exception {
        List<String> resourceNames = getAllResourceNames();
        for (String resourceName : resourceNames) {
            deleteResource(resourceName);
        }
    }
}
@@ -0,0 +1,60 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.ListTablesRequest;
import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@AllArgsConstructor
public class LeaseTableManager extends AWSResourceManager {

    private final AmazonDynamoDB dynamoClient;
    private static final String ACTIVE_TABLE_STATE = "ACTIVE";

    @Override
    public boolean isResourceActive(String tableName) {
        DescribeTableRequest request = new DescribeTableRequest();
        request.withTableName(tableName);

        try {
            final DescribeTableResult response = this.dynamoClient.describeTable(request);
            String tableStatus = response.getTable().getTableStatus();
            boolean isActive = tableStatus.equals(ACTIVE_TABLE_STATE);
            if (!isActive) {
                throw new RuntimeException("Table is not active, instead in status: " + tableStatus);
            }
            return true;
        } catch (ResourceNotFoundException e) {
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteResourceCall(String tableName) throws Exception {
        DeleteTableRequest request = new DeleteTableRequest();
        request.setTableName(tableName);
        this.dynamoClient.deleteTable(request);
    }

    @Override
    public List<String> getAllResourceNames() throws Exception {
        ListTablesRequest listTableRequest = new ListTablesRequest();
        List<String> allTableNames = new ArrayList<>();
        ListTablesResult result;
        do {
            result = dynamoClient.listTables(listTableRequest);
            allTableNames.addAll(result.getTableNames());
            listTableRequest = listTableRequest.withExclusiveStartTableName(result.getLastEvaluatedTableName());
        } while (result.getLastEvaluatedTableName() != null);
        return allTableNames;
    }
}
@@ -0,0 +1,10 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

/**
 * Possible outcomes for record validation in RecordValidatorQueue.
 */
public enum RecordValidationStatus {
    OUT_OF_ORDER,
    MISSING_RECORD,
    NO_ERROR
}
@@ -0,0 +1,63 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Maintains a dictionary that maps shard IDs to the list of records processed
 * by that shard. Validation ensures that:
 * 1. The records processed by each shard are in increasing order (duplicates allowed).
 * 2. The total number of unique records processed equals the number of records put on the stream.
 */
@Slf4j
public class RecordValidatorQueue {

    private final ConcurrentHashMap<String, List<String>> dict = new ConcurrentHashMap<>();

    public void add(String shardId, String data) {
        final List<String> values = dict.computeIfAbsent(shardId, key -> new ArrayList<>());
        values.add(data);
    }

    public RecordValidationStatus validateRecords(int expectedRecordCount) {
        // Validate that each shard's list has data records in increasing order.
        for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
            List<String> recordsPerShard = entry.getValue();
            int prevVal = -1;
            for (String record : recordsPerShard) {
                int nextVal = Integer.parseInt(record);
                if (prevVal > nextVal) {
                    log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal);
                    return RecordValidationStatus.OUT_OF_ORDER;
                }
                prevVal = nextVal;
            }
        }

        // Validate that no records are missing across all shards.
        int actualRecordCount = 0;
        for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
            List<String> recordsPerShard = entry.getValue();
            Set<String> noDupRecords = new HashSet<>(recordsPerShard);
            actualRecordCount += noDupRecords.size();
        }

        // If the counts differ, some record was missed during processing.
        if (actualRecordCount != expectedRecordCount) {
            log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedRecordCount, actualRecordCount);
            return RecordValidationStatus.MISSING_RECORD;
        }

        // Record validation succeeded.
        return RecordValidationStatus.NO_ERROR;
    }
}
@@ -0,0 +1,44 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import org.junit.Assert;
import org.junit.Test;

public class RecordValidatorQueueTest {

    private final RecordValidatorQueue recordValidator = new RecordValidatorQueue();

    private static final String SHARD_ID = "ABC";

    @Test
    public void testValidationFailedRecordOutOfOrder() {
        recordValidator.add(SHARD_ID, "0");
        recordValidator.add(SHARD_ID, "1");
        recordValidator.add(SHARD_ID, "3");
        recordValidator.add(SHARD_ID, "2");

        RecordValidationStatus error = recordValidator.validateRecords(4);
        Assert.assertEquals(RecordValidationStatus.OUT_OF_ORDER, error);
    }

    @Test
    public void testValidationFailedMissingRecord() {
        recordValidator.add(SHARD_ID, "0");
        recordValidator.add(SHARD_ID, "1");
        recordValidator.add(SHARD_ID, "2");
        recordValidator.add(SHARD_ID, "3");

        // Five records were expected but only four unique records were seen.
        RecordValidationStatus error = recordValidator.validateRecords(5);
        Assert.assertEquals(RecordValidationStatus.MISSING_RECORD, error);
    }

    @Test
    public void testValidRecords() {
        recordValidator.add(SHARD_ID, "0");
        recordValidator.add(SHARD_ID, "1");
        recordValidator.add(SHARD_ID, "2");
        recordValidator.add(SHARD_ID, "3");

        RecordValidationStatus error = recordValidator.validateRecords(4);
        Assert.assertEquals(RecordValidationStatus.NO_ERROR, error);
    }
}
@@ -0,0 +1,11 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

/**
 * Specifies the types of resharding possible in integration tests.
 * SPLIT doubles the number of shards.
 * MERGE halves the number of shards.
 */
public enum ReshardOptions {
    SPLIT,
    MERGE
}
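No driver for these options ships in this commit. As one hedged sketch of how a test might apply a single step (the class and method are hypothetical), `UpdateShardCount` with uniform scaling can double or halve the open-shard count:

```java
package com.amazonaws.services.kinesis.clientlibrary.utils;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

public final class ReshardDriver {

    private ReshardDriver() { }

    /** Applies a single SPLIT (double) or MERGE (halve) step to the stream. */
    public static void applyReshard(AmazonKinesis client, String streamName,
                                    int currentShardCount, ReshardOptions option) {
        int target = (option == ReshardOptions.SPLIT)
                ? currentShardCount * 2
                : currentShardCount / 2;
        client.updateShardCount(new UpdateShardCountRequest()
                .withStreamName(streamName)
                .withTargetShardCount(target)
                .withScalingType(ScalingType.UNIFORM_SCALING));
    }
}
```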
@@ -0,0 +1,106 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.config.KCLAppConfig;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamSummaryResult;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import static java.lang.Thread.sleep;

@Value
@Slf4j
public class StreamExistenceManager extends AWSResourceManager {

    private final AmazonKinesis client;
    private final KCLAppConfig testConfig;
    private static final String ACTIVE_STREAM_STATE = "ACTIVE";

    public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
        this.testConfig = config;
        this.client = config.buildSyncKinesisClient();
    }

    @Override
    public boolean isResourceActive(String streamName) {
        DescribeStreamSummaryRequest request = new DescribeStreamSummaryRequest();
        request.setStreamName(streamName);

        try {
            final DescribeStreamSummaryResult response = client.describeStreamSummary(request);
            String streamStatus = response.getStreamDescriptionSummary().getStreamStatus();
            boolean isActive = streamStatus.equals(ACTIVE_STREAM_STATE);
            if (!isActive) {
                throw new RuntimeException("Stream is not active, instead in status: " + streamStatus);
            }
            return true;
        } catch (ResourceNotFoundException e) {
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteResourceCall(String streamName) {
        DeleteStreamRequest request = new DeleteStreamRequest();
        request.setStreamName(streamName);
        request.withEnforceConsumerDeletion(true);
        client.deleteStream(request);
    }

    @Override
    public List<String> getAllResourceNames() throws Exception {
        ListStreamsRequest listStreamRequest = new ListStreamsRequest();
        List<String> allStreamNames = new ArrayList<>();
        ListStreamsResult result;
        do {
            result = client.listStreams(listStreamRequest);
            List<String> page = result.getStreamNames();
            allStreamNames.addAll(page);
            // ExclusiveStartStreamName expects a stream name, so paginate from
            // the last name in the current page.
            if (result.getHasMoreStreams() && !page.isEmpty()) {
                listStreamRequest = listStreamRequest.withExclusiveStartStreamName(page.get(page.size() - 1));
            }
        } while (result.getHasMoreStreams());
        return allStreamNames;
    }

    public void createStream(String streamName, int shardCount) {
        CreateStreamRequest request = new CreateStreamRequest();
        request.setStreamName(streamName);
        request.setShardCount(shardCount);

        try {
            client.createStream(request);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create stream with name " + streamName, e);
        }

        int i = 0;
        while (true) {
            i++;
            if (i > 100) {
                throw new RuntimeException("Failed stream creation, did not transition into active");
            }
            try {
                boolean isActive = isResourceActive(streamName);
                if (isActive) {
                    log.info("Successfully created the stream " + streamName);
                    return;
                }
            } catch (Exception e) {
                try {
                    sleep(10_000); // 10 secs backoff.
                } catch (InterruptedException e1) {
                    log.error("Failed to sleep");
                }
                log.info("Stream {} is not active yet, exception: ", streamName, e);
            }
        }
    }
}
@@ -0,0 +1,196 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.config.KCLAppConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
public class TestConsumer {
    private KinesisClientLibConfiguration kclConfig;
    private Worker worker;
    public final KCLAppConfig consumerConfig;
    public final Region region;
    public final String streamName;
    public final AmazonKinesis kinesisClient;
    public final AmazonDynamoDB dynamoClient;
    private ScheduledExecutorService producerExecutor;
    private ScheduledFuture<?> producerFuture;
    private ScheduledExecutorService consumerExecutor;
    private ScheduledFuture<?> consumerFuture;
    private final ObjectMapper mapper = new ObjectMapper();
    public int successfulPutRecords = 0;
    public BigInteger payloadCounter = new BigInteger("0");

    public TestConsumer(KCLAppConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
        this.region = consumerConfig.getRegion();
        this.streamName = consumerConfig.getStreamName();

        try {
            this.kinesisClient = consumerConfig.buildSyncKinesisClient();
            this.dynamoClient = consumerConfig.buildSyncDynamoDbClient();
            this.kclConfig = consumerConfig.getKclConfiguration();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void run() throws Exception {
        StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
        LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);

        // Clean up any old streams or lease tables left in the test environment.
        cleanTestResources(streamExistenceManager, leaseTableManager);

        // Create a new stream for the test case.
        streamExistenceManager.createStream(this.streamName, this.consumerConfig.getShardCount());

        // Send dummy data to the stream.
        startProducer();
        setupConsumerResources();

        try {
            // Start record processing of the dummy data.
            startConsumer();

            // Sleep for three minutes to let the producer/consumer run, then end the test case.
            Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));

            // Stop sending dummy data.
            stopProducer();

            // Wait a few seconds for the last few records to be processed.
            Thread.sleep(TimeUnit.SECONDS.toMillis(10));

            // Finish processing the current batch of data already received from Kinesis before shutting down.
            awaitConsumerFinish();

            // Validate the processed data.
            validateRecordProcessor();
        } catch (Exception e) {
            // Test failed: rethrow; resources are cleaned up in the finally block.
            log.info("----------Test Failed: Cleaning up resources-----------");
            throw e;
        } finally {
            // Clean up the resources created for this test.
            deleteResources(streamExistenceManager, leaseTableManager);
        }
    }

    private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
        log.info("----------Before starting, cleaning test environment----------");
        log.info("----------Deleting all lease tables in account----------");
        leaseTableManager.deleteAllResource();
        log.info("----------Finished deleting all lease tables-------------");

        log.info("----------Deleting all streams in account----------");
        streamExistenceManager.deleteAllResource();
        log.info("----------Finished deleting all streams-------------");
    }

    private void startProducer() {
        this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
        this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
    }

    private void setupConsumerResources() throws Exception {
        final TestRecordProcessorFactory recordProcessorFactory =
                new TestRecordProcessorFactory(consumerConfig.getRecordValidator());

        worker = new Worker.Builder()
                .kinesisClient(consumerConfig.buildSyncKinesisClient())
                .dynamoDBClient(dynamoClient)
                .cloudWatchClient(consumerConfig.buildSyncCloudWatchClient())
                .recordProcessorFactory(recordProcessorFactory)
                .config(kclConfig)
                .build();
    }

    private void startConsumer() {
        // Start record processing of the dummy data.
        this.consumerExecutor = Executors.newSingleThreadScheduledExecutor();
        this.consumerFuture = consumerExecutor.schedule(worker, 0, TimeUnit.SECONDS);
    }

    private void stopProducer() {
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(false);
        producerExecutor.shutdown();
    }

    private void awaitConsumerFinish() throws Exception {
        Future<Boolean> gracefulShutdownFuture = worker.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException | TimeoutException e) {
            throw e;
        }
        log.info("Completed, shutting down now.");
    }

    private void validateRecordProcessor() {
        log.info("The number of expected records is: {}", successfulPutRecords);
        RecordValidationStatus errorVal = consumerConfig.getRecordValidator().validateRecords(successfulPutRecords);
        if (errorVal == RecordValidationStatus.OUT_OF_ORDER) {
            throw new RuntimeException("There was an error validating the records that were processed. The records were out of order.");
        } else if (errorVal == RecordValidationStatus.MISSING_RECORD) {
            throw new RuntimeException("There was an error validating the records that were processed. Some records were missing.");
        }
        log.info("--------------Completed validation of processed records.--------------");
    }

    public void publishRecord() {
        PutRecordRequest request = new PutRecordRequest();
        request.setPartitionKey(RandomStringUtils.randomAlphabetic(5, 20));
        request.setStreamName(streamName);
        request.setData(wrapWithCounter(5, payloadCounter));

        kinesisClient.putRecord(request);

        // Increment the payload counter if the putRecord call was successful.
        payloadCounter = payloadCounter.add(new BigInteger("1"));
        successfulPutRecords += 1;
    }

    private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
        byte[] returnData;
        log.info("--------------Putting record with data: {}", payloadCounter);
        try {
            returnData = mapper.writeValueAsBytes(payloadCounter);
        } catch (Exception e) {
            throw new RuntimeException("Error converting object to bytes: ", e);
        }
        return ByteBuffer.wrap(returnData);
    }

    private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
        log.info("-------------Start deleting stream.----------------");
        streamExistenceManager.deleteResource(this.streamName);
        log.info("-------------Start deleting lease table.----------------");
        leaseTableManager.deleteResource(consumerConfig.getStreamName());
        log.info("-------------Finished deleting test resources.----------------");
    }
}
@@ -0,0 +1,91 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.nio.ByteBuffer;

/**
 * Implements initialization and shutdown of shards as well as shard record processing.
 */
@Slf4j
public class TestRecordProcessor implements IRecordProcessor {

    private String shardId;
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private final RecordValidatorQueue recordValidator;

    public TestRecordProcessor(RecordValidatorQueue recordValidator) {
        this.recordValidator = recordValidator;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.getExtendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.getRecords().size());

            for (Record kinesisRecord : processRecordsInput.getRecords()) {
                final String data = new String(asByteArray(kinesisRecord.getData()));
                log.info("Processing record pk: {}", data);
                recordValidator.add(shardId, data);
            }
            checkpoint(processRecordsInput.getCheckpointer(), "ProcessRecords");
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
            Runtime.getRuntime().halt(1);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);

        try {
            log.info("Worker is shutting down, checkpointing.");
            shutdownInput.getCheckpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    private static byte[] asByteArray(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return bytes;
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer, String operation) {
        try {
            checkpointer.checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            log.error("{}: Error while checkpointing, called from {}", shardId, operation, e);
        }
    }
}
@@ -0,0 +1,18 @@
package com.amazonaws.services.kinesis.clientlibrary.utils;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {

    private final RecordValidatorQueue recordValidator;

    public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) {
        this.recordValidator = recordValidator;
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor(this.recordValidator);
    }
}