Merge pull request #684 from parijatsinha/v1.12_fix
Refactored shard closure verification performed by ShutdownTask
This commit is contained in:
commit
de52856b45
8 changed files with 134 additions and 24 deletions
|
|
@ -14,9 +14,9 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -29,8 +29,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
|||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
|
@ -107,11 +105,14 @@ class ShutdownTask implements ITask {
|
|||
* workers to contend for the lease of this shard.
|
||||
*/
|
||||
if(localReason == ShutdownReason.TERMINATE) {
|
||||
latestShards = kinesisProxy.getShardList();
|
||||
ShardClosureVerificationResponse shardClosureVerificationResponse = kinesisProxy.verifyShardClosure(shardInfo.getShardId());
|
||||
if (shardClosureVerificationResponse instanceof ShardListWrappingShardClosureVerificationResponse) {
|
||||
latestShards = ((ShardListWrappingShardClosureVerificationResponse)shardClosureVerificationResponse).getLatestShards();
|
||||
}
|
||||
|
||||
//If latestShards is null or empty, we should still shut down the ShardConsumer with Zombie state which avoid
|
||||
// checking point with SHARD_END sequence number.
|
||||
if(CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
|
||||
// If shard in context is not closed yet we should shut down the ShardConsumer with Zombie state
|
||||
// which avoids checkpoint-ing with SHARD_END sequence number.
|
||||
if(!shardClosureVerificationResponse.isShardClosed()) {
|
||||
localReason = ShutdownReason.ZOMBIE;
|
||||
dropLease();
|
||||
LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.getShardId());
|
||||
|
|
@ -201,20 +202,6 @@ class ShutdownTask implements ITask {
|
|||
return reason;
|
||||
}
|
||||
|
||||
private boolean isShardInContextParentOfAny(List<Shard> shards) {
|
||||
for(Shard shard : shards) {
|
||||
if (isChildShardOfShardInContext(shard)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isChildShardOfShardInContext(Shard shard) {
|
||||
return (StringUtils.equals(shard.getParentShardId(), shardInfo.getShardId())
|
||||
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardInfo.getShardId()));
|
||||
}
|
||||
|
||||
private void dropLease() {
|
||||
KinesisClientLease lease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
|
||||
leaseCoordinator.dropLease(lease);
|
||||
|
|
|
|||
|
|
@ -78,6 +78,13 @@ public interface IKinesisProxy {
|
|||
*/
|
||||
List<Shard> getShardList() throws ResourceNotFoundException;
|
||||
|
||||
/**
|
||||
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
|
||||
* @param shardId Id of the shard that needs to be verified.
|
||||
* @return an Object of type ShardClosureVerificationResponse.
|
||||
*/
|
||||
ShardClosureVerificationResponse verifyShardClosure(String shardId);
|
||||
|
||||
/**
|
||||
* Fetch a shard iterator from the specified position in the shard.
|
||||
* This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -476,6 +477,35 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
return shards;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
|
||||
* Also returns the list of shards wrapped in the response so that it can be used for lease creation
|
||||
* (instead of calling getShardList() again).
|
||||
* @param shardId Id of the shard that needs to be verified.
|
||||
* @return an Object of type ShardClosureVerificationResponse.
|
||||
*/
|
||||
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
||||
List<Shard> shards = this.getShardList();
|
||||
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
|
||||
return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||
}
|
||||
return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||
}
|
||||
|
||||
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
|
||||
for(Shard shard : shards) {
|
||||
if (isShardAParent(shard, shardId)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isShardAParent(Shard shard, String shardId) {
|
||||
return (StringUtils.equals(shard.getParentShardId(), shardId)
|
||||
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -179,6 +179,13 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
||||
return other.verifyShardClosure(shardId);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,13 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||
|
||||
/**
|
||||
* ShutDown task verifies if the shard in context is necessarily closed before checkpointing the shard at SHARD_END.
|
||||
* The verification is done by IKinesisProxy and implementations of this interface are used to wrap the verification results.
|
||||
*/
|
||||
public interface ShardClosureVerificationResponse {
|
||||
|
||||
/**
|
||||
* @return A boolean value indicating whether or not the shard in context (for which the request was made) is closed.
|
||||
*/
|
||||
boolean isShardClosed();
|
||||
}
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Implementation of ShardClosureVerificationResponse that also wraps the latest results
|
||||
* from IKinesisProxy.getShardList() in it.
|
||||
*/
|
||||
public class ShardListWrappingShardClosureVerificationResponse implements ShardClosureVerificationResponse {
|
||||
|
||||
private final boolean isShardClosed;
|
||||
|
||||
@Getter
|
||||
private final List<Shard> latestShards;
|
||||
|
||||
/**
|
||||
* Used to capture response from KinesisProxy.verifyShardClosure method.
|
||||
* @param isShardClosed If the provided shardId corresponds to a shard that was verified as closed.
|
||||
* @param latestShards Result returned by KinesisProxy.getShardList() used for verification.
|
||||
*/
|
||||
public ShardListWrappingShardClosureVerificationResponse(boolean isShardClosed, List<Shard> latestShards) {
|
||||
this.isShardClosed = isShardClosed;
|
||||
this.latestShards = latestShards;
|
||||
}
|
||||
|
||||
@Override public boolean isShardClosed() {
|
||||
return isShardClosed;
|
||||
}
|
||||
}
|
||||
|
|
@ -15,6 +15,7 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
|
@ -27,6 +28,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
|
||||
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
||||
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
|
@ -112,6 +114,7 @@ public class ShutdownTaskTest {
|
|||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
|
|
@ -145,6 +148,7 @@ public class ShutdownTaskTest {
|
|||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
|
|
@ -180,6 +184,7 @@ public class ShutdownTaskTest {
|
|||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(true, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
|
|
@ -203,7 +208,7 @@ public class ShutdownTaskTest {
|
|||
shardSyncStrategy);
|
||||
TaskResult result = task.call();
|
||||
verify(shardSyncStrategy).onShardConsumerShutDown(shards);
|
||||
verify(kinesisProxy, times(1)).getShardList();
|
||||
verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
|
||||
Assert.assertNull(result.getException());
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator, never()).dropLease(any());
|
||||
|
|
@ -220,6 +225,7 @@ public class ShutdownTaskTest {
|
|||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
|
|
@ -244,7 +250,7 @@ public class ShutdownTaskTest {
|
|||
shardSyncStrategy);
|
||||
TaskResult result = task.call();
|
||||
verify(shardSyncStrategy, never()).onShardConsumerShutDown(shards);
|
||||
verify(kinesisProxy, times(1)).getShardList();
|
||||
verify(kinesisProxy, times(1)).verifyShardClosure(anyString());
|
||||
Assert.assertNull(result.getException());
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator).dropLease(any());
|
||||
|
|
@ -257,6 +263,7 @@ public class ShutdownTaskTest {
|
|||
List<Shard> shards = constructShardListForGraphA();
|
||||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
when(kinesisProxy.getShardList()).thenReturn(shards);
|
||||
when(kinesisProxy.verifyShardClosure(anyString())).thenReturn(new ShardListWrappingShardClosureVerificationResponse(false, shards));
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
|
|
|
|||
|
|
@ -33,6 +33,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -423,6 +425,31 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
|
|||
return shards;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override public ShardListWrappingShardClosureVerificationResponse verifyShardClosure(String shardId) {
|
||||
List<Shard> shards = this.getShardList();
|
||||
if (!CollectionUtils.isNullOrEmpty(shards) && isShardParentOfAny(shardId, shards)) {
|
||||
return new ShardListWrappingShardClosureVerificationResponse(true /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||
}
|
||||
return new ShardListWrappingShardClosureVerificationResponse(false /*isVerifiedShardWasClosed*/, shards /*latestShards*/);
|
||||
}
|
||||
|
||||
private boolean isShardParentOfAny(String shardId, List<Shard> shards) {
|
||||
for(Shard shard : shards) {
|
||||
if (isShardAParent(shard, shardId)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isShardAParent(Shard shard, String shardId) {
|
||||
return (StringUtils.equals(shard.getParentShardId(), shardId)
|
||||
|| StringUtils.equals(shard.getAdjacentParentShardId(), shardId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for serializing/deserializing the shard list to the file.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Reference in a new issue