Addressed PR comments
This commit is contained in:
parent
1d50d766f4
commit
94a6324724
8 changed files with 62 additions and 49 deletions
|
|
@ -15,6 +15,7 @@
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||||
import com.amazonaws.services.kinesis.model.Shard;
|
import com.amazonaws.services.kinesis.model.Shard;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -105,12 +106,13 @@ class ShutdownTask implements ITask {
|
||||||
*/
|
*/
|
||||||
if(localReason == ShutdownReason.TERMINATE) {
|
if(localReason == ShutdownReason.TERMINATE) {
|
||||||
ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId());
|
ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId());
|
||||||
latestShards = shardClosureVerificationResponse.getLatestShards();
|
if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) {
|
||||||
|
latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards();
|
||||||
|
}
|
||||||
|
|
||||||
// If latestShards is null or empty, it means shard in context is not closed yet.
|
// If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state
|
||||||
// We should shut down the ShardConsumer with Zombie state which avoids checkpoint-ing
|
// which avoids checkpoint-ing with SHARD_END sequence number.
|
||||||
// with SHARD_END sequence number.
|
if(!shardClosureVerificationResponse.isShardClosed()) {
|
||||||
if(!shardClosureVerificationResponse.isVerifiedShardWasClosed()) {
|
|
||||||
localReason = ShutdownReason.ZOMBIE;
|
localReason = ShutdownReason.ZOMBIE;
|
||||||
dropLease();
|
dropLease();
|
||||||
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
|
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
||||||
|
|
@ -81,15 +80,10 @@ public interface IKinesisProxy {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
|
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
|
||||||
* Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again)
|
|
||||||
* along with the result of verification.
|
|
||||||
* Note that DynamoDBStreamsProxy does not implement this check and hence resorts to the default response.
|
|
||||||
* @param shardId Id of the shard that needs to be verified.
|
* @param shardId Id of the shard that needs to be verified.
|
||||||
* @return an Object of type ShardClosureVerificationResponse.
|
* @return an Object of type ShardClosureVerificationResponse.
|
||||||
*/
|
*/
|
||||||
default ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
ShardClosureVerificationResponse verifyShardClosure(String shardId);
|
||||||
return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, null /*latestShards*/);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch a shard iterator from the specified position in the shard.
|
* Fetch a shard iterator from the specified position in the shard.
|
||||||
|
|
|
||||||
|
|
@ -479,29 +479,29 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
|
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
|
||||||
* Returns the list of shards so it can be used for lease creation (instead of calling getShardList() again)
|
* Also returns the list of shards wrapped in the response so that it can be used for lease creation
|
||||||
* along with the result of verification.
|
* (instead of calling getShardList() again).
|
||||||
* @param shardId Id of the shard that needs to be verified.
|
* @param shardId Id of the shard that needs to be verified.
|
||||||
* @return an Object of type ShardClosureVerificationResponse.
|
* @return an Object of type ShardClosureVerificationResponse.
|
||||||
*/
|
*/
|
||||||
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
||||||
List<Shard> shards = this.getShardList();
|
List<Shard> shards = this.getShardList();
|
||||||
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
|
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
|
||||||
return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||||
}
|
}
|
||||||
return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
|
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
|
||||||
for(Shard shard : shards) {
|
for(Shard shard : shards) {
|
||||||
if (isChildShardOfShard(shard, shardId)) {
|
if (isShardAParent(shard, shardId)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isChildShardOfShard(Shard shard, String shardId) {
|
private boolean isShardAParent(Shard shard, String shardId) {
|
||||||
return (StringUtils.equals(shard.getParentShardId(), shardId)
|
return (StringUtils.equals(shard.getParentShardId(), shardId)
|
||||||
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
|
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -180,8 +180,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param shardId Id of the shard that needs to be verified.
|
* {@inheritDoc}
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
||||||
return other.verifyShardClosure(shardId);
|
return other.verifyShardClosure(shardId);
|
||||||
|
|
|
||||||
|
|
@ -1,27 +1,13 @@
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.model.Shard;
|
|
||||||
import lombok.Getter;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END.
|
* ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END.
|
||||||
* The verification is done by IKinesisProxy and this class is used to wrap the verification results.
|
* The verification is done by IKinesisProxy and implementations of this interface are used to wrap the verification results.
|
||||||
*/
|
*/
|
||||||
@Getter
|
public interface ShardClosureVerificationResponse {
|
||||||
public class ShardClosureVerificationResponse {
|
|
||||||
|
|
||||||
private final boolean verifiedShardWasClosed;
|
|
||||||
private final List<Shard> latestShards;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to capture response from KinesisProxy.verifyShardClosure method.
|
* @return A boolean value indicating whether or not the shard in context (for which the request was made) is closed.
|
||||||
* @param verifiedShardWasClosed If the provided shardId corresponds to a shard that was verified as closed.
|
|
||||||
* @param latestShards Result returned by KinesisProxy.getShardList() used for verification.
|
|
||||||
*/
|
*/
|
||||||
public ShardClosureVerificationResponse(boolean verifiedShardWasClosed, List<Shard> latestShards) {
|
boolean isShardClosed();
|
||||||
this.verifiedShardWasClosed = verifiedShardWasClosed;
|
|
||||||
this.latestShards = latestShards;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.model.Shard;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of ShardClosureVerificationResponse that also wraps the latest results
|
||||||
|
* from IKinesisProxy.getShardList() in it.
|
||||||
|
*/
|
||||||
|
public class ShardListWrappingShardClosureVerificationResponse implements ShardClosureVerificationResponse {
|
||||||
|
|
||||||
|
private final boolean isShardClosed;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final List<Shard> latestShards;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to capture response from KinesisProxy.verifyShardClosure method.
|
||||||
|
* @param isShardClosed If the provided shardId corresponds to a shard that was verified as closed.
|
||||||
|
* @param latestShards Result returned by KinesisProxy.getShardList() used for verification.
|
||||||
|
*/
|
||||||
|
public ShardListWrappingShardClosureVerificationResponse(boolean isShardClosed, List<Shard> latestShards) {
|
||||||
|
this.isShardClosed = isShardClosed;
|
||||||
|
this.latestShards = latestShards;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isShardClosed() {
|
||||||
|
return isShardClosed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -28,7 +28,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||||
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
||||||
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
||||||
import com.amazonaws.services.kinesis.model.Shard;
|
import com.amazonaws.services.kinesis.model.Shard;
|
||||||
|
|
@ -114,7 +114,7 @@ public class ShutdownTaskTest {
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
List<Shard> shards = constructShardListForGraphA();
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards));
|
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
|
@ -148,7 +148,7 @@ public class ShutdownTaskTest {
|
||||||
List<Shard> shards = constructShardListForGraphA();
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards));
|
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
|
@ -184,7 +184,7 @@ public class ShutdownTaskTest {
|
||||||
List<Shard> shards = constructShardListForGraphA();
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(true, shards));
|
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
|
@ -225,7 +225,7 @@ public class ShutdownTaskTest {
|
||||||
List<Shard> shards = constructShardListForGraphA();
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards));
|
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
|
@ -263,7 +263,7 @@ public class ShutdownTaskTest {
|
||||||
List<Shard> shards = constructShardListForGraphA();
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardClosureVerificationResponse(false, shards));
|
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||||
|
|
|
||||||
|
|
@ -428,24 +428,24 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
@Override public ShardListWrappingShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
||||||
List<Shard> shards = this.getShardList();
|
List<Shard> shards = this.getShardList();
|
||||||
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
|
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
|
||||||
return new ShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||||
}
|
}
|
||||||
return new ShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
|
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
|
||||||
for(Shard shard : shards) {
|
for(Shard shard : shards) {
|
||||||
if (isChildShardOfShard(shard, shardId)) {
|
if (isShardAParent(shard, shardId)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isChildShardOfShard(Shard shard, String shardId) {
|
private boolean isShardAParent(Shard shard, String shardId) {
|
||||||
return (StringUtils.equals(shard.getParentShardId(), shardId)
|
return (StringUtils.equals(shard.getParentShardId(), shardId)
|
||||||
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
|
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue