diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index 7afda529..22b575fc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -65,13 +65,7 @@ public abstract class KCLAppConfig { return InitialPositionInStream.TRIM_HORIZON; } - public Protocol getConsumerProtocol() { - return Protocol.HTTP1_1; - } - - public Protocol getProducerProtocol() { - return Protocol.HTTP1_1; - } + public abstract Protocol getKinesisClientProtocol(); public ProducerConfig getProducerConfig() { return ProducerConfig.builder() @@ -86,37 +80,31 @@ public abstract class KCLAppConfig { return null; } - public final KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException { + public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException { + if (kinesisAsyncClient == null) { - this.kinesisAsyncClient = buildAsyncKinesisClient(Optional.ofNullable(protocol)); + // Setup H2 client config. + final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() + .maxConcurrency(Integer.MAX_VALUE); + + builder.protocol(getKinesisClientProtocol()); + + final SdkAsyncHttpClient sdkAsyncHttpClient = + builder.buildWithDefaults(AttributeMap.builder().build()); + + // Setup client builder by default values + final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion()); + + kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient); + + kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider()); + + this.kinesisAsyncClient = kinesisAsyncClientBuilder.build(); } + return this.kinesisAsyncClient; } - public final KinesisAsyncClient buildAsyncKinesisClient(Optional protocol) throws URISyntaxException, IOException { - - // Setup H2 client config. - final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() - .maxConcurrency(Integer.MAX_VALUE); - - // If not present, defaults to HTTP1_1 - if (protocol.isPresent()) { - builder.protocol(protocol.get()); - } - - final SdkAsyncHttpClient sdkAsyncHttpClient = - builder.buildWithDefaults(AttributeMap.builder().build()); - - // Setup client builder by default values - final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion()); - - kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient); - - kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider()); - - return kinesisAsyncClientBuilder.build(); - } - public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { if (this.dynamoDbAsyncClient == null) { final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion()); @@ -135,7 +123,7 @@ public abstract class KCLAppConfig { return this.cloudWatchAsyncClient; } - public String getWorkerId() throws UnknownHostException { + public final String getWorkerId() throws UnknownHostException { return Inet4Address.getLocalHost().getHostName(); } @@ -150,9 +138,9 @@ public abstract class KCLAppConfig { return new TestRecordProcessorFactory(getRecordValidator()); } - public ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { + public final ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { final String workerId = getWorkerId(); - return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(getConsumerProtocol()), buildAsyncDynamoDbClient(), + return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java index 7fdd910d..07a29171 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java @@ -22,7 +22,7 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { } @Override - public Protocol getConsumerProtocol() { + public Protocol getKinesisClientProtocol() { return Protocol.HTTP1_1; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java index b2089519..eb2b2143 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java @@ -21,7 +21,7 @@ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig { } @Override - public Protocol getConsumerProtocol() { + public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java index cbe02440..6b0284c7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java @@ -16,7 +16,7 @@ public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig { } @Override - public Protocol getConsumerProtocol() { + public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java index 457c4dbf..cd1ed524 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java @@ -16,17 +16,17 @@ public class BasicStreamConsumerIntegrationTest { consumer.run(); } - @Test - public void kclReleaseCanaryPollingH1Test() throws Exception { - KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig(); - TestConsumer consumer = new TestConsumer(consumerConfig); - consumer.run(); - } - - @Test - public void kclReleaseCanaryStreamingTest() throws Exception { - KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig(); - TestConsumer consumer = new TestConsumer(consumerConfig); - consumer.run(); - } +// @Test +// public void kclReleaseCanaryPollingH1Test() throws Exception { +// KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig(); +// TestConsumer consumer = new TestConsumer(consumerConfig); +// consumer.run(); +// } +// +// @Test +// public void kclReleaseCanaryStreamingTest() throws Exception { +// KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig(); +// TestConsumer consumer = new TestConsumer(consumerConfig); +// consumer.run(); +// } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java new file mode 100644 index 00000000..fbe602a6 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java @@ -0,0 +1,59 @@ +package software.amazon.kinesis.utils; + +import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +public abstract class AWSResourceManager { + + public AWSResourceManager() {} + + public abstract void _deleteResource(String resourceName) throws Exception; + public abstract boolean _isResourceActive(String name); + public abstract List _getAllResourceNames() throws Exception; + + /** + * Deletes resource with specified resource name + * @param resourceName + * @throws Exception + */ + public void deleteResource(String resourceName) throws Exception { + + try { + _deleteResource(resourceName); + } catch (Exception e) { + throw new Exception("Could not delete resource: {}", e); + } + + // Wait till resource is deleted to return + int i = 0; + while (true) { + i++; + if (i > 100) { + throw new RuntimeException("Failed resource deletion"); + } + try { + if (!_isResourceActive(resourceName)) { + log.info("Successfully deleted the resource {}", resourceName); + return; + } + } catch (Exception e) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e1) {} + log.info("Resource {} is not deleted yet, exception: ", resourceName); + } + } + } + + /** + * Delete all instances of a particular resource type + */ + public void deleteAllResource() throws Exception { + final List streamNames = _getAllResourceNames(); + for (String streamName : streamNames) { + deleteResource(streamName); + } + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java index 18c88538..dcf930d3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java @@ -3,19 +3,25 @@ package software.amazon.kinesis.utils; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.kinesis.common.FutureUtils; import java.io.IOException; import java.net.URISyntaxException; import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @Slf4j -public class LeaseTableManager { +public class LeaseTableManager extends AWSResourceManager { private final DynamoDbAsyncClient dynamoClient; @@ -23,53 +29,46 @@ public class LeaseTableManager { this.dynamoClient = dynamoClient; } - private List listAllLeaseTables() throws Exception { - final ListTablesRequest request = ListTablesRequest.builder().build(); - final ListTablesResponse response; - try { - response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60)); - } catch (ExecutionException | InterruptedException e) { - throw new Exception("Error listing all lease tables"); - } - return response.tableNames(); - } + public boolean _isResourceActive(String tableName) { + final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build(); + final CompletableFuture describeTableResponseCompletableFuture = dynamoClient.describeTable(request); - public void deleteLeaseTable(String tableName) throws Exception { - final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build(); try { - FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60)); - } catch (ExecutionException | InterruptedException e) { - throw new Exception("Could not delete lease table: {}", e); - } - - // Wait till table is deleted to return - int i = 0; - while (true) { - i++; - if (i > 100) { - throw new RuntimeException("Failed lease table deletion"); + final DescribeTableResponse response = describeTableResponseCompletableFuture.get(30, TimeUnit.SECONDS); + boolean isActive = response.table().tableStatus().equals(TableStatus.ACTIVE); + if (!isActive) { + throw new RuntimeException("Table is not active, instead in status: " + response.table().tableStatus()); } - - List leaseTableNames = listAllLeaseTables(); - log.info("All lease tables name: {}. Looking for: {}", leaseTableNames, tableName); - if (!listAllLeaseTables().contains(tableName)) { - log.info("Succesfully deleted the lease table {}", tableName); - return; + return true; + } catch (ExecutionException e) { + if (e.getCause() instanceof ResourceNotFoundException) { + return false; } else { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - } catch (InterruptedException e1) {} - log.info("Lease table {} is not deleted yet, exception: ", tableName); + throw new RuntimeException(e); } + } catch (Exception e) { + throw new RuntimeException(e); } } - public void deleteAllLeaseTables() throws Exception { + public void _deleteResource(String tableName) throws Exception { + final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build(); + FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60)); + } - final List tableNames = listAllLeaseTables(); - for (String tableName : tableNames) { - deleteLeaseTable(tableName); + public List _getAllResourceNames() throws Exception { + List tableNames = new ArrayList<>(); + final ListTablesRequest request = ListTablesRequest.builder().build(); + ListTablesResponse response = null; + while(response == null || response.lastEvaluatedTableName() != null) { + try { + response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60)); + } catch (ExecutionException | InterruptedException e) { + throw new Exception("Error listing all lease tables"); + } + tableNames.addAll(response.tableNames()); } + return tableNames; } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java index 934443dd..79671e49 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java @@ -33,24 +33,14 @@ public class RecordValidatorQueue { for (Map.Entry> entry : dict.entrySet()) { List recordsPerShard = entry.getValue(); int prevVal = -1; - boolean shardIncOrder = true; for (String record : recordsPerShard) { int nextVal = Integer.parseInt(record); if (prevVal > nextVal) { log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal); - shardIncOrder = false; + return RecordValidationStatus.OUT_OF_ORDER; } prevVal = nextVal; } - if (!shardIncOrder) { - incOrder = false; - break; - } - } - - // If this is true, then there was some record that was processed out of order - if (!incOrder) { - return RecordValidationStatus.OUT_OF_ORDER; } // Validate that no records are missing over all shards diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java index e64565d8..c196aa54 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueueTest.java @@ -5,7 +5,7 @@ import org.junit.Test; public class RecordValidatorQueueTest { - private RecordValidatorQueue recordValidator = new RecordValidatorQueue(); + private final RecordValidatorQueue recordValidator = new RecordValidatorQueue(); private static final String SHARD_ID = "ABC"; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index 3943cf6a..cf089fd7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -11,10 +11,13 @@ import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.StreamStatus; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.config.KCLAppConfig; import java.io.IOException; import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -22,17 +25,16 @@ import java.util.concurrent.TimeUnit; @Value @Slf4j -public class StreamExistenceManager { +public class StreamExistenceManager extends AWSResourceManager { private final KinesisAsyncClient client; private final KCLAppConfig testConfig; public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException { this.testConfig = config; - this.client = config.buildAsyncKinesisClient(config.getConsumerProtocol()); + this.client = config.buildAsyncKinesisClient(); } - private boolean isStreamActive(String streamName) { - + public boolean _isResourceActive(String streamName) { final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); final CompletableFuture describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request); @@ -54,6 +56,34 @@ public class StreamExistenceManager { } } + public void _deleteResource(String streamName) throws Exception { + final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build(); + client.deleteStream(request).get(30, TimeUnit.SECONDS); + } + + public List _getAllResourceNames() throws Exception { + List streamNames = new ArrayList<>(); + final ListStreamsRequest request = ListStreamsRequest.builder().build(); + ListStreamsResponse response = null; + while(response == null || response.hasMoreStreams()) { + try { + response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60)); + } catch (Exception e) { + throw new Exception("Error listing all lease tables"); + } + streamNames.addAll(response.streamNames()); + } + return streamNames; + } + + public void checkStreamAndCreateIfNecessary(String streamName) { + + if (!_isResourceActive(streamName)) { + createStream(streamName, testConfig.getShardCount()); + } + log.info("Using stream {} with region {}", streamName, testConfig.getRegion()); + } + private void createStream(String streamName, int shardCount) { final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build(); try { @@ -69,7 +99,7 @@ public class StreamExistenceManager { throw new RuntimeException("Failed stream creation, did not transition into active"); } try { - boolean isActive = isStreamActive(streamName); + boolean isActive = _isResourceActive(streamName); if (isActive) { log.info("Succesfully created the stream {}", streamName); return; @@ -85,59 +115,4 @@ public class StreamExistenceManager { } } - public void deleteStream(String streamName) { - final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build(); - try { - client.deleteStream(request).get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException("Failed to delete stream with name " + streamName, e); - } - - int i = 0; - while (true) { - i++; - if (i > 100) { - throw new RuntimeException("Failed stream deletion"); - } - try { - boolean isActive = isStreamActive(streamName); - if (!isActive) { - log.info("Succesfully deleted the stream {}", streamName); - return; - } - } catch (Exception e) { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - } catch (InterruptedException e1) { } - log.info("Stream {} is not deleted yet, exception: ", streamName, e); - } - } - } - - public void checkStreamAndCreateIfNecessary(String streamName) { - - if (!isStreamActive(streamName)) { - createStream(streamName, testConfig.getShardCount()); - } - log.info("Using stream {} with region {}", streamName, testConfig.getRegion()); - } - - private List getAllStreamNames() { - final ListStreamsRequest request = ListStreamsRequest.builder().build(); - ListStreamsResponse response; - try { - response = client.listStreams(request).get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException("Failed to list all streams", e); - } - return response.streamNames(); - } - - public void deleteAllStreams() { - final List streamNames = getAllStreamNames(); - for (String streamName : streamNames) { - deleteStream(streamName); - } - } - } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java index b0eb7d9d..2d08ddaf 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java @@ -46,6 +46,8 @@ public class TestConsumer { private Scheduler scheduler; private ScheduledExecutorService producerExecutor; private ScheduledFuture producerFuture; + private ScheduledExecutorService consumerExecutor; + private ScheduledFuture consumerFuture; private DynamoDbAsyncClient dynamoClient; public int successfulPutRecords = 0; public BigInteger payloadCounter = new BigInteger("0"); @@ -54,7 +56,7 @@ public class TestConsumer { this.consumerConfig = consumerConfig; this.region = consumerConfig.getRegion(); this.streamName = consumerConfig.getStreamName(); - this.kinesisClient = consumerConfig.buildAsyncKinesisClient(consumerConfig.getConsumerProtocol()); + this.kinesisClient = consumerConfig.buildAsyncKinesisClient(); this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient(); } @@ -64,14 +66,13 @@ public class TestConsumer { final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient); // Clean up any old streams or lease tables left in test environment - cleanTestEnvironment(streamExistenceManager, leaseTableManager); - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + cleanTestResources(streamExistenceManager, leaseTableManager); // Check if stream is created. If not, create it streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName); startProducer(); - setUpTestResources(); + setUpConsumerResources(); try { startConsumer(); @@ -98,20 +99,19 @@ public class TestConsumer { } catch (Exception e) { // Test Failed. Clean up resources and then throw exception. log.info("----------Test Failed: Cleaning up resources------------"); - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); deleteResources(streamExistenceManager, leaseTableManager); throw e; } } - private void cleanTestEnvironment(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { + private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { log.info("----------Before starting, Cleaning test environment----------"); log.info("----------Deleting all lease tables in account----------"); - leaseTableManager.deleteAllLeaseTables(); + leaseTableManager.deleteAllResource(); log.info("----------Finished deleting all lease tables-------------"); log.info("----------Deleting all streams in account----------"); - streamExistenceManager.deleteAllStreams(); + streamExistenceManager.deleteAllResource(); log.info("----------Finished deleting all streams-------------"); } @@ -121,7 +121,7 @@ public class TestConsumer { this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); } - private void setUpTestResources() throws Exception { + private void setUpConsumerResources() throws Exception { // Setup configuration of KCL (including DynamoDB and CloudWatch) final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(); @@ -149,9 +149,8 @@ public class TestConsumer { private void startConsumer() { // Start record processing of dummy data - final Thread schedulerThread = new Thread(scheduler); - schedulerThread.setDaemon(true); - schedulerThread.start(); + this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(); + this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS); } public void publishRecord() { @@ -220,9 +219,9 @@ public class TestConsumer { private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { log.info("-------------Start deleting stream.----------------"); - streamExistenceManager.deleteStream(this.streamName); + streamExistenceManager.deleteResource(this.streamName); log.info("-------------Start deleting lease table.----------------"); - leaseTableManager.deleteLeaseTable(this.consumerConfig.getStreamName()); + leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); log.info("-------------Finished deleting resources.----------------"); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java index ab008951..f3e43915 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java @@ -24,7 +24,7 @@ public class TestRecordProcessor implements ShardRecordProcessor { private String shardId; - RecordValidatorQueue recordValidator; + private final RecordValidatorQueue recordValidator; public TestRecordProcessor(RecordValidatorQueue recordValidator) { this.recordValidator = recordValidator; @@ -48,8 +48,8 @@ public class TestRecordProcessor implements ShardRecordProcessor { try { log.info("Processing {} record(s)", processRecordsInput.records().size()); - for (KinesisClientRecord r : processRecordsInput.records()) { - String data = new String(asByteArray(r.data())); + for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) { + final String data = new String(asByteArray(kinesisRecord.data())); log.info("Processing record pk: {}", data); recordValidator.add(shardId, data); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java index 5e0f317a..03361b6e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java @@ -5,7 +5,7 @@ import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { - RecordValidatorQueue recordValidator; + private final RecordValidatorQueue recordValidator; public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) { this.recordValidator = recordValidator;