From e8735a4742de98e64d91ca7ac2ef60ab687d79e0 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Tue, 11 Sep 2018 12:16:12 -0700 Subject: [PATCH] Debugging Logs for Initialization of FanOutRecordsPublisher (#398) * Some debug logging to understand mismatched sequence numbers Added some logging messages for sequence numbers when starting up. * Added debug support and logging Added @ToString to InitialPositionInStreamExtended for debugging purposes. Added a debug log about the initialization of the FanOutRecordsPublisher to ensure that the publisher is being initialized as expected. --- .../kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java | 1 + .../kinesis/common/InitialPositionInStreamExtended.java | 3 +++ .../software/amazon/kinesis/lifecycle/InitializeTask.java | 8 +++++--- .../kinesis/retrieval/fanout/FanOutRecordsPublisher.java | 2 ++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index 171002e4..9b05cd86 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -87,6 +87,7 @@ public class DynamoDBCheckpointer implements Checkpointer { public Checkpoint getCheckpointObject(final String shardId) throws KinesisClientLibException { try { Lease lease = leaseRefresher.getLease(shardId); + log.debug("[{}] Retrieved lease => {}", shardId, lease); return new Checkpoint(lease.checkpoint(), lease.pendingCheckpoint()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { String message = "Unable to fetch checkpoint for shardId " + shardId; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java index 82dc2f15..30c5e935 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/InitialPositionInStreamExtended.java @@ -14,12 +14,15 @@ */ package software.amazon.kinesis.common; +import lombok.ToString; + import java.util.Date; /** * Class that houses the entities needed to specify the position in the stream from where a new application should * start. */ +@ToString public class InitialPositionInStreamExtended { private final InitialPositionInStream position; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index f03ccac4..4d151574 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -14,18 +14,18 @@ */ package software.amazon.kinesis.lifecycle; -import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; @@ -77,6 +77,8 @@ public class InitializeTask implements ConsumerTask { log.debug("Initializing ShardId {}", shardInfo); Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId()); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint(); + log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", shardInfo.shardId(), initialCheckpoint, + initialPositionInStream); cache.start(initialCheckpoint, initialPositionInStream); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 978cd9f9..728aafc0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -69,6 +69,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { synchronized (lockObject) { + log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", shardId, + extendedSequenceNumber, initialPositionInStreamExtended); this.initialPositionInStreamExtended = initialPositionInStreamExtended; this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber(); this.isFirstConnection = true;