Refactored shard closure verification performed by ShutdownTask

This commit is contained in:
parijas 2020-02-03 17:00:32 -08:00
parent 02c2036d5d
commit 1d50d766f4
7 changed files with 121 additions and 24 deletions

View file

@ -14,9 +14,8 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -29,8 +28,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -107,11 +104,13 @@ class ShutdownTask implements ITask {
* workers to contend for the lease of this shard. * workers to contend for the lease of this shard.
*/ */
if(localReason == ShutdownReason.TERMINATE) { if(localReason == ShutdownReason.TERMINATE) {
latestShards = kinesisProxy.getShardList(); ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId());
latestShards = shardClosureVerificationResponse.getLatestShards();
//If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid // If latestShards is null or empty, it means shard in context is not closed yet.
// checking point with SHARD_END sequence number. // We should shut down the ShardConsumer with Zombie state which avoids checkpoint-ing
if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { // with SHARD_END sequence number.
if(!shardClosureVerificationResponse.isVerifiedShardWasClosed()) {
localReason = ShutdownReason.ZOMBIE; localReason = ShutdownReason.ZOMBIE;
dropLease(); dropLease();
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId()); LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
@ -201,20 +200,6 @@ class ShutdownTask implements ITask {
return reason; return reason;
} }
private boolean isShardInContextParentOfAny(List<Shard> shards) {
for(Shard shard : shards) {
if (isChildShardOfShardInContext(shard)) {
return true;
}
}
return false;
}
private boolean isChildShardOfShardInContext(Shard shard) {
return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId())
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId()));
}
private void dropLease() { private void dropLease() {
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
leaseCoordinator.dropLease(lease); leaseCoordinator.dropLease(lease);

View file

@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamResult;
@ -78,6 +79,18 @@ public interface IKinesisProxy {
*/ */
List<Shard> getShardList() throws ResourceNotFoundException; List<Shard> getShardList() throws ResourceNotFoundException;
/**
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
* Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again)
* along with the result of verification.
* Note that DynamoDBStreamsProxy does not implement this check and hence resorts to the default response.
* @param shardId Id of the shard that needs to be verified.
* @return an Object of type ShardClosureVerificationResponse.
*/
default ShardClosureVerificationResponse verifyShardClosure(String shardId) {
return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, null /*latestShards*/);
}
/** /**
* Fetch a shard iterator from the specified position in the shard. * Fetch a shard iterator from the specified position in the shard.
* This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which * This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which

View file

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -476,6 +477,35 @@ public class KinesisProxy implements IKinesisProxyExtended {
return shards; return shards;
} }
/**
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
* Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again)
* along with the result of verification.
* @param shardId Id of the shard that needs to be verified.
* @return an Object of type ShardClosureVerificationResponse.
*/
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
List<Shard> shards = this.getShardList();
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
}
return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
}
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
for(Shard shard : shards) {
if (isChildShardOfShard(shard, shardId)) {
return true;
}
}
return false;
}
private boolean isChildShardOfShard(Shard shard, String shardId) {
return (StringUtils.equals(shard.getParentShardId(), shardId)
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View file

@ -179,6 +179,14 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
} }
} }
/**
* @param shardId Id of the shard that needs to be verified.
* @return
*/
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
return other.verifyShardClosure(shardId);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View file

@ -0,0 +1,27 @@
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.Getter;
import java.util.List;
/**
* ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END.
* The verification is done by IKinesisProxy and this class is used to wrap the verification results.
*/
@Getter
public class ShardClosureVerificationResponse {
private final boolean verifiedShardWasClosed;
private final List<Shard> latestShards;
/**
* Used to capture response from KinesisProxy.verifyShardClosure method.
* @param verifiedShardWasClosed If the provided shardId corresponds to a shard that was verified as closed.
* @param latestShards Result returned by KinesisProxy.getShardList() used for verification.
*/
public ShardClosureVerificationResponse(boolean verifiedShardWasClosed, List<Shard> latestShards) {
this.verifiedShardWasClosed = verifiedShardWasClosed;
this.latestShards = latestShards;
}
}

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -27,6 +28,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
@ -112,6 +114,7 @@ public class ShutdownTaskTest {
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
List<Shard> shards = constructShardListForGraphA(); List<Shard> shards = constructShardListForGraphA();
when(kinesisProxy.getShardList()).thenReturn(shards); when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class); ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
@ -145,6 +148,7 @@ public class ShutdownTaskTest {
List<Shard> shards = constructShardListForGraphA(); List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards); when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class); ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
@ -180,6 +184,7 @@ public class ShutdownTaskTest {
List<Shard> shards = constructShardListForGraphA(); List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards); when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class); ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
@ -203,7 +208,7 @@ public class ShutdownTaskTest {
shardSyncStrategy); shardSyncStrategy);
TaskResult result = task.call(); TaskResult result = task.call();
verify(shardSyncStrategy).onShardConsumerShutDown(shards); verify(shardSyncStrategy).onShardConsumerShutDown(shards);
verify(kinesisProxy, times(1)).getShardList(); verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
Assert.assertNull(result.getException()); Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
verify(leaseCoordinator, never()).dropLease(any()); verify(leaseCoordinator, never()).dropLease(any());
@ -220,6 +225,7 @@ public class ShutdownTaskTest {
List<Shard> shards = constructShardListForGraphA(); List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards); when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class); ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
@ -244,7 +250,7 @@ public class ShutdownTaskTest {
shardSyncStrategy); shardSyncStrategy);
TaskResult result = task.call(); TaskResult result = task.call();
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards); verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
verify(kinesisProxy, times(1)).getShardList(); verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
Assert.assertNull(result.getException()); Assert.assertNull(result.getException());
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any()); verify(leaseCoordinator).dropLease(any());
@ -257,6 +263,7 @@ public class ShutdownTaskTest {
List<Shard> shards = constructShardListForGraphA(); List<Shard> shards = constructShardListForGraphA();
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
when(kinesisProxy.getShardList()).thenReturn(shards); when(kinesisProxy.getShardList()).thenReturn(shards);
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards));
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class); ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);

View file

@ -33,6 +33,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -423,6 +425,31 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
return shards; return shards;
} }
/**
* {@inheritDoc}
*/
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
List<Shard> shards = this.getShardList();
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
}
return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
}
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
for(Shard shard : shards) {
if (isChildShardOfShard(shard, shardId)) {
return true;
}
}
return false;
}
private boolean isChildShardOfShard(Shard shard, String shardId) {
return (StringUtils.equals(shard.getParentShardId(), shardId)
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
}
/** /**
* Used for serializing/deserializing the shard list to the file. * Used for serializing/deserializing the shard list to the file.
*/ */