From 800d24309de7326bb3acc3bcad2b5d6e137c68f6 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 10 Oct 2019 22:06:55 -0700 Subject: [PATCH] Force Lease to be lost before shutting down with Zombie state --- .../amazon/kinesis/coordinator/Scheduler.java | 2 +- .../kinesis/lifecycle/ConsumerStates.java | 4 ++-- .../lifecycle/ShardConsumerArgument.java | 3 ++- .../kinesis/lifecycle/ShutdownTask.java | 20 +++++++++++++++++-- .../kinesis/lifecycle/ConsumerStatesTest.java | 10 ++++++++-- .../kinesis/lifecycle/ShutdownTaskTest.java | 15 ++++++++++---- 6 files changed, 42 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 0f26c3d6..fd8dfcb1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -588,7 +588,7 @@ public class Scheduler implements Runnable { checkpoint); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, streamName, - leaseRefresher, + leaseCoordinator, executorService, cache, shardRecordProcessorFactory.shardRecordProcessor(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index df7ea6aa..bb1788b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -135,7 +135,7 @@ class ConsumerStates { @Override public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) { return new BlockOnParentShardTask(consumerArgument.shardInfo(), - consumerArgument.leaseRefresher(), + 
consumerArgument.leaseCoordinator().leaseRefresher(), consumerArgument.parentShardPollIntervalMillis()); } @@ -492,7 +492,7 @@ class ConsumerStates { argument.initialPositionInStream(), argument.cleanupLeasesOfCompletedShards(), argument.ignoreUnexpectedChildShards(), - argument.leaseRefresher(), + argument.leaseCoordinator(), argument.taskBackoffTimeMillis(), argument.recordsPublisher(), argument.hierarchicalShardSyncer(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index fcc49f63..4f1db733 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -21,6 +21,7 @@ import lombok.experimental.Accessors; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -42,7 +43,7 @@ public class ShardConsumerArgument { @NonNull private final String streamName; @NonNull - private final LeaseRefresher leaseRefresher; + private final LeaseCoordinator leaseCoordinator; @NonNull private final ExecutorService executorService; @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 1f9ed9db..d8f30f2f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -25,6 +25,8 @@ import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -40,6 +42,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.ArrayList; +import java.util.Collection; import java.util.List; /** @@ -67,7 +70,7 @@ public class ShutdownTask implements ConsumerTask { private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @NonNull - private final LeaseRefresher leaseRefresher; + private final LeaseCoordinator leaseCoordinator; private final long backoffTimeMillis; @NonNull private final RecordsPublisher recordsPublisher; @@ -106,6 +109,8 @@ public class ShutdownTask implements ConsumerTask { if (!CollectionUtils.isNullOrEmpty(allShards) && !validateShardEnd(allShards)) { localReason = ShutdownReason.LEASE_LOST; + forceLoseLease(); + log.debug("Force the lease to be lost before shutting down the consumer."); } } @@ -147,7 +152,7 @@ public class ShutdownTask implements ConsumerTask { if (localReason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseCoordinator.leaseRefresher(), 
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } @@ -204,4 +209,15 @@ public class ShutdownTask implements ConsumerTask { || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())); } + private void forceLoseLease() { + Collection leases = leaseCoordinator.getAssignments(); + if(leases != null && !leases.isEmpty()) { + for(Lease lease : leases) { + if(lease.leaseKey().equals(shardInfo.shardId())) { + leaseCoordinator.dropLease(lease); + } + } + } + } + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index f9287701..16f5e9a4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -55,6 +56,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; +import javax.swing.*; + @RunWith(MockitoJUnitRunner.class) public class ConsumerStatesTest { private static final String STREAM_NAME = "TestStream"; @@ -73,6 +76,8 @@ public class ConsumerStatesTest { @Mock private ShardInfo 
shardInfo; @Mock + private LeaseCoordinator leaseCoordinator; + @Mock private LeaseRefresher leaseRefresher; @Mock private Checkpointer checkpointer; @@ -109,7 +114,7 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, @@ -127,6 +132,7 @@ public class ConsumerStatesTest { @Test public void blockOnParentStateTest() { ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState(); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); ConsumerTask task = state.createTask(argument, consumer, null); @@ -309,7 +315,7 @@ public class ConsumerStatesTest { assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer", equalTo(recordProcessorCheckpointer))); assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); - assertThat(task, shutdownTask(LEASE_REFRESHER_CLASS, "leaseRefresher", equalTo(leaseRefresher))); + assertThat(task, shutdownTask(LeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator))); assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", equalTo(initialPositionInStream))); assertThat(task, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 8a62bb6c..597c9ae8 100644 --- 
a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -42,6 +42,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -83,6 +84,8 @@ public class ShutdownTaskTest { @Mock private LeaseRefresher leaseRefresher; @Mock + private LeaseCoordinator leaseCoordinator; + @Mock private ShardDetector shardDetector; @Mock private HierarchicalShardSyncer hierarchicalShardSyncer; @@ -99,7 +102,7 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); } @@ -124,6 +127,7 @@ public class ShutdownTaskTest { List shards = constructShardListGraphA(); when(shardDetector.listShards()).thenReturn(shards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); doAnswer((invocation) -> { throw new KinesisClientLibIOException("KinesisClientLibIOException"); @@ -149,7 +153,7 @@ public class ShutdownTaskTest { ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, 
shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -161,6 +165,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, times(1)).listShards(); + verify(leaseCoordinator, never()).getAssignments(); } /** @@ -172,7 +177,7 @@ public class ShutdownTaskTest { ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -183,6 +188,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, times(1)).listShards(); + verify(leaseCoordinator).getAssignments(); } /** @@ -194,7 +200,7 @@ public class ShutdownTaskTest { ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, 
cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -205,6 +211,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, never()).listShards(); + verify(leaseCoordinator, never()).getAssignments(); } /**