From 1d50d766f45efc2db6b9ee627bfe1a58fe7a3164 Mon Sep 17 00:00:00 2001 From: parijas Date: Mon, 3 Feb 2020 17:00:32 -0800 Subject: [PATCH 1/2] Refactored shard closure verification performed by ShutdownTask --- .../lib/worker/ShutdownTask.java | 29 +++++------------- .../clientlibrary/proxies/IKinesisProxy.java | 13 ++++++++ .../clientlibrary/proxies/KinesisProxy.java | 30 +++++++++++++++++++ ...etricsCollectingKinesisProxyDecorator.java | 8 +++++ .../ShardClosureVerificationResponse.java | 27 +++++++++++++++++ .../lib/worker/ShutdownTaskTest.java | 11 +++++-- .../proxies/KinesisLocalFileProxy.java | 27 +++++++++++++++++ 7 files changed, 121 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index abbc7bb1..56c46516 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.util.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,8 +28,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -107,11 +104,13 @@ class ShutdownTask implements ITask { * workers to contend for the lease of this shard. */ if(localReason == ShutdownReason.TERMINATE) { - latestShards = kinesisProxy.getShardList(); + ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId()); + latestShards = shardClosureVerificationResponse.getLatestShards(); - //If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid - // checking point with SHARD_END sequence number. - if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { + // If latestShards is null or empty, it means shard in context is not closed yet. + // We should shut down the ShardConsumer with Zombie state which avoids checkpoint-ing + // with SHARD_END sequence number. + if(!shardClosureVerificationResponse.isVerifiedShardWasClosed()) { localReason = ShutdownReason.ZOMBIE; dropLease(); LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); @@ -201,20 +200,6 @@ class ShutdownTask implements ITask { return reason; } - private boolean isShardInContextParentOfAny(List shards) { - for(Shard shard : shards) { - if (isChildShardOfShardInContext(shard)) { - return true; - } - } - return false; - } - - private boolean isChildShardOfShardInContext(Shard shard) { - return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId()) - || StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId())); - } - private void dropLease() { KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); leaseCoordinator.dropLease(lease); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 41850965..22722271 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -78,6 +79,18 @@ public interface IKinesisProxy { */ List getShardList() throws ResourceNotFoundException; + /** + * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. + * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) + * along with the result of verification. + * Note that DynamoDBStreamsProxy does not implement this check and hence resorts to the default response. + * @param shardId Id of the shard that needs to be verified. + * @return an Object of type ShardClosureVerificationResponse. + */ + default ShardClosureVerificationResponse verifyShardClosure(String shardId) { + return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, null /*latestShards*/); + } + /** * Fetch a shard iterator from the specified position in the shard. * This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index 06badc77..bb494d46 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -476,6 +477,35 @@ public class KinesisProxy implements IKinesisProxyExtended { return shards; } + /** + * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. + * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) + * along with the result of verification. + * @param shardId Id of the shard that needs to be verified. + * @return an Object of type ShardClosureVerificationResponse. + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + List shards = this.getShardList(); + if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { + return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + + private boolean isShardParentOfAny(String shardId, List shards) { + for(Shard shard : shards) { + if (isChildShardOfShard(shard, shardId)) { + return true; + } + } + return false; + } + + private boolean isChildShardOfShard(Shard shard, String shardId) { + return (StringUtils.equals(shard.getParentShardId(), shardId) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 1c8e925a..2dc5e520 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -179,6 +179,14 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy { } } + /** + * @param shardId Id of the shard that needs to be verified. + * @return + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + return other.verifyShardClosure(shardId); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java new file mode 100644 index 00000000..ffaaf38d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java @@ -0,0 +1,27 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +import com.amazonaws.services.kinesis.model.Shard; +import lombok.Getter; + +import java.util.List; + +/** + * ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END. + * The verification is done by IKinesisProxy and this class is used to wrap the verification results. + */ +@Getter +public class ShardClosureVerificationResponse { + + private final boolean verifiedShardWasClosed; + private final List latestShards; + + /** + * Used to capture response from KinesisProxy.verifyShardClosure method. + * @param verifiedShardWasClosed If the provided shardId corresponds to a shard that was verified as closed. + * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + */ + public ShardClosureVerificationResponse(boolean verifiedShardWasClosed, List latestShards) { + this.verifiedShardWasClosed = verifiedShardWasClosed; + this.latestShards = latestShards; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index cd82e475..b408d441 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -27,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; @@ -112,6 +114,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); List shards = constructShardListForGraphA(); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -145,6 +148,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -180,6 +184,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -203,7 +208,7 @@ public class ShutdownTaskTest { shardSyncStrategy); TaskResult result = task.call(); verify(shardSyncStrategy).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).getShardList(); + verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); verify(leaseCoordinator, never()).dropLease(any()); @@ -220,6 +225,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -244,7 +250,7 @@ public class ShutdownTaskTest { shardSyncStrategy); TaskResult result = task.call(); verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).getShardList(); + verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any()); @@ -257,6 +263,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index cc5089c4..abb183d0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.util.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -423,6 +425,31 @@ public class KinesisLocalFileProxy implements IKinesisProxy { return shards; } + /** + * {@inheritDoc} + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + List shards = this.getShardList(); + if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { + return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + + private boolean isShardParentOfAny(String shardId, List shards) { + for(Shard shard : shards) { + if (isChildShardOfShard(shard, shardId)) { + return true; + } + } + return false; + } + + private boolean isChildShardOfShard(Shard shard, String shardId) { + return (StringUtils.equals(shard.getParentShardId(), shardId) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); + } + /** * Used for serializing/deserializing the shard list to the file. */ From 94a63247245ffa8255f5dea53c7bdce951b4691c Mon Sep 17 00:00:00 2001 From: parijas Date: Tue, 11 Feb 2020 14:13:19 -0800 Subject: [PATCH 2/2] Addressed PR comments --- .../lib/worker/ShutdownTask.java | 12 ++++--- .../clientlibrary/proxies/IKinesisProxy.java | 8 +---- .../clientlibrary/proxies/KinesisProxy.java | 12 +++---- ...etricsCollectingKinesisProxyDecorator.java | 3 +- .../ShardClosureVerificationResponse.java | 22 +++---------- ...ppingShardClosureVerificationResponse.java | 32 +++++++++++++++++++ .../lib/worker/ShutdownTaskTest.java | 12 +++---- .../proxies/KinesisLocalFileProxy.java | 10 +++--- 8 files changed, 62 insertions(+), 49 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 56c46516..a9ff5080 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.Shard; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -105,12 +106,13 @@ class ShutdownTask implements ITask { */ if(localReason == ShutdownReason.TERMINATE) { ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId()); - latestShards = shardClosureVerificationResponse.getLatestShards(); + if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) { + latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards(); + } - // If latestShards is null or empty, it means shard in context is not closed yet. - // We should shut down the ShardConsumer with Zombie state which avoids checkpoint-ing - // with SHARD_END sequence number. - if(!shardClosureVerificationResponse.isVerifiedShardWasClosed()) { + // If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state + // which avoids checkpoint-ing with SHARD_END sequence number. + if(!shardClosureVerificationResponse.isShardClosed()) { localReason = ShutdownReason.ZOMBIE; dropLease(); LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 22722271..6e148969 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -17,7 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -81,15 +80,10 @@ public interface IKinesisProxy { /** * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. - * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) - * along with the result of verification. - * Note that DynamoDBStreamsProxy does not implement this check and hence resorts to the default response. * @param shardId Id of the shard that needs to be verified. * @return an Object of type ShardClosureVerificationResponse. */ - default ShardClosureVerificationResponse verifyShardClosure(String shardId) { - return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, null /*latestShards*/); - } + ShardClosureVerificationResponse verifyShardClosure(String shardId); /** * Fetch a shard iterator from the specified position in the shard. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index bb494d46..6717208b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -479,29 +479,29 @@ public class KinesisProxy implements IKinesisProxyExtended { /** * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. - * Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again) - * along with the result of verification. + * Also returns the list of shards wrapped in the response so that it can be used for lease creation + * (instead of calling getShardList() again). * @param shardId Id of the shard that needs to be verified. * @return an Object of type ShardClosureVerificationResponse. */ @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { List shards = this.getShardList(); if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { - return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } - return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } private boolean isShardParentOfAny(String shardId, List shards) { for(Shard shard : shards) { - if (isChildShardOfShard(shard, shardId)) { + if (isShardAParent(shard, shardId)) { return true; } } return false; } - private boolean isChildShardOfShard(Shard shard, String shardId) { + private boolean isShardAParent(Shard shard, String shardId) { return (StringUtils.equals(shard.getParentShardId(), shardId) || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 2dc5e520..230ee710 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -180,8 +180,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy { } /** - * @param shardId Id of the shard that needs to be verified. - * @return + * {@inheritDoc} */ @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { return other.verifyShardClosure(shardId); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java index ffaaf38d..4360a4cf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java @@ -1,27 +1,13 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; -import com.amazonaws.services.kinesis.model.Shard; -import lombok.Getter; - -import java.util.List; - /** * ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END. - * The verification is done by IKinesisProxy and this class is used to wrap the verification results. + * The verification is done by IKinesisProxy and implementations of this interface are used to wrap the verification results. */ -@Getter -public class ShardClosureVerificationResponse { - - private final boolean verifiedShardWasClosed; - private final List latestShards; +public interface ShardClosureVerificationResponse { /** - * Used to capture response from KinesisProxy.verifyShardClosure method. - * @param verifiedShardWasClosed If the provided shardId corresponds to a shard that was verified as closed. - * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + * @return A boolean value indicating whether or not the shard in context (for which the request was made) is closed. */ - public ShardClosureVerificationResponse(boolean verifiedShardWasClosed, List latestShards) { - this.verifiedShardWasClosed = verifiedShardWasClosed; - this.latestShards = latestShards; - } + boolean isShardClosed(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java new file mode 100644 index 00000000..83227c71 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java @@ -0,0 +1,32 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +import com.amazonaws.services.kinesis.model.Shard; +import lombok.Getter; + +import java.util.List; + +/** + * Implementation of ShardClosureVerificationResponse that also wraps the latest results + * from IKinesisProxy.getShardList() in it. + */ +public class ShardListWrappingShardClosureVerificationResponse implements ShardClosureVerificationResponse { + + private final boolean isShardClosed; + + @Getter + private final List latestShards; + + /** + * Used to capture response from KinesisProxy.verifyShardClosure method. + * @param isShardClosed If the provided shardId corresponds to a shard that was verified as closed. + * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + */ + public ShardListWrappingShardClosureVerificationResponse(boolean isShardClosed, List latestShards) { + this.isShardClosed = isShardClosed; + this.latestShards = latestShards; + } + + @Override public boolean isShardClosed() { + return isShardClosed; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index b408d441..04fadd88 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -28,7 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; @@ -114,7 +114,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); List shards = constructShardListForGraphA(); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -148,7 +148,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -184,7 +184,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -225,7 +225,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -263,7 +263,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); - when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards)); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index abb183d0..d78f5ca0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -428,24 +428,24 @@ public class KinesisLocalFileProxy implements IKinesisProxy { /** * {@inheritDoc} */ - @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + @Override public ShardListWrappingShardClosureVerificationResponse verifyShardClosure(String shardId) { List shards = this.getShardList(); if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { - return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } - return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); } private boolean isShardParentOfAny(String shardId, List shards) { for(Shard shard : shards) { - if (isChildShardOfShard(shard, shardId)) { + if (isShardAParent(shard, shardId)) { return true; } } return false; } - private boolean isChildShardOfShard(Shard shard, String shardId) { + private boolean isShardAParent(Shard shard, String shardId) { return (StringUtils.equals(shard.getParentShardId(), shardId) || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); }