diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java index 7d8059e6..04827a63 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IPreparedCheckpointer.java @@ -16,7 +16,7 @@ public interface IPreparedCheckpointer { /** * @return sequence number of pending checkpoint */ - ExtendedSequenceNumber getSNOfPendingCheckpoint(); + ExtendedSequenceNumber getPendingCheckpoint(); /** * This method will record a pending checkpoint. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java index 670a775c..d81c632f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/Checkpoint.java @@ -1,11 +1,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import lombok.Data; /** * A class encapsulating the 2 pieces of state stored in a checkpoint. */ -public class Checkpoint { +@Data public class Checkpoint { private final ExtendedSequenceNumber checkpoint; private final ExtendedSequenceNumber pendingCheckpoint; @@ -23,44 +24,4 @@ public class Checkpoint { this.checkpoint = checkpoint; this.pendingCheckpoint = pendingCheckpoint; } - - /** - * @return checkpoint sequence number - */ - public ExtendedSequenceNumber getCheckpoint() { - return checkpoint; - } - - /** - * @return pending checkpoint sequence number - */ - public ExtendedSequenceNumber getPendingCheckpoint() { - return pendingCheckpoint; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Checkpoint that = (Checkpoint) o; - - if (checkpoint != null ? !checkpoint.equals(that.checkpoint) : that.checkpoint != null) return false; - return pendingCheckpoint != null ? pendingCheckpoint.equals(that.pendingCheckpoint) : that.pendingCheckpoint == null; - } - - @Override - public int hashCode() { - int result = checkpoint != null ? checkpoint.hashCode() : 0; - result = 31 * result + (pendingCheckpoint != null ? pendingCheckpoint.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "Checkpoint{" - + "checkpoint=" + checkpoint - + ", pendingCheckpoint=" + pendingCheckpoint - + '}'; - } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java index 2e76b81e..ed72f317 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java @@ -34,7 +34,7 @@ public class DoesNothingPreparedCheckpointer implements IPreparedCheckpointer { * {@inheritDoc} */ @Override - public ExtendedSequenceNumber getSNOfPendingCheckpoint() { + public ExtendedSequenceNumber getPendingCheckpoint() { return sequenceNumber; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java index 6d013f14..1b399fcc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java @@ -15,17 +15,18 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber */ public class PreparedCheckpointer implements IPreparedCheckpointer { - private final ExtendedSequenceNumber snToCheckpoint; + private final ExtendedSequenceNumber pendingCheckpointSequenceNumber; private final IRecordProcessorCheckpointer checkpointer; /** * Constructor. * - * @param snToCheckpoint sequence number to checkpoint at + * @param pendingCheckpointSequenceNumber sequence number to checkpoint at * @param checkpointer checkpointer to use */ - public PreparedCheckpointer(ExtendedSequenceNumber snToCheckpoint, IRecordProcessorCheckpointer checkpointer) { - this.snToCheckpoint = snToCheckpoint; + public PreparedCheckpointer(ExtendedSequenceNumber pendingCheckpointSequenceNumber, + IRecordProcessorCheckpointer checkpointer) { + this.pendingCheckpointSequenceNumber = pendingCheckpointSequenceNumber; this.checkpointer = checkpointer; } @@ -33,8 +34,8 @@ public class PreparedCheckpointer implements IPreparedCheckpointer { * {@inheritDoc} */ @Override - public ExtendedSequenceNumber getSNOfPendingCheckpoint() { - return snToCheckpoint; + public ExtendedSequenceNumber getPendingCheckpoint() { + return pendingCheckpointSequenceNumber; } /** @@ -44,6 +45,7 @@ public class PreparedCheckpointer implements IPreparedCheckpointer { public void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { - checkpointer.checkpoint(snToCheckpoint.getSequenceNumber(), snToCheckpoint.getSubSequenceNumber()); + checkpointer.checkpoint(pendingCheckpointSequenceNumber.getSequenceNumber(), + pendingCheckpointSequenceNumber.getSubSequenceNumber()); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index b640b057..25a106f7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -324,7 +324,6 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * @throws InvalidStateException * @throws ThrottlingException * @throws ShutdownException - * @formatteR:off */ private IPreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java index f80cf945..bfcd7723 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java @@ -10,13 +10,13 @@ import org.mockito.Mockito; public class PreparedCheckpointerTest { /** - * This test verifies the relationship between the constructor and getSNOfPendingCheckpoint. + * This test verifies the relationship between the constructor and getPendingCheckpoint. */ @Test public void testGetSequenceNumber() { ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn"); IPreparedCheckpointer checkpointer = new PreparedCheckpointer(sn, null); - Assert.assertEquals(sn, checkpointer.getSNOfPendingCheckpoint()); + Assert.assertEquals(sn, checkpointer.getPendingCheckpoint()); } /** @@ -42,7 +42,7 @@ public class PreparedCheckpointerTest { public void testDoesNothingPreparedCheckpoint() throws Exception { ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn"); IPreparedCheckpointer checkpointer = new DoesNothingPreparedCheckpointer(sn); - Assert.assertEquals(sn, checkpointer.getSNOfPendingCheckpoint()); + Assert.assertEquals(sn, checkpointer.getPendingCheckpoint()); // nothing happens here checkpointer.checkpoint(); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index 1242e6b4..31a1e184 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -168,7 +168,7 @@ public class RecordProcessorCheckpointerTest { ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001"); processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber1); IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); - Assert.assertEquals(sequenceNumber1, preparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(sequenceNumber1, preparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(sequenceNumber1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // Advance checkpoint @@ -176,7 +176,7 @@ public class RecordProcessorCheckpointerTest { processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber2); preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); - Assert.assertEquals(sequenceNumber2, preparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(sequenceNumber2, preparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // Checkpoint using preparedCheckpoint @@ -201,7 +201,7 @@ public class RecordProcessorCheckpointerTest { IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(record); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); - Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // Checkpoint using preparedCheckpoint @@ -227,7 +227,7 @@ public class RecordProcessorCheckpointerTest { IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(subRecord); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); - Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // Checkpoint using preparedCheckpoint @@ -251,7 +251,7 @@ public class RecordProcessorCheckpointerTest { IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5035"); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); - Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // Checkpoint using preparedCheckpoint @@ -275,7 +275,7 @@ public class RecordProcessorCheckpointerTest { IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5040", 0); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); - Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // Checkpoint using preparedCheckpoint @@ -297,12 +297,12 @@ public class RecordProcessorCheckpointerTest { ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("6010"); IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6010", 0); - Assert.assertEquals(sn1, firstPreparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("6020"); IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6020", 0); - Assert.assertEquals(sn2, secondPreparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // checkpoint in order @@ -329,12 +329,12 @@ public class RecordProcessorCheckpointerTest { ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("7010"); IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7010", 0); - Assert.assertEquals(sn1, firstPreparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("7020"); IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7020", 0); - Assert.assertEquals(sn2, secondPreparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); // checkpoint out of order @@ -503,7 +503,7 @@ public class RecordProcessorCheckpointerTest { IPreparedCheckpointer doesNothingPreparedCheckpoint = processingCheckpointer.prepareCheckpoint(firstSequenceNumber.getSequenceNumber(), firstSequenceNumber.getSubSequenceNumber()); Assert.assertTrue(doesNothingPreparedCheckpoint instanceof DoesNothingPreparedCheckpointer); - Assert.assertEquals(firstSequenceNumber, doesNothingPreparedCheckpoint.getSNOfPendingCheckpoint()); + Assert.assertEquals(firstSequenceNumber, doesNothingPreparedCheckpoint.getPendingCheckpoint()); Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); @@ -751,8 +751,8 @@ public class RecordProcessorCheckpointerTest { case PREPARE_THEN_CHECKPOINTER: preparedCheckpoint = processingCheckpointer.prepareCheckpoint(); processingCheckpointer.checkpoint( - preparedCheckpoint.getSNOfPendingCheckpoint().getSequenceNumber(), - preparedCheckpoint.getSNOfPendingCheckpoint().getSubSequenceNumber()); + preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(), + preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber()); } break; case WITH_SEQUENCE_NUMBER: @@ -766,8 +766,8 @@ public class RecordProcessorCheckpointerTest { case PREPARE_THEN_CHECKPOINTER: preparedCheckpoint = processingCheckpointer.prepareCheckpoint(entry.getKey()); processingCheckpointer.checkpoint( - preparedCheckpoint.getSNOfPendingCheckpoint().getSequenceNumber(), - preparedCheckpoint.getSNOfPendingCheckpoint().getSubSequenceNumber()); + preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(), + preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber()); } break; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 329a452c..8073d0df 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -47,9 +47,11 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -224,9 +226,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); + final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpointObject(anyString())).thenReturn( - new Checkpoint(new ExtendedSequenceNumber("123"), null)); + new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask @@ -240,7 +244,8 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(1)).initialize(any(InitializationInput.class)); + verify(processor, times(1)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); try { // Checking the status of submitted InitializeTask from above should throw exception. @@ -251,14 +256,17 @@ public class ShardConsumerTest { } Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(1)).initialize(any(InitializationInput.class)); + verify(processor, times(1)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); doNothing().when(processor).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(2)).initialize(any(InitializationInput.class)); + verify(processor, times(2)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); + verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args // Checking the status of submitted InitializeTask from above should pass. consumer.consumeShard(); @@ -493,18 +501,9 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); - verify(processor, times(1)).initialize(argThat(new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if (argument instanceof InitializationInput) { - InitializationInput initializationInput = (InitializationInput) argument; - return Objects.equals(checkpointSequenceNumber, initializationInput.getExtendedSequenceNumber()) - && Objects.equals(pendingCheckpointSequenceNumber, - initializationInput.getPendingCheckpointSequenceNumber()); - } - return false; - } - })); + verify(processor, times(1)).initialize(argThat( + initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); + verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args consumer.consumeShard(); Thread.sleep(50L); @@ -533,4 +532,21 @@ public class ShardConsumerTest { } return userRecords; } + + Matcher initializationInputMatcher(final ExtendedSequenceNumber checkpoint, + final ExtendedSequenceNumber pendingCheckpoint) { + return new TypeSafeMatcher() { + @Override + protected boolean matchesSafely(InitializationInput item) { + return Objects.equals(checkpoint, item.getExtendedSequenceNumber()) + && Objects.equals(pendingCheckpoint, item.getPendingCheckpointSequenceNumber()); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("Checkpoint should be %s and pending checkpoint should be %s", + checkpoint, pendingCheckpoint)); + } + }; + } }