Fixing record processor processing deleted leases in cycle

This commit is contained in:
Ashwin Giridharan 2021-01-26 15:18:29 -08:00
parent 03e162b770
commit 530fd477b1
3 changed files with 29 additions and 2 deletions

View file

@ -14,6 +14,7 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -79,7 +80,15 @@ class InitializeTask implements ITask {
try { try {
LOG.debug("Initializing ShardId " + shardInfo.getShardId()); LOG.debug("Initializing ShardId " + shardInfo.getShardId());
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); Checkpoint initialCheckpointObject;
try {
initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId());
} catch (KinesisClientLibException e) {
LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e);
final TaskResult result = new TaskResult(e);
result.leaseNotFound();
return result;
}
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());

View file

@ -374,7 +374,7 @@ class ShardConsumer {
} }
private enum TaskOutcome { private enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
} }
private TaskOutcome determineTaskOutcome() { private TaskOutcome determineTaskOutcome() {
@ -391,6 +391,10 @@ class ShardConsumer {
return TaskOutcome.SUCCESSFUL; return TaskOutcome.SUCCESSFUL;
} }
logTaskException(result); logTaskException(result);
// This is the case of result with exception
if (result.isLeaseNotFound()) {
return TaskOutcome.LEASE_NOT_FOUND;
}
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
@ -487,6 +491,10 @@ class ShardConsumer {
markForShutdown(ShutdownReason.TERMINATE); markForShutdown(ShutdownReason.TERMINATE);
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"); LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
} }
if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) {
markForShutdown(ShutdownReason.ZOMBIE);
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found");
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason); currentState = currentState.shutdownTransition(shutdownReason);
} else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {

View file

@ -33,6 +33,8 @@ class TaskResult {
// List of childShards of the current shard. This field is only required for the task result when we reach end of a shard. // List of childShards of the current shard. This field is only required for the task result when we reach end of a shard.
private List<ChildShard> childShards; private List<ChildShard> childShards;
private boolean leaseNotFound;
/** /**
* @return the shardEndReached * @return the shardEndReached
*/ */
@ -57,6 +59,14 @@ class TaskResult {
*/ */
protected void setChildShards(List<ChildShard> childShards) { this.childShards = childShards; } protected void setChildShards(List<ChildShard> childShards) { this.childShards = childShards; }
public boolean isLeaseNotFound() {
return leaseNotFound;
}
public void leaseNotFound() {
this.leaseNotFound = true;
}
/** /**
* @return the exception * @return the exception
*/ */