From 530fd477b120e9611bf0da1ab770712b7f0c6a9d Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 26 Jan 2021 15:18:29 -0800 Subject: [PATCH 1/3] Fixing record processor processing deleted leases in cycle --- .../clientlibrary/lib/worker/InitializeTask.java | 11 ++++++++++- .../clientlibrary/lib/worker/ShardConsumer.java | 10 +++++++++- .../kinesis/clientlibrary/lib/worker/TaskResult.java | 10 ++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 21d0599b..474a5767 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -79,7 +80,15 @@ class InitializeTask implements ITask { try { LOG.debug("Initializing ShardId " + shardInfo.getShardId()); - Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + Checkpoint initialCheckpointObject; + try { + initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + } catch (KinesisClientLibException e) { + LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e); + final TaskResult result = new TaskResult(e); + result.leaseNotFound(); + return result; + } ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 9e73aad9..11ee39b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -374,7 +374,7 @@ class ShardConsumer { } private enum TaskOutcome { - SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND } private TaskOutcome determineTaskOutcome() { @@ -391,6 +391,10 @@ class ShardConsumer { return TaskOutcome.SUCCESSFUL; } logTaskException(result); + // This is the case of result with exception + if (result.isLeaseNotFound()) { + return TaskOutcome.LEASE_NOT_FOUND; + } } catch (Exception e) { throw new RuntimeException(e); } finally { @@ -487,6 +491,10 @@ class ShardConsumer { markForShutdown(ShutdownReason.TERMINATE); LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"); } + if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) { + markForShutdown(ShutdownReason.ZOMBIE); + LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found"); + } if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java index 70109b86..db22c97b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java @@ -33,6 +33,8 @@ class TaskResult { // List of childShards of the current shard. This field is only required for the task result when we reach end of a shard. private List childShards; + private boolean leaseNotFound; + /** * @return the shardEndReached */ @@ -57,6 +59,14 @@ class TaskResult { */ protected void setChildShards(List childShards) { this.childShards = childShards; } + public boolean isLeaseNotFound() { + return leaseNotFound; + } + + public void leaseNotFound() { + this.leaseNotFound = true; + } + /** * @return the exception */ From 5142ce9bc512a0dcf6e935eb722dbf2b03aa9ef9 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 26 Jan 2021 21:40:22 -0800 Subject: [PATCH 2/3] Fixing exception type --- .../lib/worker/InitializeTask.java | 4 +- .../KinesisClientLibLeaseCoordinator.java | 3 +- .../lib/worker/ShardConsumerTest.java | 49 +++++++++++++++++-- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 474a5767..5343470f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -14,7 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -83,7 +83,7 @@ class InitializeTask implements ITask { Checkpoint initialCheckpointObject; try { initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); - } catch (KinesisClientLibException e) { + } catch (KinesisClientLibNonRetryableException e) { LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e); final TaskResult result = new TaskResult(e); result.leaseNotFound(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index bcbabcde..47baad04 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -283,7 +283,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator Date: Tue, 26 Jan 2021 23:21:39 -0800 Subject: [PATCH 3/3] Fixing unit test --- .../lib/worker/KinesisClientLibLeaseCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java index d8549870..e48cdd16 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java @@ -79,7 +79,7 @@ public class KinesisClientLibLeaseCoordinatorTest { leaseCoordinator.initialize(); } - @Test(expected = KinesisClientLibIOException.class) + @Test(expected = com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException.class) public void testGetCheckpointObjectWithNoLease() throws DependencyException, ProvisionedThroughputException, IllegalStateException, InvalidStateException, KinesisClientLibException {