From 5bf24bda431a1b902210b00f3a74de232138214e Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Wed, 15 Jul 2020 12:33:53 -0700 Subject: [PATCH 1/7] fix for premature childShard lease creation --- .../lib/worker/ShutdownTask.java | 114 ++++++++------- .../lib/worker/ShutdownTaskTest.java | 131 ++++++++++++++++-- 2 files changed, 188 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 71cf3b9d..ed9de7e2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,11 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -32,8 +34,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import java.util.List; +import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; /** @@ -44,6 +46,7 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; + private int retryLeftForValidParentState = 10; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -107,43 +110,8 @@ class ShutdownTask implements ITask { try { LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); - ShutdownReason localReason = reason; - /* - * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END - * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows active - * workers to contend for the lease of this shard. - */ - if(localReason == ShutdownReason.TERMINATE) { - // Create new lease for the child shards if they don't exist. - // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. - // This would happen when KinesisDataFetcher catches ResourceNotFound exception. - // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. - // This scenario could happen when customer deletes the stream while leaving the KCL application running. - try { - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateCurrentLeaseWithChildShards(); - } else { - LOG.warn("Shard " + shardInfo.getShardId() - + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); - } - } catch (InvalidStateException e) { - // If invalidStateException happens, it indicates we are missing childShard related information. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting - // childShard information in the processTask. - localReason = ShutdownReason.ZOMBIE; - dropLease(); - LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); - } - } - // If we reached end of the shard, set sequence number to SHARD_END. - if (localReason == ShutdownReason.TERMINATE) { - recordProcessorCheckpointer.setSequenceNumberAtShardEnd( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - } + ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason); final ShutdownInput shutdownInput = new ShutdownInput() .withShutdownReason(localReason) @@ -191,7 +159,67 @@ class ShutdownTask implements ITask { return new TaskResult(exception); } + private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason) + throws DependencyException, ProvisionedThroughputException { + ShutdownReason shutdownReason = originalReason; + if(originalReason == ShutdownReason.TERMINATE) { + // For TERMINATE shutdown reason, try to create and persist childShard leases before setting checkpoint. + try { + final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + if (currentLease == null) { + throw new InvalidStateException(shardInfo.getShardId() + + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + } + if (!CollectionUtils.isNullOrEmpty(childShards)) { + createLeasesForChildShardsIfNotExist(); + updateCurrentLeaseWithChildShards(currentLease); + recordProcessorCheckpointer.setSequenceNumberAtShardEnd( + recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + } else { + LOG.warn("Shard " + shardInfo.getShardId() + + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); + } + } catch (InvalidStateException e) { + // If invalidStateException happens, it indicates we are missing childShard related information. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting + // childShard information in the processTask. + shutdownReason = ShutdownReason.ZOMBIE; + dropLease(); + LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); + } + + } + return shutdownReason; + } + private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { + // For child shard resulted from merge of two parent shards, verify if both the parents are either present or + // not present in the lease table before creating the lease entry. + if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) { + final ChildShard childShard = childShards.get(0); + final List parentLeaseKeys = childShard.getParentShards(); + + if (parentLeaseKeys.size() != 2) { + throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard + + " does not contain other parent information."); + } else { + boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) == + Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1))); + if (!isValidLeaseTableState) { + if(--retryLeftForValidParentState >= 0) { + throw new BlockedOnParentShardException( + "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard + + " has partial parent information in lease table. Hence deferring lease creation of child shard."); + } else { + throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard + + " has partial parent information in lease table."); + } + } + } + } + // Attempt create leases for child shards. for (ChildShard childShard : childShards) { final String leaseKey = childShard.getShardId(); if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) { @@ -202,18 +230,10 @@ class ShutdownTask implements ITask { } } - private void updateCurrentLeaseWithChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - if (currentLease == null) { - throw new InvalidStateException("Failed to retrieve current lease for shard " + shardInfo.getShardId()); - } + private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); - currentLease.setChildShardIds(childShardIds); - final boolean updateResult = leaseCoordinator.updateLease(currentLease, UUID.fromString(shardInfo.getConcurrencyToken())); - if (!updateResult) { - throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.getShardId()); - } + leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS); LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index cbfdf54a..6400b839 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; @@ -31,9 +34,11 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -72,7 +77,6 @@ public class ShutdownTaskTest { defaultConcurrencyToken, defaultParentShardIds, ExtendedSequenceNumber.LATEST); - IRecordProcessor defaultRecordProcessor = new TestStreamlet(); ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); @@ -86,6 +90,8 @@ public class ShutdownTaskTest { private ILeaseManager leaseManager; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; + @Mock + private IRecordProcessor defaultRecordProcessor; /** * @throws java.lang.Exception @@ -110,7 +116,6 @@ public class ShutdownTaskTest { final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); - when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); } /** @@ -143,9 +148,9 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructSplitChildShards()); TaskResult result = task.call(); - Assert.assertNotNull(result.getException()); + assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + "Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information."; @@ -178,13 +183,104 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructSplitChildShards()); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); Assert.assertNull(result.getException()); } + @Test + public final void testCallWhenParentInfoNotPresentInLease() throws Exception { + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); + when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); + when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease); + + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards()); + + // Make first 5 attempts with partial parent info in lease table + for (int i = 0; i < 5; i++) { + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof BlockedOnParentShardException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(getRecordsCache, never()).shutdown(); + verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); + } + + // Make next attempt with complete parent info in lease table + TaskResult result = task.call(); + assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); + verify(leaseCoordinator, never()).dropLease(currentLease); + } + + @Test + public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws Exception { + RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); + when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + boolean cleanupLeasesOfCompletedShards = false; + boolean ignoreUnexpectedChildShards = false; + + KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); + when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); + when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); + + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards()); + + for (int i = 0; i < 10; i++) { + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof BlockedOnParentShardException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(getRecordsCache, never()).shutdown(); + verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); + } + + TaskResult result = task.call(); + assertNull(result.getException()); + verify(getRecordsCache).shutdown(); + verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); + verify(leaseCoordinator).dropLease(currentLease); + } + @Test public final void testCallWhenShardEnd() throws Exception { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); @@ -205,10 +301,10 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructSplitChildShards()); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); - verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class)); + verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); } @@ -241,7 +337,7 @@ public class ShutdownTaskTest { Collections.emptyList()); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); - verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); + verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); } @@ -270,7 +366,7 @@ public class ShutdownTaskTest { Collections.emptyList()); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); - verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); + verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); } @@ -288,7 +384,7 @@ public class ShutdownTaskTest { Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } - private List constructChildShards() { + private List constructSplitChildShards() { List childShards = new ArrayList<>(); List parentShards = new ArrayList<>(); parentShards.add(defaultShardId); @@ -307,6 +403,21 @@ public class ShutdownTaskTest { return childShards; } + private List constructMergeChildShards() { + List childShards = new ArrayList<>(); + List parentShards = new ArrayList<>(); + parentShards.add(defaultShardId); + parentShards.add("ShardId-1"); + + ChildShard childShard = new ChildShard(); + childShard.setShardId("ShardId-2"); + childShard.setParentShards(parentShards); + childShard.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99")); + childShards.add(childShard); + + return childShards; + } + private KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { KinesisClientLease lease = new KinesisClientLease(); lease.setLeaseKey(leaseKey); From 8a296a5aa3c50a012f3a1ad27bea1f4bf4ffe65e Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 21 Jul 2020 11:08:56 -0700 Subject: [PATCH 2/7] change the retry logic --- .../lib/worker/ShutdownTask.java | 15 ++- .../lib/worker/ShutdownTaskTest.java | 96 +++++++++++++------ 2 files changed, 79 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index ed9de7e2..9af4645f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.Set; import java.util.stream.Collectors; @@ -46,7 +47,8 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; - private int retryLeftForValidParentState = 10; + @VisibleForTesting + static final int RETRY_RANDOM_MAX_RANGE = 10; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -208,7 +210,7 @@ class ShutdownTask implements ITask { boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) == Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1))); if (!isValidLeaseTableState) { - if(--retryLeftForValidParentState >= 0) { + if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { throw new BlockedOnParentShardException( "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard + " has partial parent information in lease table. Hence deferring lease creation of child shard."); @@ -230,6 +232,15 @@ class ShutdownTask implements ITask { } } + /** + * Returns true for 1 in N probability. + */ + @VisibleForTesting + boolean isOneInNProbability(int n) { + Random r = new Random(); + return 1 == r.nextInt((n - 1) + 1) + 1; + } + private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); currentLease.setChildShardIds(childShardIds); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 6400b839..d9226f44 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -60,6 +61,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE; /** * @@ -204,34 +206,51 @@ public class ShutdownTaskTest { when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease); - ShutdownTask task = new ShutdownTask(defaultShardInfo, - defaultRecordProcessor, - checkpointer, - ShutdownReason.TERMINATE, - kinesisProxy, - INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, - leaseCoordinator, - TASK_BACKOFF_TIME_MILLIS, - getRecordsCache, - shardSyncer, - shardSyncStrategy, - constructMergeChildShards()); - // Make first 5 attempts with partial parent info in lease table for (int i = 0; i < 5; i++) { + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof BlockedOnParentShardException); assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache, never()).shutdown(); verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } // Make next attempt with complete parent info in lease table + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); + verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache).shutdown(); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); verify(leaseCoordinator, never()).dropLease(currentLease); @@ -250,32 +269,49 @@ public class ShutdownTaskTest { when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); - ShutdownTask task = new ShutdownTask(defaultShardInfo, - defaultRecordProcessor, - checkpointer, - ShutdownReason.TERMINATE, - kinesisProxy, - INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, - leaseCoordinator, - TASK_BACKOFF_TIME_MILLIS, - getRecordsCache, - shardSyncer, - shardSyncStrategy, - constructMergeChildShards()); - for (int i = 0; i < 10; i++) { + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof BlockedOnParentShardException); assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache, never()).shutdown(); verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache).shutdown(); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); verify(leaseCoordinator).dropLease(currentLease); From c3b41c3b5547c3bd665c6852a32e231cdee0812e Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Mon, 27 Jul 2020 18:18:32 -0700 Subject: [PATCH 3/7] refactoring shutdownTask --- .../lib/worker/ConsumerStates.java | 3 +- .../lib/worker/ShardConsumer.java | 2 +- .../lib/worker/ShutdownTask.java | 218 ++++++++++-------- .../CustomerApplicationException.java | 24 ++ .../lib/worker/ShardConsumerTest.java | 19 +- .../lib/worker/ShutdownTaskTest.java | 43 ++-- 6 files changed, 197 insertions(+), 112 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 5cf55dbf..b5cde2bc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -532,7 +532,8 @@ class ConsumerStates { consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy(), consumer.getChildShards(), - consumer.getLeaseCleanupManager()); + consumer.getLeaseCleanupManager(), + consumer.getMetricsFactory()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 71f8a6bc..10600def 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -54,6 +54,7 @@ class ShardConsumer { private final ExecutorService executorService; private final ShardInfo shardInfo; private final KinesisDataFetcher dataFetcher; + @Getter private final IMetricsFactory metricsFactory; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private ICheckpoint checkpoint; @@ -221,7 +222,6 @@ class ShardConsumer { * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool * @param config Kinesis library configuration * @param shardSyncer shardSyncer instance used to check and create new leases - * @param leaseCleanupManager used to clean up leases in lease table. */ @Deprecated ShardConsumer(ShardInfo shardInfo, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index d30ec765..a9ec8d76 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -16,11 +16,13 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.UpdateField; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -37,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; @@ -48,6 +51,7 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); + private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; @VisibleForTesting static final int RETRY_RANDOM_MAX_RANGE = 10; @@ -68,6 +72,7 @@ class ShutdownTask implements ITask { private final ShardSyncStrategy shardSyncStrategy; private final List childShards; private final LeaseCleanupManager leaseCleanupManager; + private final IMetricsFactory metricsFactory; /** * Constructor. @@ -85,7 +90,7 @@ class ShutdownTask implements ITask { long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager) { + LeaseCleanupManager leaseCleanupManager, IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -101,6 +106,7 @@ class ShutdownTask implements ITask { this.shardSyncStrategy = shardSyncStrategy; this.childShards = childShards; this.leaseCleanupManager = leaseCleanupManager; + this.metricsFactory = metricsFactory; } /* @@ -111,110 +117,143 @@ class ShutdownTask implements ITask { */ @Override public TaskResult call() { + MetricsHelper.startScope(metricsFactory, SHUTDOWN_TASK_OPERATION); Exception exception; - boolean applicationException = false; try { LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " - + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); - ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason); - - final ShutdownInput shutdownInput = new ShutdownInput() - .withShutdownReason(localReason) - .withCheckpointer(recordProcessorCheckpointer); - final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { - recordProcessor.shutdown(shutdownInput); - ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + final long startTime = System.currentTimeMillis(); + final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + final Runnable leaseLostAction = () -> takeLeaseLostAction(); - final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); - - if (localReason == ShutdownReason.TERMINATE) { - if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { - throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + - "See IRecordProcessor.shutdown javadocs for more information."); - } - - // Check if either the shard end ddb persist is successful or - // if childshards is empty. When child shards is empty then either it is due to - // completed shard being reprocessed or we got RNF from service. - // For these cases enqueue the lease for deletion. - if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) { - final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo); - - if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - - //TODO: Add shard end checkpointing here. + if (reason == ShutdownReason.TERMINATE) { + try { + takeShardEndAction(currentShardLease, startTime); + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we have a non recoverable error in short term. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. + LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); + dropLease(currentShardLease); + throwOnApplicationException(leaseLostAction, startTime); } + } else { + throwOnApplicationException(leaseLostAction, startTime); } + LOG.debug("Shutting down retrieval strategy."); getRecordsCache.shutdown(); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); + return new TaskResult(null); } catch (Exception e) { - applicationException = true; - throw e; - } finally { - MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, - MetricsLevel.SUMMARY); - } + if (e instanceof CustomerApplicationException) { + LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); + } else { + LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); + } - return new TaskResult(null); - } catch (Exception e) { - if (applicationException) { - LOG.error("Application exception. ", e); - } else { - LOG.error("Caught exception: ", e); - } - exception = e; - // backoff if we encounter an exception. - try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug("Interrupted sleep", ie); + exception = e; + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug("Interrupted sleep", ie); + } } + } finally { + MetricsHelper.endScope(); } return new TaskResult(exception); } - private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason) - throws DependencyException, ProvisionedThroughputException { - ShutdownReason shutdownReason = originalReason; - if(originalReason == ShutdownReason.TERMINATE) { - // For TERMINATE shutdown reason, try to create and persist childShard leases before setting checkpoint. - try { - final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - if (currentLease == null) { - throw new InvalidStateException(shardInfo.getShardId() - + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); - } - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateCurrentLeaseWithChildShards(currentLease); - recordProcessorCheckpointer.setSequenceNumberAtShardEnd( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - } else { - LOG.warn("Shard " + shardInfo.getShardId() - + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); - } - } catch (InvalidStateException e) { - // If invalidStateException happens, it indicates we are missing childShard related information. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry getting - // childShard information in the processTask. - shutdownReason = ShutdownReason.ZOMBIE; - dropLease(); - LOG.warn("Shard " + shardInfo.getShardId() + ": Exception happened while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); - } - + // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. + private void takeShardEndAction(KinesisClientLease currentShardLease, long startTime) + throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { + // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher catches ResourceNotFound exception. + // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. + if (currentShardLease == null) { + throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + } + if (!CollectionUtils.isNullOrEmpty(childShards)) { + // If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information. + createLeasesForChildShardsIfNotExist(); + updateCurrentLeaseWithChildShards(currentShardLease); + } else { + LOG.warn("Shard " + shardInfo.getShardId() + + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); + } + // Checkpoint with SHARD_END sequence number. + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo); + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + boolean isSuccess = false; + try { + isSuccess = attemptShardEndCheckpointing(startTime); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } + } + + private void takeLeaseLostAction() { + final ShutdownInput leaseLostShutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.ZOMBIE) + .withCheckpointer(recordProcessorCheckpointer); + recordProcessor.shutdown(leaseLostShutdownInput); + } + + private boolean attemptShardEndCheckpointing(long startTime) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { + final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) + .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); + if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + // Call the recordProcessor to checkpoint with SHARD_END sequence number. + // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), startTime); + } + return true; + } + + private void applicationCheckpointAndVerification() { + recordProcessorCheckpointer.setSequenceNumberAtShardEnd( + recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + final ShutdownInput shardEndShutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.TERMINATE) + .withCheckpointer(recordProcessorCheckpointer); + recordProcessor.shutdown(shardEndShutdownInput); + + final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + + final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); + + if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { + throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + + "See IRecordProcessor.shutdown javadocs for more information."); + } + } + + private void throwOnApplicationException(Runnable action, long startTime) throws CustomerApplicationException { + try { + action.run(); + } catch (Exception e) { + throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e); + } finally { + MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } - return shutdownReason; } private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { @@ -285,13 +324,12 @@ class ShutdownTask implements ITask { return reason; } - private void dropLease() { - KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - if (lease == null) { - LOG.warn("Shard " + shardInfo.getShardId() + ": Lease already dropped. Will shutdown the shardConsumer directly."); + private void dropLease(KinesisClientLease currentShardLease) { + if (currentShardLease == null) { + LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly."); return; } - leaseCoordinator.dropLease(lease); - LOG.warn("Dropped lease for shutting down ShardConsumer: " + lease.getLeaseKey()); + leaseCoordinator.dropLease(currentShardLease); + LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey()); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java b/src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java new file mode 100644 index 00000000..1ef906af --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/leases/exceptions/CustomerApplicationException.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazonaws.services.kinesis.leases.exceptions; + +public class CustomerApplicationException extends Exception { + public CustomerApplicationException(Throwable t) {super(t);} + + public CustomerApplicationException(String message, Throwable t) {super(message, t);} + + public CustomerApplicationException(String message) {super(message);} +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 67bf3697..cb89b619 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -583,7 +583,12 @@ public class ShardConsumerTest { .thenReturn(getRecordsCache); when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(new KinesisClientLease()); + List parentShardIds = new ArrayList<>(); + parentShardIds.add("parentShardId"); + KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds); + currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer")); + when(leaseManager.getLease(streamShardId)).thenReturn(currentLease); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, @@ -705,7 +710,11 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - when(leaseManager.getLease(anyString())).thenReturn(null); + List parentShardIds = new ArrayList<>(); + parentShardIds.add("parentShardId"); + KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds); + currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer")); + when(leaseManager.getLease(streamShardId)).thenReturn(currentLease); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); @@ -758,11 +767,7 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - List parentShardIds = new ArrayList<>(); - parentShardIds.add(shardInfo.getShardId()); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(), - "leaseOwner", - parentShardIds)); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 219fd28c..2942dcb7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -38,9 +38,12 @@ import java.util.UUID; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -81,6 +84,7 @@ public class ShutdownTaskTest { defaultParentShardIds, ExtendedSequenceNumber.LATEST); ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); + IMetricsFactory metricsFactory = new NullMetricsFactory(); @Mock @@ -119,8 +123,11 @@ public class ShutdownTaskTest { public void setUp() throws Exception { doNothing().when(getRecordsCache).shutdown(); final KinesisClientLease parentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + parentLease.setCheckpoint(new ExtendedSequenceNumber("3298")); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn(parentLease); + when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease); + } /** @@ -154,12 +161,12 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); assertNotNull(result.getException()); - Assert.assertTrue(result.getException() instanceof IllegalArgumentException); - final String expectedExceptionMessage = "Application didn't checkpoint at end of shard shardId-0. " + - "Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information."; + Assert.assertTrue(result.getException() instanceof CustomerApplicationException); + final String expectedExceptionMessage = "Customer application throws exception for shard shardId-0"; Assert.assertEquals(expectedExceptionMessage, result.getException().getMessage()); } @@ -190,7 +197,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); @@ -206,6 +214,7 @@ public class ShutdownTaskTest { boolean ignoreUnexpectedChildShards = false; KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + currentLease.setCheckpoint(new ExtendedSequenceNumber("3298")); KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList()); when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); @@ -227,7 +236,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -253,7 +263,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); @@ -291,7 +302,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -316,7 +328,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager)); + leaseCleanupManager, + metricsFactory)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); @@ -347,7 +360,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -381,7 +395,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -411,7 +426,8 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager); + leaseCleanupManager, + metricsFactory); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -428,7 +444,8 @@ public class ShutdownTaskTest { ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, - getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); + getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), + leaseCleanupManager, metricsFactory); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } From f7130175282b62627bb6538f728dc797ae215a64 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 28 Jul 2020 13:11:57 -0700 Subject: [PATCH 4/7] LeaseCleanupManager change --- .../clientlibrary/lib/worker/Worker.java | 7 +------ .../leases/impl/LeaseCleanupManager.java | 17 ++++++++++------- .../leases/impl/LeaseCleanupManagerTest.java | 11 +++++++++++ 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 5b4f31e8..8e5cce85 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -733,12 +733,7 @@ public class Worker implements Runnable { } } - if (!leaseCleanupManager.isRunning()) { - LOG.info("Starting LeaseCleanupManager."); - leaseCleanupManager.start(); - } else { - LOG.info("LeaseCleanupManager is already running. No need to start it."); - } + leaseCleanupManager.start(); // If we reach this point, then we either skipped the lease sync or did not have any exception for the // shard sync in the previous attempt. diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index af02f588..b3157e78 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -114,13 +114,16 @@ public class LeaseCleanupManager { * {@link LeaseCleanupManager#leaseCleanupIntervalMillis} */ public void start() { - LOG.debug("Starting lease cleanup thread."); - isRunning = true; - completedLeaseStopwatch.start(); - garbageLeaseStopwatch.start(); - - deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, - TimeUnit.MILLISECONDS); + if (!isRunning) { + LOG.info("Starting lease cleanup thread."); + completedLeaseStopwatch.start(); + garbageLeaseStopwatch.start(); + deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, + TimeUnit.MILLISECONDS); + isRunning = true; + } else { + LOG.info("Lease cleanup thread already running, no need to start."); + } } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java index 367c0ab0..f89ae644 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -82,6 +83,16 @@ public class LeaseCleanupManagerTest { cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); } + /** + * Tests subsequent calls to start {@link LeaseCleanupManager}. + */ + @Test + public final void testSubsequentStarts() { + leaseCleanupManager.start(); + Assert.assertTrue(leaseCleanupManager.isRunning()); + leaseCleanupManager.start(); + } + /** * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed * shard case. From 6738087a8fc5fef7b7fdb7a3cbf32bfbbcccdd4a Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 28 Jul 2020 13:32:04 -0700 Subject: [PATCH 5/7] updateLeaseWithMetaInfo with expectation --- .../leases/impl/KinesisClientLeaseSerializer.java | 5 +++++ .../services/kinesis/leases/impl/LeaseManager.java | 2 +- .../services/kinesis/leases/impl/LeaseSerializer.java | 10 ++++++++++ .../kinesis/leases/interfaces/ILeaseSerializer.java | 7 +++++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 0b9271be..966faee4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -120,6 +120,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer getDynamoExistantExpectation(final String leaseKey) { + return baseSerializer.getDynamoExistantExpectation(leaseKey); + } + @Override public Map getDynamoLeaseCounterUpdate(KinesisClientLease lease) { return baseSerializer.getDynamoLeaseCounterUpdate(lease); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index e5860870..36f56dda 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -620,7 +620,7 @@ public class LeaseManager implements ILeaseManager { UpdateItemRequest request = new UpdateItemRequest(); request.setTableName(table); request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); + request.setExpected(serializer.getDynamoExistantExpectation(lease.getLeaseKey())); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java index b02ed34c..62977df7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java @@ -137,6 +137,16 @@ public class LeaseSerializer implements ILeaseSerializer { return result; } + @Override + public Map getDynamoExistantExpectation(final String leaseKey) { + Map result = new HashMap<>(); + + ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(leaseKey)); + result.put(LEASE_KEY_KEY, expectedAV); + + return result; + } + @Override public Map getDynamoLeaseCounterUpdate(Lease lease) { return getDynamoLeaseCounterUpdate(lease.getLeaseCounter()); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java index 58eb6613..8ff56b7d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java @@ -79,6 +79,13 @@ public interface ILeaseSerializer { */ public Map getDynamoNonexistantExpectation(); + /** + * @return the attribute value map asserting that a lease does exist. + */ + default Map getDynamoExistantExpectation(final String leaseKey) { + throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); + } + /** * @param lease * @return the attribute value map that increments a lease counter From 38cef8963a442aab43f8bb0356e3e5d99e2f1279 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 28 Jul 2020 13:43:56 -0700 Subject: [PATCH 6/7] fixing expectation --- .../services/kinesis/leases/impl/LeaseSerializer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java index 62977df7..29bf0f9b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java @@ -141,7 +141,9 @@ public class LeaseSerializer implements ILeaseSerializer { public Map getDynamoExistantExpectation(final String leaseKey) { Map result = new HashMap<>(); - ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(leaseKey)); + ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(); + expectedAV.setValue(DynamoUtils.createAttributeValue(leaseKey)); + expectedAV.setExists(true); result.put(LEASE_KEY_KEY, expectedAV); return result; From 22737c4a5be03754da076d1708d92295745f6d1c Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 28 Jul 2020 17:51:50 -0700 Subject: [PATCH 7/7] addressing comments --- .../lib/worker/ConsumerStates.java | 3 +- .../lib/worker/ShardConsumer.java | 1 - .../lib/worker/ShutdownTask.java | 99 ++++++++----------- .../impl/KinesisClientLeaseSerializer.java | 4 +- .../kinesis/leases/impl/LeaseManager.java | 4 +- .../kinesis/leases/impl/LeaseSerializer.java | 2 +- .../leases/interfaces/ILeaseSerializer.java | 4 +- .../lib/worker/ShutdownTaskTest.java | 30 ++---- .../impl/LeaseManagerIntegrationTest.java | 32 ++++++ 9 files changed, 93 insertions(+), 86 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index b5cde2bc..5cf55dbf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -532,8 +532,7 @@ class ConsumerStates { consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy(), consumer.getChildShards(), - consumer.getLeaseCleanupManager(), - consumer.getMetricsFactory()); + consumer.getLeaseCleanupManager()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 10600def..29a95ac4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -54,7 +54,6 @@ class ShardConsumer { private final ExecutorService executorService; private final ShardInfo shardInfo; private final KinesisDataFetcher dataFetcher; - @Getter private final IMetricsFactory metricsFactory; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private ICheckpoint checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index a9ec8d76..274aaaa1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -22,7 +22,6 @@ import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.UpdateField; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -33,8 +32,6 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import java.util.List; @@ -51,10 +48,8 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); - private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; - private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; @VisibleForTesting - static final int RETRY_RANDOM_MAX_RANGE = 10; + static final int RETRY_RANDOM_MAX_RANGE = 50; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -72,7 +67,6 @@ class ShutdownTask implements ITask { private final ShardSyncStrategy shardSyncStrategy; private final List childShards; private final LeaseCleanupManager leaseCleanupManager; - private final IMetricsFactory metricsFactory; /** * Constructor. @@ -90,7 +84,7 @@ class ShutdownTask implements ITask { long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager, IMetricsFactory metricsFactory) { + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -106,7 +100,6 @@ class ShutdownTask implements ITask { this.shardSyncStrategy = shardSyncStrategy; this.childShards = childShards; this.leaseCleanupManager = leaseCleanupManager; - this.metricsFactory = metricsFactory; } /* @@ -117,61 +110,55 @@ class ShutdownTask implements ITask { */ @Override public TaskResult call() { - MetricsHelper.startScope(metricsFactory, SHUTDOWN_TASK_OPERATION); Exception exception; + LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " + + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + try { - LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " - + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); + final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + final Runnable leaseLostAction = () -> takeLeaseLostAction(); - try { - final long startTime = System.currentTimeMillis(); - final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - final Runnable leaseLostAction = () -> takeLeaseLostAction(); - - if (reason == ShutdownReason.TERMINATE) { - try { - takeShardEndAction(currentShardLease, startTime); - } catch (InvalidStateException e) { - // If InvalidStateException happens, it indicates we have a non recoverable error in short term. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. - LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); - dropLease(currentShardLease); - throwOnApplicationException(leaseLostAction, startTime); - } - } else { - throwOnApplicationException(leaseLostAction, startTime); - } - - LOG.debug("Shutting down retrieval strategy."); - getRecordsCache.shutdown(); - LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); - return new TaskResult(null); - } catch (Exception e) { - if (e instanceof CustomerApplicationException) { - LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); - } else { - LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); - } - - exception = e; - // backoff if we encounter an exception. + if (reason == ShutdownReason.TERMINATE) { try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug("Interrupted sleep", ie); + takeShardEndAction(currentShardLease); + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we have a non recoverable error in short term. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. + LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); + dropLease(currentShardLease); + throwOnApplicationException(leaseLostAction); } + } else { + throwOnApplicationException(leaseLostAction); + } + + LOG.debug("Shutting down retrieval strategy."); + getRecordsCache.shutdown(); + LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); + return new TaskResult(null); + } catch (Exception e) { + if (e instanceof CustomerApplicationException) { + LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); + } else { + LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); + } + + exception = e; + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug("Interrupted sleep", ie); } - } finally { - MetricsHelper.endScope(); } return new TaskResult(exception); } // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. - private void takeShardEndAction(KinesisClientLease currentShardLease, long startTime) + private void takeShardEndAction(KinesisClientLease currentShardLease) throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { // Create new lease for the child shards if they don't exist. // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. @@ -194,7 +181,7 @@ class ShutdownTask implements ITask { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { boolean isSuccess = false; try { - isSuccess = attemptShardEndCheckpointing(startTime); + isSuccess = attemptShardEndCheckpointing(); } finally { // Check if either the shard end ddb persist is successful or // if childshards is empty. When child shards is empty then either it is due to @@ -214,14 +201,14 @@ class ShutdownTask implements ITask { recordProcessor.shutdown(leaseLostShutdownInput); } - private boolean attemptShardEndCheckpointing(long startTime) + private boolean attemptShardEndCheckpointing() throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { // Call the recordProcessor to checkpoint with SHARD_END sequence number. // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), startTime); + throwOnApplicationException(() -> applicationCheckpointAndVerification()); } return true; } @@ -246,13 +233,11 @@ class ShutdownTask implements ITask { } } - private void throwOnApplicationException(Runnable action, long startTime) throws CustomerApplicationException { + private void throwOnApplicationException(Runnable action) throws CustomerApplicationException { try { action.run(); } catch (Exception e) { throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e); - } finally { - MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 966faee4..310edb67 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -121,8 +121,8 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer getDynamoExistantExpectation(final String leaseKey) { - return baseSerializer.getDynamoExistantExpectation(leaseKey); + public Map getDynamoExistentExpectation(final String leaseKey) { + return baseSerializer.getDynamoExistentExpectation(leaseKey); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 36f56dda..0c70aaa7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -620,7 +620,7 @@ public class LeaseManager implements ILeaseManager { UpdateItemRequest request = new UpdateItemRequest(); request.setTableName(table); request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoExistantExpectation(lease.getLeaseKey())); + request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey())); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); @@ -628,6 +628,8 @@ public class LeaseManager implements ILeaseManager { try { dynamoDBClient.updateItem(request); + } catch (ConditionalCheckFailedException e) { + LOG.warn("Lease update failed for lease with key " + lease.getLeaseKey() + " because the lease did not exist at the time of the update", e); } catch (AmazonClientException e) { throw convertAndRethrowExceptions("update", lease.getLeaseKey(), e); } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java index 29bf0f9b..85381560 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java @@ -138,7 +138,7 @@ public class LeaseSerializer implements ILeaseSerializer { } @Override - public Map getDynamoExistantExpectation(final String leaseKey) { + public Map getDynamoExistentExpectation(final String leaseKey) { Map result = new HashMap<>(); ExpectedAttributeValue expectedAV = new ExpectedAttributeValue(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java index 8ff56b7d..2d9ea0c9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java @@ -82,8 +82,8 @@ public interface ILeaseSerializer { /** * @return the attribute value map asserting that a lease does exist. */ - default Map getDynamoExistantExpectation(final String leaseKey) { - throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented"); + default Map getDynamoExistentExpectation(final String leaseKey) { + throw new UnsupportedOperationException("DynamoExistentExpectation is not implemented"); } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 2942dcb7..8b67f5dc 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -161,8 +161,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof CustomerApplicationException); @@ -197,8 +196,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); @@ -236,8 +234,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -263,8 +260,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); @@ -302,8 +298,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); @@ -328,8 +323,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructMergeChildShards(), - leaseCleanupManager, - metricsFactory)); + leaseCleanupManager)); when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); @@ -360,8 +354,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, constructSplitChildShards(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -395,8 +388,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -426,8 +418,7 @@ public class ShutdownTaskTest { shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager, - metricsFactory); + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class)); @@ -444,8 +435,7 @@ public class ShutdownTaskTest { ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, - getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), - leaseCleanupManager, metricsFactory); + getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java index d0b0813d..c04ee44f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseManagerIntegrationTest.java @@ -27,6 +27,7 @@ import com.amazonaws.services.dynamodbv2.model.ListTablesResult; import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.TableStatus; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.model.HashKeyRange; import junit.framework.Assert; import org.junit.Test; @@ -124,6 +125,37 @@ public class LeaseManagerIntegrationTest extends LeaseIntegrationTest { Assert.assertFalse(leaseManager.renewLease(leaseCopy)); } + /** + * Tests leaseManager.updateLeaseWithMetaInfo() when the lease is deleted before updating it with meta info + */ + @Test + public void testDeleteLeaseThenUpdateLeaseWithMetaInfo() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); + KinesisClientLease lease = builder.withLease("1").build().get("1"); + final String leaseKey = lease.getLeaseKey(); + leaseManager.deleteLease(lease); + leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + final KinesisClientLease deletedLease = leaseManager.getLease(leaseKey); + Assert.assertNull(deletedLease); + } + + /** + * Tests leaseManager.updateLeaseWithMetaInfo() on hashKeyRange update + */ + @Test + public void testUpdateLeaseWithMetaInfo() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseManager); + KinesisClientLease lease = builder.withLease("1").build().get("1"); + final String leaseKey = lease.getLeaseKey(); + final HashKeyRangeForLease hashKeyRangeForLease = HashKeyRangeForLease.fromHashKeyRange(new HashKeyRange() + .withStartingHashKey("1") + .withEndingHashKey("2")); + lease.setHashKeyRange(hashKeyRangeForLease); + leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); + final KinesisClientLease updatedLease = leaseManager.getLease(leaseKey); + Assert.assertEquals(lease, updatedLease); + } + /** * Tests takeLease when the lease is not already owned. */