refactor(ProcessTask): expose shardId

This commit is contained in:
glarwood 2019-03-20 18:30:39 +00:00
parent 0f78ff0bae
commit 0a4fc8bef0
2 changed files with 11 additions and 0 deletions

View file

@ -214,6 +214,7 @@ class ProcessTask implements ITask {
+ shardInfo.getShardId()); + shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer) .withCheckpointer(recordProcessorCheckpointer)
.withShardId(shardInfo.getShardId())
.withMillisBehindLatest(input.getMillisBehindLatest()); .withMillisBehindLatest(input.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis(); final long recordProcessorStartTimeMillis = System.currentTimeMillis();

View file

@ -32,6 +32,7 @@ public class ProcessRecordsInput {
private List<Record> records; private List<Record> records;
private IRecordProcessorCheckpointer checkpointer; private IRecordProcessorCheckpointer checkpointer;
private Long millisBehindLatest; private Long millisBehindLatest;
private String shardId;
/** /**
* Default constructor. * Default constructor.
@ -89,6 +90,10 @@ public class ProcessRecordsInput {
return millisBehindLatest; return millisBehindLatest;
} }
public String getShardId() {
return shardId;
}
/** /**
* Set milliseconds behind latest. * Set milliseconds behind latest.
* *
@ -111,6 +116,11 @@ public class ProcessRecordsInput {
return this; return this;
} }
public ProcessRecordsInput withShardId(String shardId) {
this.shardId = shardId;
return this;
}
public Duration getTimeSpentInCache() { public Duration getTimeSpentInCache() {
if (cacheEntryTime == null || cacheExitTime == null) { if (cacheEntryTime == null || cacheExitTime == null) {
return Duration.ZERO; return Duration.ZERO;