Merge pull request #781 from awslabs/dead_shard_initialize_cycle_fix
Fix record processor repeatedly re-initializing shards whose leases have been deleted
This commit is contained in:
commit
539550b664
6 changed files with 78 additions and 7 deletions
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
@ -79,7 +80,15 @@ class InitializeTask implements ITask {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
|
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
|
||||||
Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId());
|
Checkpoint initialCheckpointObject;
|
||||||
|
try {
|
||||||
|
initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId());
|
||||||
|
} catch (KinesisClientLibNonRetryableException e) {
|
||||||
|
LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e);
|
||||||
|
final TaskResult result = new TaskResult(e);
|
||||||
|
result.leaseNotFound();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
|
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
|
||||||
|
|
||||||
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
|
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
|
||||||
|
|
|
||||||
|
|
@ -283,7 +283,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
|
||||||
try {
|
try {
|
||||||
KinesisClientLease lease = leaseManager.getLease(shardId);
|
KinesisClientLease lease = leaseManager.getLease(shardId);
|
||||||
if (lease == null) {
|
if (lease == null) {
|
||||||
throw new KinesisClientLibIOException(errorMessage);
|
// This is a KinesisClientLibNonRetryableException
|
||||||
|
throw new com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException(errorMessage);
|
||||||
}
|
}
|
||||||
return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
|
return new Checkpoint(lease.getCheckpoint(), lease.getPendingCheckpoint());
|
||||||
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||||
|
|
|
||||||
|
|
@ -374,7 +374,7 @@ class ShardConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum TaskOutcome {
|
private enum TaskOutcome {
|
||||||
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE
|
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
|
||||||
}
|
}
|
||||||
|
|
||||||
private TaskOutcome determineTaskOutcome() {
|
private TaskOutcome determineTaskOutcome() {
|
||||||
|
|
@ -391,6 +391,10 @@ class ShardConsumer {
|
||||||
return TaskOutcome.SUCCESSFUL;
|
return TaskOutcome.SUCCESSFUL;
|
||||||
}
|
}
|
||||||
logTaskException(result);
|
logTaskException(result);
|
||||||
|
// This is the case of result with exception
|
||||||
|
if (result.isLeaseNotFound()) {
|
||||||
|
return TaskOutcome.LEASE_NOT_FOUND;
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
@ -487,6 +491,10 @@ class ShardConsumer {
|
||||||
markForShutdown(ShutdownReason.TERMINATE);
|
markForShutdown(ShutdownReason.TERMINATE);
|
||||||
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
|
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
|
||||||
}
|
}
|
||||||
|
if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) {
|
||||||
|
markForShutdown(ShutdownReason.ZOMBIE);
|
||||||
|
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found");
|
||||||
|
}
|
||||||
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
|
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
|
||||||
currentState = currentState.shutdownTransition(shutdownReason);
|
currentState = currentState.shutdownTransition(shutdownReason);
|
||||||
} else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {
|
} else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,8 @@ class TaskResult {
|
||||||
// List of childShards of the current shard. This field is only required for the task result when we reach end of a shard.
|
// List of childShards of the current shard. This field is only required for the task result when we reach end of a shard.
|
||||||
private List<ChildShard> childShards;
|
private List<ChildShard> childShards;
|
||||||
|
|
||||||
|
private boolean leaseNotFound;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the shardEndReached
|
* @return the shardEndReached
|
||||||
*/
|
*/
|
||||||
|
|
@ -57,6 +59,14 @@ class TaskResult {
|
||||||
*/
|
*/
|
||||||
protected void setChildShards(List<ChildShard> childShards) { this.childShards = childShards; }
|
protected void setChildShards(List<ChildShard> childShards) { this.childShards = childShards; }
|
||||||
|
|
||||||
|
public boolean isLeaseNotFound() {
|
||||||
|
return leaseNotFound;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void leaseNotFound() {
|
||||||
|
this.leaseNotFound = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the exception
|
* @return the exception
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ public class KinesisClientLibLeaseCoordinatorTest {
|
||||||
leaseCoordinator.initialize();
|
leaseCoordinator.initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = KinesisClientLibIOException.class)
|
@Test(expected = com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException.class)
|
||||||
public void testGetCheckpointObjectWithNoLease()
|
public void testGetCheckpointObjectWithNoLease()
|
||||||
throws DependencyException, ProvisionedThroughputException, IllegalStateException, InvalidStateException,
|
throws DependencyException, ProvisionedThroughputException, IllegalStateException, InvalidStateException,
|
||||||
KinesisClientLibException {
|
KinesisClientLibException {
|
||||||
|
|
|
||||||
|
|
@ -54,8 +54,6 @@ import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
|
|
||||||
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.hamcrest.Description;
|
import org.hamcrest.Description;
|
||||||
|
|
@ -64,7 +62,6 @@ import org.hamcrest.TypeSafeMatcher;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.mockito.InjectMocks;
|
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
|
@ -246,6 +243,52 @@ public class ShardConsumerTest {
|
||||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
|
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws Exception {
|
||||||
|
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||||
|
|
||||||
|
ICheckpoint checkpoint = new KinesisClientLibLeaseCoordinator(leaseManager, "", 0, 0);
|
||||||
|
|
||||||
|
when(leaseManager.getLease(anyString())).thenReturn(null);
|
||||||
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
StreamConfig streamConfig =
|
||||||
|
new StreamConfig(streamProxy,
|
||||||
|
1,
|
||||||
|
10,
|
||||||
|
callProcessRecordsForEmptyRecordList,
|
||||||
|
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
|
||||||
|
|
||||||
|
ShardConsumer consumer =
|
||||||
|
new ShardConsumer(shardInfo,
|
||||||
|
streamConfig,
|
||||||
|
checkpoint,
|
||||||
|
processor,
|
||||||
|
leaseCoordinator,
|
||||||
|
parentShardPollIntervalMillis,
|
||||||
|
cleanupLeasesOfCompletedShards,
|
||||||
|
executorService,
|
||||||
|
metricsFactory,
|
||||||
|
taskBackoffTimeMillis,
|
||||||
|
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||||
|
config,
|
||||||
|
shardSyncer,
|
||||||
|
shardSyncStrategy);
|
||||||
|
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||||
|
consumer.consumeShard();
|
||||||
|
Thread.sleep(50L);
|
||||||
|
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||||
|
consumer.consumeShard();
|
||||||
|
Thread.sleep(50L);
|
||||||
|
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
|
||||||
|
consumer.consumeShard();
|
||||||
|
Thread.sleep(50L);
|
||||||
|
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
|
||||||
|
consumer.consumeShard();
|
||||||
|
Thread.sleep(50L);
|
||||||
|
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public final void testRecordProcessorThrowable() throws Exception {
|
public final void testRecordProcessorThrowable() throws Exception {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue