Remove unwanted RecordProcessor and add a new constructor parameter

eha sah 2025-04-08 19:34:23 -07:00
parent 37513e5f4a
commit c17118429e
3 changed files with 2 additions and 95 deletions


@@ -1,84 +0,0 @@
package software.amazon.kinesis.worker;

import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class RecordProcessor implements ShardRecordProcessor {

    private static final String SHARD_ID_MDC_KEY = "ShardId";

    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            // log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            // log.info("Processing {} record(s)", processRecordsInput.records().size());
            // processRecordsInput
            //         .records()
            //         .forEach(
            //                 r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(),
            //                         r.sequenceNumber()));
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            // log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            // log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            // log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            // log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}


@@ -1,10 +0,0 @@
package software.amazon.kinesis.worker;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class RecordProcessorFactory implements ShardRecordProcessorFactory {
    public ShardRecordProcessor shardRecordProcessor() {
        return new RecordProcessor();
    }
}


@@ -132,7 +132,8 @@ public class LeaseCoordinatorExerciser {
                 metricsFactory,
                 new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
                 LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(),
-                new ConcurrentHashMap<>());
+                new ConcurrentHashMap<>(),
+                2 * leaseDurationMillis);
         coordinators.add(coord);
     }