1. Update the checkpoint with non SHARD_END sequence number only if the DDB sequence number is not SHARD_END.

2. Verify the shard end checkpointing by directly looking up the ddb lease entry
This commit is contained in:
Ashwin Giridharan 2021-01-25 23:02:35 -08:00
parent 76234d172c
commit b2a747f3ea
7 changed files with 75 additions and 11 deletions

View file

@ -222,11 +222,29 @@ class ShutdownTask implements ITask {
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(shardEndShutdownInput);
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
boolean successfullyCheckpointedShardEnd = false;
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
KinesisClientLease leaseFromDdb = null;
try {
leaseFromDdb = leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId());
} catch (Exception e) {
LOG.error("Shard " + shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e);
}
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
if (leaseFromDdb != null && leaseFromDdb.getCheckpoint() != null) {
successfullyCheckpointedShardEnd = leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END);
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (!leaseFromDdb.getCheckpoint().equals(lastCheckpointValue)) {
LOG.error("Shard " + shardInfo.getShardId() +
" : Checkpoint information mismatch between authoritative source and local cache. " +
"This does not affect the application flow, but cut a ticket to Kinesis when you see this. " +
"Authoritative entry : " + leaseFromDdb.getCheckpoint() + " Cache entry : " + lastCheckpointValue);
}
} else {
LOG.error("Shard " + shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + leaseFromDdb);
}
if (!successfullyCheckpointedShardEnd) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");

View file

@ -22,6 +22,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@ -126,6 +127,19 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
return baseSerializer.getDynamoLeaseOwnerExpectation(lease);
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseCheckpointExpectation(KinesisClientLease lease) {
Map<String, ExpectedAttributeValue> result = baseSerializer.getDynamoLeaseCheckpointExpectation(lease);
ExpectedAttributeValue eav;
if (!lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(ExtendedSequenceNumber.SHARD_END.getSequenceNumber()));
eav.setComparisonOperator(ComparisonOperator.NE);
result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, eav);
}
return result;
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation() {
return baseSerializer.getDynamoNonexistantExpectation();

View file

@ -20,6 +20,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
import org.apache.commons.logging.Log;
@ -582,7 +583,9 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
Map<String, ExpectedAttributeValue> expectations = serializer.getDynamoLeaseCounterExpectation(lease);
expectations.putAll(serializer.getDynamoLeaseCheckpointExpectation(lease));
request.setExpected(expectations);
Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));

View file

@ -127,6 +127,11 @@ public class LeaseSerializer implements ILeaseSerializer<Lease> {
return result;
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseCheckpointExpectation(final Lease lease) {
return new HashMap<>();
}
@Override
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation() {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();

View file

@ -74,6 +74,14 @@ public interface ILeaseSerializer<T extends Lease> {
*/
public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(T lease);
/**
* @param lease
* @return the attribute value map asserting that the checkpoint state is as expected.
*/
default Map<String, ExpectedAttributeValue> getDynamoLeaseCheckpointExpectation(T lease) {
throw new UnsupportedOperationException("DynamoLeaseCheckpointExpectation is not implemented");
}
/**
* @return the attribute value map asserting that a lease does not exist.
*/

View file

@ -587,7 +587,9 @@ public class ShardConsumerTest {
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
KinesisClientLease currentLease1 = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease1.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease, currentLease, currentLease1);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
@ -714,7 +716,10 @@ public class ShardConsumerTest {
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
KinesisClientLease currentLease1 = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease1.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease, currentLease, currentLease1);
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();

View file

@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -211,11 +212,16 @@ public class ShutdownTaskTest {
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease.setCheckpoint(new ExtendedSequenceNumber("3298"));
KinesisClientLease currentLease1 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease1.setCheckpoint(new ExtendedSequenceNumber("3298"));
KinesisClientLease currentLease2 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease2.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease1);
// 6 times as part of parent lease get in failure mode and then two times in actual execution
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease1, currentLease1, currentLease1, currentLease1,
currentLease1, currentLease1, currentLease1, currentLease2);
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease);
// Make first 5 attempts with partial parent info in lease table
@ -267,7 +273,7 @@ public class ShutdownTaskTest {
verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache).shutdown();
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
verify(leaseCoordinator, never()).dropLease(currentLease);
verify(leaseCoordinator, never()).dropLease(currentLease1);
}
@Test
@ -337,6 +343,11 @@ public class ShutdownTaskTest {
public final void testCallWhenShardEnd() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
final KinesisClientLease parentLease1 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
parentLease1.setCheckpoint(new ExtendedSequenceNumber("3298"));
final KinesisClientLease parentLease2 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
parentLease2.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease1).thenReturn(parentLease2);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;