From 1d50d766f45efc2db6b9ee627bfe1a58fe7a3164 Mon Sep 17 00:00:00 2001 From: parijas Date: Mon, 3 Feb 2020 17:00:32 -0800 Subject: [PATCH] Refactored shard closure verification performed by ShutdownTask --- .../lib/worker/ShutdownTask.java | 29 +++++------------- .../clientlibrary/proxies/IKinesisProxy.java | 13 ++++++++ .../clientlibrary/proxies/KinesisProxy.java | 30 +++++++++++++++++++ ...etricsCollectingKinesisProxyDecorator.java | 8 +++++ .../ShardClosureVerificationResponse.java | 27 +++++++++++++++++ .../lib/worker/ShutdownTaskTest.java | 11 +++++-- .../proxies/KinesisLocalFileProxy.java | 27 +++++++++++++++++ 7 files changed, 121 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index abbc7bb1..56c46516 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.util.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,8 +28,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -107,11 +104,13 @@ class ShutdownTask implements ITask { * workers to contend for the lease of this shard. */ if(localReason == ShutdownReason.TERMINATE) { - latestShards = kinesisProxy.getShardList(); + ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId()); + latestShards = shardClosureVerificationResponse.getLatestShards(); - //If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid - // checking point with SHARD_END sequence number. - if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { + // If latestShards is null or empty, it means shard in context is not closed yet. + // We should shut down the ShardConsumer with Zombie state which avoids checkpoint-ing + // with SHARD_END sequence number. + if(!shardClosureVerificationResponse.isVerifiedShardWasClosed()) { localReason = ShutdownReason.ZOMBIE; dropLease(); LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); @@ -201,20 +200,6 @@ class ShutdownTask implements ITask { return reason; } - private boolean isShardInContextParentOfAny(List shards) { - for(Shard shard : shards) { - if (isChildShardOfShardInContext(shard)) { - return true; - } - } - return false; - } - - private boolean isChildShardOfShardInContext(Shard shard) { - return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId()) - || StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId())); - } - private void dropLease() { KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); leaseCoordinator.dropLease(lease); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 41850965..22722271 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -78,6 +79,18 @@ public interface IKinesisProxy { */ List getShardList() throws ResourceNotFoundException; + /** + * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. + * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) + * along with the result of verification. + * Note that DynamoDBStreamsProxy does not implement this check and hence resorts to the default response. + * @param shardId Id of the shard that needs to be verified. + * @return an Object of type ShardClosureVerificationResponse. + */ + default ShardClosureVerificationResponse verifyShardClosure(String shardId) { + return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, null /*latestShards*/); + } + /** * Fetch a shard iterator from the specified position in the shard. * This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index 06badc77..bb494d46 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -476,6 +477,35 @@ public class KinesisProxy implements IKinesisProxyExtended { return shards; } + /** + * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. + * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) + * along with the result of verification. + * @param shardId Id of the shard that needs to be verified. + * @return an Object of type ShardClosureVerificationResponse. + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + List shards = this.getShardList(); + if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { + return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + + private boolean isShardParentOfAny(String shardId, List shards) { + for(Shard shard : shards) { + if (isChildShardOfShard(shard, shardId)) { + return true; + } + } + return false; + } + + private boolean isChildShardOfShard(Shard shard, String shardId) { + return (StringUtils.equals(shard.getParentShardId(), shardId) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 1c8e925a..2dc5e520 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -179,6 +179,14 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy { } } + /** + * @param shardId Id of the shard that needs to be verified. + * @return + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + return other.verifyShardClosure(shardId); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java new file mode 100644 index 00000000..ffaaf38d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java @@ -0,0 +1,27 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +import com.amazonaws.services.kinesis.model.Shard; +import lombok.Getter; + +import java.util.List; + +/** + * ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END. + * The verification is done by IKinesisProxy and this class is used to wrap the verification results. + */ +@Getter +public class ShardClosureVerificationResponse { + + private final boolean verifiedShardWasClosed; + private final List latestShards; + + /** + * Used to capture response from KinesisProxy.verifyShardClosure method. + * @param verifiedShardWasClosed If the provided shardId corresponds to a shard that was verified as closed. + * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + */ + public ShardClosureVerificationResponse(boolean verifiedShardWasClosed, List latestShards) { + this.verifiedShardWasClosed = verifiedShardWasClosed; + this.latestShards = latestShards; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index cd82e475..b408d441 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -27,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; @@ -112,6 +114,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); List shards = constructShardListForGraphA(); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -145,6 +148,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -180,6 +184,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -203,7 +208,7 @@ public class ShutdownTaskTest { shardSyncStrategy); TaskResult result = task.call(); verify(shardSyncStrategy).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).getShardList(); + verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); verify(leaseCoordinator, never()).dropLease(any()); @@ -220,6 +225,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -244,7 +250,7 @@ public class ShutdownTaskTest { shardSyncStrategy); TaskResult result = task.call(); verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).getShardList(); + verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any()); @@ -257,6 +263,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index cc5089c4..abb183d0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.util.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -423,6 +425,31 @@ public class KinesisLocalFileProxy implements IKinesisProxy { return shards; } + /** + * {@inheritDoc} + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + List shards = this.getShardList(); + if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { + return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + + private boolean isShardParentOfAny(String shardId, List shards) { + for(Shard shard : shards) { + if (isChildShardOfShard(shard, shardId)) { + return true; + } + } + return false; + } + + private boolean isChildShardOfShard(Shard shard, String shardId) { + return (StringUtils.equals(shard.getParentShardId(), shardId) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); + } + /** * Used for serializing/deserializing the shard list to the file. */