Debugging Logs for Initialization of FanOutRecordsPublisher (#398)

* Some debug logging to understand mismatched sequence numbers

Added some logging messages for sequence numbers when starting up.

* Added debug support and logging

Added @ToString to InitialPositionInStreamExtended for debugging
purposes.
Added a debug log about the initialization of the
FanOutRecordsPublisher to ensure that the publisher is being
initialized as expected.
This commit is contained in:
Justin Pfifer 2018-09-11 12:16:12 -07:00 committed by GitHub
parent c8a3a031f4
commit e8735a4742
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 3 deletions

View file

@ -87,6 +87,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
public Checkpoint getCheckpointObject(final String shardId) throws KinesisClientLibException {
try {
Lease lease = leaseRefresher.getLease(shardId);
log.debug("[{}] Retrieved lease => {}", shardId, lease);
return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
String message = "Unable to fetch checkpoint for shardId " + shardId;

View file

@ -14,12 +14,15 @@
*/
package software.amazon.kinesis.common;
import lombok.ToString;
import java.util.Date;
/**
* Class that houses the entities needed to specify the position in the stream from where a new application should
* start.
*/
@ToString
public class InitialPositionInStreamExtended {
private final InitialPositionInStream position;

View file

@ -14,18 +14,18 @@
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
@ -77,6 +77,8 @@ public class InitializeTask implements ConsumerTask {
log.debug("Initializing ShardId {}", shardInfo);
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId());
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint();
log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", shardInfo.shardId(), initialCheckpoint,
initialPositionInStream);
cache.start(initialCheckpoint, initialPositionInStream);

View file

@ -69,6 +69,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
synchronized (lockObject) {
log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", shardId,
extendedSequenceNumber, initialPositionInStreamExtended);
this.initialPositionInStreamExtended = initialPositionInStreamExtended;
this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
this.isFirstConnection = true;