From 97d78314468aff513d116f107877b54d8377dbbd Mon Sep 17 00:00:00 2001 From: Meher Mankikar Date: Mon, 19 Jun 2023 09:37:09 -0700 Subject: [PATCH] Few minor revisions --- .../amazon/kinesis/utils/RecordValidatorQueue.java | 10 +++++----- .../amazon/kinesis/utils/StreamExistenceManager.java | 8 +++----- .../software/amazon/kinesis/utils/TestConsumer.java | 4 +++- .../amazon/kinesis/utils/TestRecordProcessor.java | 6 +++++- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java index 936b4ad1..934443dd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/RecordValidatorQueue.java @@ -26,7 +26,7 @@ public class RecordValidatorQueue { values.add(data); } - public RecordValidationStatus validateRecords(int trueTotalShardCount) { + public RecordValidationStatus validateRecords(int expectedShardCount) { // Validate that each List in the HashMap has data records in increasing order boolean incOrder = true; @@ -54,16 +54,16 @@ public class RecordValidatorQueue { } // Validate that no records are missing over all shards - int totalShardCount = 0; + int actualShardCount = 0; for (Map.Entry> entry : dict.entrySet()) { List recordsPerShard = entry.getValue(); Set noDupRecords = new HashSet(recordsPerShard); - totalShardCount += noDupRecords.size(); + actualShardCount += noDupRecords.size(); } // If this is true, then there was some record that was missed during processing. - if (totalShardCount != trueTotalShardCount) { - log.error("Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount); + if (actualShardCount != expectedShardCount) { + log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedShardCount, actualShardCount); return RecordValidationStatus.MISSING_RECORD; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index ee0450a0..0181a222 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -43,12 +43,10 @@ public class StreamExistenceManager { throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus()); } return true; + } catch (ResourceNotFoundException e) { + return false; } catch (ExecutionException e) { - if (e.getCause() instanceof ResourceNotFoundException) { - return false; - } else { - throw new RuntimeException(e); - } + throw new RuntimeException(e); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java index 5485524d..ece63922 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java @@ -189,9 +189,11 @@ public class TestConsumer { } private void deleteResources(StreamExistenceManager streamExistenceManager, DynamoDbAsyncClient dynamoDBClient) throws Exception { - log.info("-------------Start deleting test resources.----------------"); + log.info("-------------Start deleting stream.----------------"); streamExistenceManager.deleteStream(this.streamName); + log.info("-------------Start deleting lease table.----------------"); deleteLeaseTable(dynamoDBClient, consumerConfig.getStreamName()); + log.info("-------------Finished deleting resources.----------------"); } private void deleteLeaseTable(DynamoDbAsyncClient dynamoClient, String tableName) throws Exception { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java index 32b53743..ab008951 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java @@ -5,6 +5,7 @@ import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; +import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; @@ -13,6 +14,9 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.nio.ByteBuffer; +/** + * Implement initialization and deletion of shards and shard record processing + */ @Slf4j public class TestRecordProcessor implements ShardRecordProcessor { @@ -39,7 +43,7 @@ public class TestRecordProcessor implements ShardRecordProcessor { @Override - public void processRecords(software.amazon.kinesis.lifecycle.events.ProcessRecordsInput processRecordsInput) { + public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size());