From 5bf24bda431a1b902210b00f3a74de232138214e Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 15 Jul 2020 12:33:53 -0700 Subject: [PATCH] fix for premature childShard lease creation --- .../lib/worker/ShutdownTask.java | 114 ++++++++------- .../lib/worker/ShutdownTaskTest.java | 131 ++++++++++++++++-- 2 files changed, 188 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 71cf3b9d..ed9de7e2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,11 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -32,8 +34,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import java.util.List; +import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; /** @@ -44,6 +46,7 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; + private int retryLeftForValidParentState = 10; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -107,43 +110,8 @@ class ShutdownTask implements ITask { try { LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); - ShutdownReason localReason = reason; - /* - * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END - * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active - * workers to contend for the lease of this shard. - */ - if(localReason == ShutdownReason.TERMINATE) { - // Create new lease for the child shards if they don't exist. - // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. - // This would happen when KinesisDataFetcher catches ResourceNotFound exception. - // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. - // This scenario could happen when customer deletes the stream while leaving the KCL application running. - try { - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateCurrentLeaseWithChildShards(); - } else { - LOG.warn("Shard " + shardInfo.getShardId() - + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); - } - } catch (InvalidStateException e) { - // If invalidStateException happens, it indicates we are missing childShard related information. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting - // childShard information in the processTask. - localReason = ShutdownReason.ZOMBIE; - dropLease(); - LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); - } - } - // If we reached end of the shard, set sequence number to SHARD_END. - if (localReason == ShutdownReason.TERMINATE) { - recordProcessorCheckpointer.setSequenceNumberAtShardEnd( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - } + ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason); final ShutdownInput shutdownInput = new ShutdownInput() .withShutdownReason(localReason) @@ -191,7 +159,67 @@ class ShutdownTask implements ITask { return new TaskResult(exception); } + private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason) + throws DependencyException, ProvisionedThroughputException { + ShutdownReason shutdownReason = originalReason; + if(originalReason == ShutdownReason.TERMINATE) { + // For TERMINATE shutdown reason, try to create and persist childShard leases before setting checkpoint. + try { + final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + if (currentLease == null) { + throw new InvalidStateException(shardInfo.getShardId() + + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + } + if (!CollectionUtils.isNullOrEmpty(childShards)) { + createLeasesForChildShardsIfNotExist(); + updateCurrentLeaseWithChildShards(currentLease); + recordProcessorCheckpointer.setSequenceNumberAtShardEnd( + recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + } else { + LOG.warn("Shard " + shardInfo.getShardId() + + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); + } + } catch (InvalidStateException e) { + // If invalidStateException happens, it indicates we are missing childShard related information. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting + // childShard information in the processTask. + shutdownReason = ShutdownReason.ZOMBIE; + dropLease(); + LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); + } + + } + return shutdownReason; + } + private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { + // For child shard resulted from merge of two parent shards, verify if both the parents are either present or + // not present in the lease table before creating the lease entry. + if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) { + final ChildShard childShard = childShards.get(0); + final List parentLeaseKeys = childShard.getParentShards(); + + if (parentLeaseKeys.size() != 2) { + throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard + + " does not contain other parent information."); + } else { + boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) == + Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1))); + if (!isValidLeaseTableState) { + if(--retryLeftForValidParentState >= 0) { + throw new BlockedOnParentShardException( + "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard + + " has partial parent information in lease table. Hence deferring lease creation of child shard."); + } else { + throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard + + " has partial parent information in lease table."); + } + } + } + } + // Attempt create leases for child shards. for (ChildShard childShard : childShards) { final String leaseKey = childShard.getShardId(); if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) { @@ -202,18 +230,10 @@ class ShutdownTask implements ITask { } } - private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - if (currentLease == null) { - throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId()); - } + private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); - currentLease.setChildShardIds(childShardIds); - final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken())); - if (!updateResult) { - throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId()); - } + leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS); LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index cbfdf54a..6400b839 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; @@ -31,9 +34,11 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -72,7 +77,6 @@ public class ShutdownTaskTest { defaultConcurrencyToken, defaultParentShardIds, ExtendedSequenceNumber.LATEST); - IRecordProcessor defaultRecordProcessor = new TestStreamlet(); ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); @@ -86,6 +90,8 @@ public class ShutdownTaskTest { private ILeaseManager leaseManager; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; + @Mock + private IRecordProcessor defaultRecordProcessor; /** * @throws java.lang.Exception @@ -110,7 +116,6 @@ public class ShutdownTaskTest { final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); - when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); } /** @@ -143,9 +148,9 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructSplitChildShards()); TaskResult result = task.call(); - Assert.assertNotNull(result.getException()); + assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + "Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information."; @@ -178,13 +183,104 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructSplitChildShards()); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); Assert.assertNull(result.getException()); } + @Test + public final void testCallWhenParentInfoNotPresentInLease() throws Exception { + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); + when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); + when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease); + + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards()); + + // Make first 5 attempts with partial parent info in lease table + for (int i = 0; i < 5; i++) { + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof BlockedOnParentShardException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(getRecordsCache, never()).shutdown(); + verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); + } + + // Make next attempt with complete parent info in lease table + TaskResult result = task.call(); + assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); + verify(leaseCoordinator, never()).dropLease(currentLease); + } + + @Test + public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws Exception { + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); + when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); + when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); + + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards()); + + for (int i = 0; i < 10; i++) { + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof BlockedOnParentShardException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(getRecordsCache, never()).shutdown(); + verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); + } + + TaskResult result = task.call(); + assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); + verify(leaseCoordinator).dropLease(currentLease); + } + @Test public final void testCallWhenShardEnd() throws Exception { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); @@ -205,10 +301,10 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructSplitChildShards()); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); - verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class)); + verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); } @@ -241,7 +337,7 @@ public class ShutdownTaskTest { Collections.emptyList()); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); - verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); + verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); } @@ -270,7 +366,7 @@ public class ShutdownTaskTest { Collections.emptyList()); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); - verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); + verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); } @@ -288,7 +384,7 @@ public class ShutdownTaskTest { Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } - private List constructChildShards() { + private List constructSplitChildShards() { List childShards = new ArrayList<>(); List parentShards = new ArrayList<>(); parentShards.add(defaultShardId); @@ -307,6 +403,21 @@ public class ShutdownTaskTest { return childShards; } + private List constructMergeChildShards() { + List childShards = new ArrayList<>(); + List parentShards = new ArrayList<>(); + parentShards.add(defaultShardId); + parentShards.add("ShardId-1"); + + ChildShard childShard = new ChildShard(); + childShard.setShardId("ShardId-2"); + childShard.setParentShards(parentShards); + childShard.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99")); + childShards.add(childShard); + + return childShards; + } + private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { KinesisClientLease lease = new KinesisClientLease(); lease.setLeaseKey(leaseKey);