Force Lease to be lost before shutting down with Zombi state

This commit is contained in:
Chunxue Yang 2019-10-10 22:06:55 -07:00
parent afd7742c70
commit 800d24309d
6 changed files with 42 additions and 12 deletions

View file

@ -588,7 +588,7 @@ public class Scheduler implements Runnable {
checkpoint);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamName,
leaseRefresher,
leaseCoordinator,
executorService,
cache,
shardRecordProcessorFactory.shardRecordProcessor(),

View file

@ -135,7 +135,7 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) {
return new BlockOnParentShardTask(consumerArgument.shardInfo(),
consumerArgument.leaseRefresher(),
consumerArgument.leaseCoordinator().leaseRefresher(),
consumerArgument.parentShardPollIntervalMillis());
}
@ -492,7 +492,7 @@ class ConsumerStates {
argument.initialPositionInStream(),
argument.cleanupLeasesOfCompletedShards(),
argument.ignoreUnexpectedChildShards(),
argument.leaseRefresher(),
argument.leaseCoordinator(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),

View file

@ -21,6 +21,7 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@ -42,7 +43,7 @@ public class ShardConsumerArgument {
@NonNull
private final String streamName;
@NonNull
private final LeaseRefresher leaseRefresher;
private final LeaseCoordinator leaseCoordinator;
@NonNull
private final ExecutorService executorService;
@NonNull

View file

@ -25,6 +25,8 @@ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@ -40,6 +42,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
@ -67,7 +70,7 @@ public class ShutdownTask implements ConsumerTask {
private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards;
@NonNull
private final LeaseRefresher leaseRefresher;
private final LeaseCoordinator leaseCoordinator;
private final long backoffTimeMillis;
@NonNull
private final RecordsPublisher recordsPublisher;
@ -106,6 +109,8 @@ public class ShutdownTask implements ConsumerTask {
if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) {
localReason = ShutdownReason.LEASE_LOST;
forceLoseLease();
log.debug("Force the lease to be lost before shutting down the consumer.");
}
}
@ -147,7 +152,7 @@ public class ShutdownTask implements ConsumerTask {
if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher,
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
}
@ -204,4 +209,15 @@ public class ShutdownTask implements ConsumerTask {
|| shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId()));
}
private void forceLoseLease() {
Collection<Lease> leases = leaseCoordinator.getAssignments();
if(leases != null && !leases.isEmpty()) {
for(Lease lease : leases) {
if(lease.leaseKey().equals(shardInfo.shardId())) {
leaseCoordinator.dropLease(lease);
}
}
}
}
}

View file

@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@ -55,6 +56,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import javax.swing.*;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
private static final String STREAM_NAME = "TestStream";
@ -73,6 +76,8 @@ public class ConsumerStatesTest {
@Mock
private ShardInfo shardInfo;
@Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private LeaseRefresher leaseRefresher;
@Mock
private Checkpointer checkpointer;
@ -109,7 +114,7 @@ public class ConsumerStatesTest {
@Before
public void setup() {
argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher,
argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
@ -127,6 +132,7 @@ public class ConsumerStatesTest {
@Test
public void blockOnParentStateTest() {
ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState();
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
ConsumerTask task = state.createTask(argument, consumer, null);
@ -309,7 +315,7 @@ public class ConsumerStatesTest {
assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
assertThat(task, shutdownTask(LEASE_REFRESHER_CLASS, "leaseRefresher", equalTo(leaseRefresher)));
assertThat(task, shutdownTask(LeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream)));
assertThat(task,

View file

@ -42,6 +42,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@ -83,6 +84,8 @@ public class ShutdownTaskTest {
@Mock
private LeaseRefresher leaseRefresher;
@Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private ShardDetector shardDetector;
@Mock
private HierarchicalShardSyncer hierarchicalShardSyncer;
@ -99,7 +102,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
}
@ -124,6 +127,7 @@ public class ShutdownTaskTest {
List<Shard> shards = constructShardListGraphA();
when(shardDetector.listShards()).thenReturn(shards);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException");
@ -149,7 +153,7 @@ public class ShutdownTaskTest {
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -161,6 +165,7 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, times(1)).listShards();
verify(leaseCoordinator, never()).getAssignments();
}
/**
@ -172,7 +177,7 @@ public class ShutdownTaskTest {
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -183,6 +188,7 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, times(1)).listShards();
verify(leaseCoordinator).getAssignments();
}
/**
@ -194,7 +200,7 @@ public class ShutdownTaskTest {
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -205,6 +211,7 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, never()).listShards();
verify(leaseCoordinator, never()).getAssignments();
}
/**