Few minor revisions

This commit is contained in:
Meher Mankikar 2023-06-19 09:37:09 -07:00
parent ae42e96e55
commit 97d7831446
4 changed files with 16 additions and 12 deletions

View file

@ -26,7 +26,7 @@ public class RecordValidatorQueue {
values.add(data);
}
public RecordValidationStatus validateRecords(int trueTotalShardCount) {
public RecordValidationStatus validateRecords(int expectedShardCount) {
// Validate that each List in the HashMap has data records in increasing order
boolean incOrder = true;
@ -54,16 +54,16 @@ public class RecordValidatorQueue {
}
// Validate that no records are missing over all shards
int totalShardCount = 0;
int actualShardCount = 0;
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
Set<String> noDupRecords = new HashSet<String>(recordsPerShard);
totalShardCount += noDupRecords.size();
actualShardCount += noDupRecords.size();
}
// If this is true, then there was some record that was missed during processing.
if (totalShardCount != trueTotalShardCount) {
log.error("Failed to get correct number of records processed. Should be {} but was {}", trueTotalShardCount, totalShardCount);
if (actualShardCount != expectedShardCount) {
log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedShardCount, actualShardCount);
return RecordValidationStatus.MISSING_RECORD;
}

View file

@ -43,12 +43,10 @@ public class StreamExistenceManager {
throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus());
}
return true;
} catch (ResourceNotFoundException e) {
return false;
} catch (ExecutionException e) {
if (e.getCause() instanceof ResourceNotFoundException) {
return false;
} else {
throw new RuntimeException(e);
}
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}

View file

@ -189,9 +189,11 @@ public class TestConsumer {
}
private void deleteResources(StreamExistenceManager streamExistenceManager, DynamoDbAsyncClient dynamoDBClient) throws Exception {
log.info("-------------Start deleting test resources.----------------");
log.info("-------------Start deleting stream.----------------");
streamExistenceManager.deleteStream(this.streamName);
log.info("-------------Start deleting lease table.----------------");
deleteLeaseTable(dynamoDBClient, consumerConfig.getStreamName());
log.info("-------------Finished deleting resources.----------------");
}
private void deleteLeaseTable(DynamoDbAsyncClient dynamoClient, String tableName) throws Exception {

View file

@ -5,6 +5,7 @@ import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
@ -13,6 +14,9 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.nio.ByteBuffer;
/**
* Implement initialization and deletion of shards and shard record processing
*/
@Slf4j
public class TestRecordProcessor implements ShardRecordProcessor {
@ -39,7 +43,7 @@ public class TestRecordProcessor implements ShardRecordProcessor {
@Override
public void processRecords(software.amazon.kinesis.lifecycle.events.ProcessRecordsInput processRecordsInput) {
public void processRecords(ProcessRecordsInput processRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Processing {} record(s)", processRecordsInput.records().size());