diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 274aaaa1..07c9fff2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -222,11 +222,29 @@ class ShutdownTask implements ITask { .withCheckpointer(recordProcessorCheckpointer); recordProcessor.shutdown(shardEndShutdownInput); - final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + boolean successfullyCheckpointedShardEnd = false; - final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); + KinesisClientLease leaseFromDdb = null; + try { + leaseFromDdb = leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()); + } catch (Exception e) { + LOG.error("Shard " + shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e); + } - if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { + if (leaseFromDdb != null && leaseFromDdb.getCheckpoint() != null) { + successfullyCheckpointedShardEnd = leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END); + final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + if (!leaseFromDdb.getCheckpoint().equals(lastCheckpointValue)) { + LOG.error("Shard " + shardInfo.getShardId() + + " : Checkpoint information mismatch between authoritative source and local cache. " + + "This does not affect the application flow, but cut a ticket to Kinesis when you see this. " + + "Authoritative entry : " + leaseFromDdb.getCheckpoint() + " Cache entry : " + lastCheckpointValue); + } + } else { + LOG.error("Shard " + shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + leaseFromDdb); + } + + if (!successfullyCheckpointedShardEnd) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + "See IRecordProcessor.shutdown javadocs for more information."); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 5a7fb9b2..27568fc4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -22,6 +22,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; +import com.amazonaws.services.dynamodbv2.model.ComparisonOperator; import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -126,6 +127,19 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer getDynamoLeaseCheckpointExpectation(KinesisClientLease lease) { + Map result = baseSerializer.getDynamoLeaseCheckpointExpectation(lease); + ExpectedAttributeValue eav; + + if (!lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(ExtendedSequenceNumber.SHARD_END.getSequenceNumber())); + eav.setComparisonOperator(ComparisonOperator.NE); + result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, eav); + } + return result; + } + @Override public Map getDynamoNonexistantExpectation() { return baseSerializer.getDynamoNonexistantExpectation(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index d813e627..d9250c2c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.leases.util.DynamoUtils; import org.apache.commons.logging.Log; @@ -582,7 +583,9 @@ public class LeaseManager implements ILeaseManager { UpdateItemRequest request = new UpdateItemRequest(); request.setTableName(table); request.setKey(serializer.getDynamoHashKey(lease)); - request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease)); + Map expectations = serializer.getDynamoLeaseCounterExpectation(lease); + expectations.putAll(serializer.getDynamoLeaseCheckpointExpectation(lease)); + request.setExpected(expectations); Map updates = serializer.getDynamoLeaseCounterUpdate(lease); updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java index 85381560..adb7087a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseSerializer.java @@ -127,6 +127,11 @@ public class LeaseSerializer implements ILeaseSerializer { return result; } + @Override + public Map getDynamoLeaseCheckpointExpectation(final Lease lease) { + return new HashMap<>(); + } + @Override public Map getDynamoNonexistantExpectation() { Map result = new HashMap(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java index 2d9ea0c9..e69d1bf6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseSerializer.java @@ -74,6 +74,14 @@ public interface ILeaseSerializer { */ public Map getDynamoLeaseOwnerExpectation(T lease); + /** + * @param lease + * @return the attribute value map asserting that the checkpoint state is as expected. + */ + default Map getDynamoLeaseCheckpointExpectation(T lease) { + throw new UnsupportedOperationException("DynamoLeaseCheckpointExpectation is not implemented"); + } + /** * @return the attribute value map asserting that a lease does not exist. */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index cb89b619..7afa3a8a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -587,7 +587,9 @@ public class ShardConsumerTest { parentShardIds.add("parentShardId"); KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds); currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer")); - when(leaseManager.getLease(streamShardId)).thenReturn(currentLease); + KinesisClientLease currentLease1 = createLease(streamShardId, "leaseOwner", parentShardIds); + currentLease1.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + when(leaseManager.getLease(streamShardId)).thenReturn(currentLease, currentLease, currentLease1); when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( @@ -714,7 +716,10 @@ public class ShardConsumerTest { parentShardIds.add("parentShardId"); KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds); currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer")); - when(leaseManager.getLease(streamShardId)).thenReturn(currentLease); + KinesisClientLease currentLease1 = createLease(streamShardId, "leaseOwner", parentShardIds); + currentLease1.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + when(leaseManager.getLease(streamShardId)).thenReturn(currentLease, currentLease, currentLease1); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 8b67f5dc..053a8bf7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -211,11 +212,16 @@ public class ShutdownTaskTest { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); - currentLease.setCheckpoint(new ExtendedSequenceNumber("3298")); + KinesisClientLease currentLease1 = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + currentLease1.setCheckpoint(new ExtendedSequenceNumber("3298")); + KinesisClientLease currentLease2 = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + currentLease2.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList()); - when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease); - when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); + when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease1); + // 6 times as part of parent lease get in failure mode and then two times in actual execution + when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease1, currentLease1, currentLease1, currentLease1, + currentLease1, currentLease1, currentLease1, currentLease2); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease); // Make first 5 attempts with partial parent info in lease table @@ -267,7 +273,7 @@ public class ShutdownTaskTest { verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache).shutdown(); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); - verify(leaseCoordinator, never()).dropLease(currentLease); + verify(leaseCoordinator, never()).dropLease(currentLease1); } @Test @@ -337,6 +343,11 @@ public class ShutdownTaskTest { public final void testCallWhenShardEnd() throws Exception { RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class); when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + final KinesisClientLease parentLease1 = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + parentLease1.setCheckpoint(new ExtendedSequenceNumber("3298")); + final KinesisClientLease parentLease2 = createLease(defaultShardId, "leaseOwner", Collections.emptyList()); + parentLease2.setCheckpoint(ExtendedSequenceNumber.SHARD_END); + when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease1).thenReturn(parentLease2); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false;