From ae005ce0f898992d36855b5d3fe5737a5449d2e3 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 04:03:38 -0400 Subject: [PATCH] Adding unit tests --- .../kinesis/checkpoint/CheckpointerTest.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java index 1cf77a3d..b823c8e3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java @@ -90,6 +90,26 @@ public class CheckpointerTest { Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); } + + @Test + public final void testInitialPrepareCheckpointWithApplicationState() throws Exception { + String sequenceNumber = "1"; + String pendingCheckpointValue = "99999"; + String shardId = "myShardId"; + byte[] applicationState = "applicationState".getBytes(); + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(sequenceNumber); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken); + + ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), testConcurrencyToken, + applicationState); + + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } + @Test public final void testAdvancingPrepareCheckpoint() throws Exception { String shardId = "myShardId"; @@ -107,6 +127,26 @@ public class CheckpointerTest { } } + @Test + public final void testAdvancingPrepareCheckpointWithApplicationState() throws Exception { + String shardId = "myShardId"; + String checkpointValue = "12345"; + byte[] applicationState = "applicationState".getBytes(); + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), testConcurrencyToken); + + for (Integer i = 0; i < 10; i++) { + String sequenceNumber = i.toString(); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken, + applicationState); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } + } + @Test public final void testPrepareAndSetCheckpoint() throws Exception { String checkpointValue = "12345"; @@ -134,4 +174,35 @@ public class CheckpointerTest { Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); } + + @Test + public final void testPrepareAndSetCheckpointWithApplicationState() throws Exception { + String checkpointValue = "12345"; + String shardId = "testShardId-1"; + String concurrencyToken = "token-1"; + String pendingCheckpointValue = "99999"; + byte[] applicationState = "applicationState".getBytes(); + + // set initial checkpoint + ExtendedSequenceNumber extendedCheckpointNumber = new ExtendedSequenceNumber(checkpointValue); + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + + // prepare checkpoint + ExtendedSequenceNumber extendedPendingCheckpointNumber = new ExtendedSequenceNumber(pendingCheckpointValue); + checkpoint.prepareCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken, applicationState); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(applicationState, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + + // do checkpoint + checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(pendingCheckpointValue), concurrencyToken); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedPendingCheckpointNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).pendingCheckpointState()); + } }