Add logging around shard end codepaths (#585)
* Add logging around shard end codepaths * Update logging messaging
This commit is contained in:
parent
2ca3cbd21c
commit
41f996b833
1 changed files with 3 additions and 0 deletions
|
|
@ -77,6 +77,7 @@ class KinesisDataFetcher {
|
||||||
return TERMINAL_RESULT;
|
return TERMINAL_RESULT;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
LOG.info("Skipping fetching records from Kinesis for shard " + shardId + ": nextIterator is null.");
|
||||||
return TERMINAL_RESULT;
|
return TERMINAL_RESULT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -117,6 +118,7 @@ class KinesisDataFetcher {
|
||||||
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
|
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
|
||||||
}
|
}
|
||||||
if (nextIterator == null) {
|
if (nextIterator == null) {
|
||||||
|
LOG.info("Reached shard end: nextIterator is null in AdvancingResult.accept for shard " + shardId);
|
||||||
isShardEndReached = true;
|
isShardEndReached = true;
|
||||||
}
|
}
|
||||||
return getResult();
|
return getResult();
|
||||||
|
|
@ -167,6 +169,7 @@ class KinesisDataFetcher {
|
||||||
nextIterator = getIterator(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), sequenceNumber);
|
nextIterator = getIterator(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), sequenceNumber);
|
||||||
}
|
}
|
||||||
if (nextIterator == null) {
|
if (nextIterator == null) {
|
||||||
|
LOG.info("Reached shard end: cannot advance iterator for shard " + shardId);
|
||||||
isShardEndReached = true;
|
isShardEndReached = true;
|
||||||
}
|
}
|
||||||
this.lastKnownSequenceNumber = sequenceNumber;
|
this.lastKnownSequenceNumber = sequenceNumber;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue