From c17118429e850afee624cf961e3787650789244e Mon Sep 17 00:00:00 2001 From: eha sah Date: Tue, 8 Apr 2025 19:34:23 -0700 Subject: [PATCH] remove unwanted RecordProcessor and added new parameter class --- .../kinesis/worker/RecordProcessor.java | 84 ------------------- .../worker/RecordProcessorFactory.java | 10 --- .../leases/LeaseCoordinatorExerciser.java | 3 +- 3 files changed, 2 insertions(+), 95 deletions(-) delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java deleted file mode 100644 index f9e341f0..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java +++ /dev/null @@ -1,84 +0,0 @@ -package software.amazon.kinesis.worker; - -import org.slf4j.MDC; -import software.amazon.kinesis.exceptions.InvalidStateException; -import software.amazon.kinesis.exceptions.ShutdownException; -import software.amazon.kinesis.lifecycle.events.InitializationInput; -import software.amazon.kinesis.lifecycle.events.LeaseLostInput; -import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; -import software.amazon.kinesis.lifecycle.events.ShardEndedInput; -import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; -import software.amazon.kinesis.processor.ShardRecordProcessor; - -public class RecordProcessor implements ShardRecordProcessor { - - private static final String SHARD_ID_MDC_KEY = "ShardId"; - private String shardId; - - @Override - public void initialize(InitializationInput initializationInput) { - shardId = initializationInput.shardId(); - MDC.put(SHARD_ID_MDC_KEY, shardId); - try { - // log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); - } finally { - MDC.remove(SHARD_ID_MDC_KEY); - } - } - - @Override - public void processRecords(ProcessRecordsInput processRecordsInput) { - MDC.put(SHARD_ID_MDC_KEY, shardId); - try { - // log.info("Processing {} record(s)", processRecordsInput.records().size()); - // processRecordsInput - // .records() - // .forEach( - // r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), - // r.sequenceNumber())); - - // Checkpoint periodically - processRecordsInput.checkpointer().checkpoint(); - } catch (Throwable t) { - // log.error("Caught throwable while processing records. Aborting.", t); - } finally { - MDC.remove(SHARD_ID_MDC_KEY); - } - } - - @Override - public void leaseLost(LeaseLostInput leaseLostInput) { - MDC.put(SHARD_ID_MDC_KEY, shardId); - try { - // log.info("Lost lease, so terminating."); - } finally { - MDC.remove(SHARD_ID_MDC_KEY); - } - } - - @Override - public void shardEnded(ShardEndedInput shardEndedInput) { - MDC.put(SHARD_ID_MDC_KEY, shardId); - try { - // log.info("Reached shard end checkpointing."); - shardEndedInput.checkpointer().checkpoint(); - } catch (ShutdownException | InvalidStateException e) { - // log.error("Exception while checkpointing at shard end. Giving up.", e); - } finally { - MDC.remove(SHARD_ID_MDC_KEY); - } - } - - @Override - public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { - MDC.put(SHARD_ID_MDC_KEY, shardId); - try { - // log.info("Scheduler is shutting down, checkpointing."); - shutdownRequestedInput.checkpointer().checkpoint(); - } catch (ShutdownException | InvalidStateException e) { - // log.error("Exception while checkpointing at requested shutdown. Giving up.", e); - } finally { - MDC.remove(SHARD_ID_MDC_KEY); - } - } -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java deleted file mode 100644 index 10f46764..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java +++ /dev/null @@ -1,10 +0,0 @@ -package software.amazon.kinesis.worker; - -import software.amazon.kinesis.processor.ShardRecordProcessor; -import software.amazon.kinesis.processor.ShardRecordProcessorFactory; - -public class RecordProcessorFactory implements ShardRecordProcessorFactory { - public ShardRecordProcessor shardRecordProcessor() { - return new RecordProcessor(); - } -} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 26eb0d3f..e5e70d71 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -132,7 +132,8 @@ public class LeaseCoordinatorExerciser { metricsFactory, new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(), - new ConcurrentHashMap<>()); + new ConcurrentHashMap<>(), + 2 * leaseDurationMillis); coordinators.add(coord); }