diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index abbc7bb1..a9ff5080 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.util.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,8 +29,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -107,11 +105,14 @@ class ShutdownTask implements ITask { * workers to contend for the lease of this shard. */ if(localReason == ShutdownReason.TERMINATE) { - latestShards = kinesisProxy.getShardList(); + ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId()); + if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) { + latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards(); + } - //If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid - // checking point with SHARD_END sequence number. - if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { + // If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state + // which avoids checkpoint-ing with SHARD_END sequence number. + if(!shardClosureVerificationResponse.isShardClosed()) { localReason = ShutdownReason.ZOMBIE; dropLease(); LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); @@ -201,20 +202,6 @@ class ShutdownTask implements ITask { return reason; } - private boolean isShardInContextParentOfAny(List shards) { - for(Shard shard : shards) { - if (isChildShardOfShardInContext(shard)) { - return true; - } - } - return false; - } - - private boolean isChildShardOfShardInContext(Shard shard) { - return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId()) - || StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId())); - } - private void dropLease() { KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); leaseCoordinator.dropLease(lease); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 41850965..6e148969 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -78,6 +78,13 @@ public interface IKinesisProxy { */ List getShardList() throws ResourceNotFoundException; + /** + * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. + * @param shardId Id of the shard that needs to be verified. + * @return an Object of type ShardClosureVerificationResponse. + */ + ShardClosureVerificationResponse verifyShardClosure(String shardId); + /** * Fetch a shard iterator from the specified position in the shard. * This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index 06badc77..6717208b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -476,6 +477,35 @@ public class KinesisProxy implements IKinesisProxyExtended { return shards; } + /** + * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. + * Also returns the list of shards wrapped in the response so that it can be used for lease creation + * (instead of calling getShardList() again). + * @param shardId Id of the shard that needs to be verified. + * @return an Object of type ShardClosureVerificationResponse. + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + List shards = this.getShardList(); + if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { + return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + + private boolean isShardParentOfAny(String shardId, List shards) { + for(Shard shard : shards) { + if (isShardAParent(shard, shardId)) { + return true; + } + } + return false; + } + + private boolean isShardAParent(Shard shard, String shardId) { + return (StringUtils.equals(shard.getParentShardId(), shardId) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 1c8e925a..230ee710 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -179,6 +179,13 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy { } } + /** + * {@inheritDoc} + */ + @Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) { + return other.verifyShardClosure(shardId); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java new file mode 100644 index 00000000..4360a4cf --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardClosureVerificationResponse.java @@ -0,0 +1,13 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +/** + * ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END. + * The verification is done by IKinesisProxy and implementations of this interface are used to wrap the verification results. + */ +public interface ShardClosureVerificationResponse { + + /** + * @return A boolean value indicating whether or not the shard in context (for which the request was made) is closed. + */ + boolean isShardClosed(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java new file mode 100644 index 00000000..83227c71 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/ShardListWrappingShardClosureVerificationResponse.java @@ -0,0 +1,32 @@ +package com.amazonaws.services.kinesis.clientlibrary.proxies; + +import com.amazonaws.services.kinesis.model.Shard; +import lombok.Getter; + +import java.util.List; + +/** + * Implementation of ShardClosureVerificationResponse that also wraps the latest results + * from IKinesisProxy.getShardList() in it. + */ +public class ShardListWrappingShardClosureVerificationResponse implements ShardClosureVerificationResponse { + + private final boolean isShardClosed; + + @Getter + private final List latestShards; + + /** + * Used to capture response from KinesisProxy.verifyShardClosure method. + * @param isShardClosed If the provided shardId corresponds to a shard that was verified as closed. + * @param latestShards Result returned by KinesisProxy.getShardList() used for verification. + */ + public ShardListWrappingShardClosureVerificationResponse(boolean isShardClosed, List latestShards) { + this.isShardClosed = isShardClosed; + this.latestShards = latestShards; + } + + @Override public boolean isShardClosed() { + return isShardClosed; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index cd82e475..04fadd88 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -27,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; @@ -112,6 +114,7 @@ public class ShutdownTaskTest { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); List shards = constructShardListForGraphA(); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -145,6 +148,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -180,6 +184,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -203,7 +208,7 @@ public class ShutdownTaskTest { shardSyncStrategy); TaskResult result = task.call(); verify(shardSyncStrategy).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).getShardList(); + verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); verify(leaseCoordinator, never()).dropLease(any()); @@ -220,6 +225,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); @@ -244,7 +250,7 @@ public class ShutdownTaskTest { shardSyncStrategy); TaskResult result = task.call(); verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); - verify(kinesisProxy, times(1)).getShardList(); + verify(kinesisProxy, times(1)).verifyShardClosure(anyString()); Assert.assertNull(result.getException()); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any()); @@ -257,6 +263,7 @@ public class ShutdownTaskTest { List shards = constructShardListForGraphA(); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards)); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index cc5089c4..d78f5ca0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.util.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -423,6 +425,31 @@ public class KinesisLocalFileProxy implements IKinesisProxy { return shards; } + /** + * {@inheritDoc} + */ + @Override public ShardListWrappingShardClosureVerificationResponse verifyShardClosure(String shardId) { + List shards = this.getShardList(); + if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) { + return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/); + } + + private boolean isShardParentOfAny(String shardId, List shards) { + for(Shard shard : shards) { + if (isShardAParent(shard, shardId)) { + return true; + } + } + return false; + } + + private boolean isShardAParent(Shard shard, String shardId) { + return (StringUtils.equals(shard.getParentShardId(), shardId) + || StringUtils.equals(shard.getAdjacentParentShardId(), shardId)); + } + /** * Used for serializing/deserializing the shard list to the file. */