From 94a63247245ffa8255f5dea53c7bdce951b4691c Mon Sep 17 00:00:00 2001 From: parijas Date: Tue, 11 Feb 2020 14:13:19 -0800 Subject: [PATCH] Addressed PR comments --- .../lib/worker/ShutdownTask.java | 12 ++++--- .../clientlibrary/proxies/IKinesisProxy.java | 8 +---- .../clientlibrary/proxies/KinesisProxy.java | 12 +++---- ...etricsCollectingKinesisProxyDecorator.java | 3 +- .../ShardClosureVerificationResponse.java | 22 +++---------- ...ppingShardClosureVerificationResponse.java | 32 +++++++++++++++++++ .../lib/worker/ShutdownTaskTest.java | 12 +++---- .../proxies/KinesisLocalFileProxy.java | 10 +++--- 8 files changed, 62 insertions(+), 49 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 56c46516..a9ff5080 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.Shard; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -105,12 +106,13 @@ class ShutdownTask implements ITask { */ if(localReason == ShutdownReason.TERMINATE) { ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId()); - latestShards = shardClosureVerificationResponse.getLatestShards(); + if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) { + latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards(); + } - // If latestShards is null or empty, it means shard in context is not closed yet. - // We should shut down the ShardConsumer with Zombie state which avoids checkpoint-ing - // with SHARD_END sequence number. - if(!shardClosureVerificationResponse.isVerifiedShardWasClosed()) { + // If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state + // which avoids checkpoint-ing with SHARD_END sequence number. + if(!shardClosureVerificationResponse.isShardClosed()) { localReason = ShutdownReason.ZOMBIE; dropLease(); LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 22722271..6e148969 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -17,7 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -81,15 +80,10 @@ public interface IKinesisProxy { /** * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. - * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) - * along with the result of verification. - * Note that DynamoDBStreamsProxy does not implement this check and hence resorts to the default response. * @param shardId Id of the shard that needs to be verified. * @return an Object of type ShardClosureVerificationResponse. */ - default ShardClosureVerificationResponse verifyShardClosure(String shardId) { - return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, null /*latestShards*/); - } + ShardClosureVerificationResponse verifyShardClosure(String shardId); /** * Fetch a shard iterator from the specified position in the shard. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index bb494d46..6717208b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -479,29 +479,29 @@ public class KinesisProxy implements IKinesisProxyExtended { /** * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. - * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) - * along with the result of verification. + * Also returns the list of shards wrapped in the response so that it can be used for lease creation + * (instead of calling getShardList() again). * @param shardId Id of the shard that needs to be verified. * @return an Object of type ShardClosureVerificationResponse. */ @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { List shards = this.getShardList(); if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { - return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } - return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } private boolean isShardParentOfAny(String shardId, List shards) { for(Shard shard : shards) { - if (isChildShardOfShard(shard, shardId)) { + if (isShardAParent(shard, shardId)) { return true; } } return false; } - private boolean isChildShardOfShard(Shard shard, String shardId) { + private boolean isShardAParent(Shard shard, String shardId) { return (StringUtils.equals(shard.getParentShardId(), shardId) || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 2dc5e520..230ee710 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -180,8 +180,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy { } /** - * @param shardId Id of the shard that needs to be verified. - * @return + * {@inheritDoc} */ @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { return other.verifyShardClosure(shardId); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java index ffaaf38d..4360a4cf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java @@ -1,27 +1,13 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; -import com.amazonaws.services.kinesis.model.Shard; -import lombok.Getter; - -import java.util.List; - /** * ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END. - * The verification is done by IKinesisProxy and this class is used to wrap the verification results. + * The verification is done by IKinesisProxy and implementations of this interface are used to wrap the verification results. */ -@Getter -public class ShardClosureVerificationResponse { - - private final boolean verifiedShardWasClosed; - private final List latestShards; +public interface ShardClosureVerificationResponse { /** - * Used to capture response from KinesisProxy.verifyShardClosure method. - * @param verifiedShardWasClosed If the provided shardId corresponds to a shard that was verified as closed. - * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + * @return A boolean value indicating whether or not the shard in context (for which the request was made) is closed. */ - public ShardClosureVerificationResponse(boolean verifiedShardWasClosed, List latestShards) { - this.verifiedShardWasClosed = verifiedShardWasClosed; - this.latestShards = latestShards; - } + boolean isShardClosed(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java new file mode 100644 index 00000000..83227c71 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java @@ -0,0 +1,32 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +import com.amazonaws.services.kinesis.model.Shard; +import lombok.Getter; + +import java.util.List; + +/** + * Implementation of ShardClosureVerificationResponse that also wraps the latest results + * from IKinesisProxy.getShardList() in it. + */ +public class ShardListWrappingShardClosureVerificationResponse implements ShardClosureVerificationResponse { + + private final boolean isShardClosed; + + @Getter + private final List latestShards; + + /** + * Used to capture response from KinesisProxy.verifyShardClosure method. + * @param isShardClosed If the provided shardId corresponds to a shard that was verified as closed. + * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + */ + public ShardListWrappingShardClosureVerificationResponse(boolean isShardClosed, List latestShards) { + this.isShardClosed = isShardClosed; + this.latestShards = latestShards; + } + + @Override public boolean isShardClosed() { + return isShardClosed; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index b408d441..04fadd88 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -28,7 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; @@ -114,7 +114,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); List shards = constructShardListForGraphA(); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -148,7 +148,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -184,7 +184,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -225,7 +225,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -263,7 +263,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index abb183d0..d78f5ca0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -428,24 +428,24 @@ public class KinesisLocalFileProxy implements IKinesisProxy { /** * {@inheritDoc} */ - @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + @Override public ShardListWrappingShardClosureVerificationResponse verifyShardClosure(String shardId) { List shards = this.getShardList(); if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { - return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } - return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } private boolean isShardParentOfAny(String shardId, List shards) { for(Shard shard : shards) { - if (isChildShardOfShard(shard, shardId)) { + if (isShardAParent(shard, shardId)) { return true; } } return false; } - private boolean isChildShardOfShard(Shard shard, String shardId) { + private boolean isShardAParent(Shard shard, String shardId) { return (StringUtils.equals(shard.getParentShardId(), shardId) || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); }