PR comments

This commit is contained in:
Walid Baruni 2017-07-26 13:17:03 -07:00
parent eb9e7e54a0
commit 45585ff941
8 changed files with 65 additions and 87 deletions

View file

@ -16,7 +16,7 @@ public interface IPreparedCheckpointer {
/**
* @return sequence number of pending checkpoint
*/
ExtendedSequenceNumber getSNOfPendingCheckpoint();
ExtendedSequenceNumber getPendingCheckpoint();
/**
* This method will record a pending checkpoint.

View file

@ -1,11 +1,12 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import lombok.Data;
/**
* A class encapsulating the 2 pieces of state stored in a checkpoint.
*/
public class Checkpoint {
@Data public class Checkpoint {
private final ExtendedSequenceNumber checkpoint;
private final ExtendedSequenceNumber pendingCheckpoint;
@ -23,44 +24,4 @@ public class Checkpoint {
this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint;
}
/**
* @return checkpoint sequence number
*/
public ExtendedSequenceNumber getCheckpoint() {
return checkpoint;
}
/**
* @return pending checkpoint sequence number
*/
public ExtendedSequenceNumber getPendingCheckpoint() {
return pendingCheckpoint;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Checkpoint that = (Checkpoint) o;
if (checkpoint != null ? !checkpoint.equals(that.checkpoint) : that.checkpoint != null) return false;
return pendingCheckpoint != null ? pendingCheckpoint.equals(that.pendingCheckpoint) : that.pendingCheckpoint == null;
}
@Override
public int hashCode() {
int result = checkpoint != null ? checkpoint.hashCode() : 0;
result = 31 * result + (pendingCheckpoint != null ? pendingCheckpoint.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "Checkpoint{"
+ "checkpoint=" + checkpoint
+ ", pendingCheckpoint=" + pendingCheckpoint
+ '}';
}
}
}

View file

@ -34,7 +34,7 @@ public class DoesNothingPreparedCheckpointer implements IPreparedCheckpointer {
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getSNOfPendingCheckpoint() {
public ExtendedSequenceNumber getPendingCheckpoint() {
return sequenceNumber;
}

View file

@ -15,17 +15,18 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
*/
public class PreparedCheckpointer implements IPreparedCheckpointer {
private final ExtendedSequenceNumber snToCheckpoint;
private final ExtendedSequenceNumber pendingCheckpointSequenceNumber;
private final IRecordProcessorCheckpointer checkpointer;
/**
* Constructor.
*
* @param snToCheckpoint sequence number to checkpoint at
* @param pendingCheckpointSequenceNumber sequence number to checkpoint at
* @param checkpointer checkpointer to use
*/
public PreparedCheckpointer(ExtendedSequenceNumber snToCheckpoint, IRecordProcessorCheckpointer checkpointer) {
this.snToCheckpoint = snToCheckpoint;
public PreparedCheckpointer(ExtendedSequenceNumber pendingCheckpointSequenceNumber,
IRecordProcessorCheckpointer checkpointer) {
this.pendingCheckpointSequenceNumber = pendingCheckpointSequenceNumber;
this.checkpointer = checkpointer;
}
@ -33,8 +34,8 @@ public class PreparedCheckpointer implements IPreparedCheckpointer {
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getSNOfPendingCheckpoint() {
return snToCheckpoint;
public ExtendedSequenceNumber getPendingCheckpoint() {
return pendingCheckpointSequenceNumber;
}
/**
@ -44,6 +45,7 @@ public class PreparedCheckpointer implements IPreparedCheckpointer {
public void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException {
checkpointer.checkpoint(snToCheckpoint.getSequenceNumber(), snToCheckpoint.getSubSequenceNumber());
checkpointer.checkpoint(pendingCheckpointSequenceNumber.getSequenceNumber(),
pendingCheckpointSequenceNumber.getSubSequenceNumber());
}
}

View file

@ -324,7 +324,6 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
* @throws InvalidStateException
* @throws ThrottlingException
* @throws ShutdownException
* @formatteR:off
*/
private IPreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {

View file

@ -10,13 +10,13 @@ import org.mockito.Mockito;
public class PreparedCheckpointerTest {
/**
* This test verifies the relationship between the constructor and getSNOfPendingCheckpoint.
* This test verifies the relationship between the constructor and getPendingCheckpoint.
*/
@Test
public void testGetSequenceNumber() {
ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn");
IPreparedCheckpointer checkpointer = new PreparedCheckpointer(sn, null);
Assert.assertEquals(sn, checkpointer.getSNOfPendingCheckpoint());
Assert.assertEquals(sn, checkpointer.getPendingCheckpoint());
}
/**
@ -42,7 +42,7 @@ public class PreparedCheckpointerTest {
public void testDoesNothingPreparedCheckpoint() throws Exception {
ExtendedSequenceNumber sn = new ExtendedSequenceNumber("sn");
IPreparedCheckpointer checkpointer = new DoesNothingPreparedCheckpointer(sn);
Assert.assertEquals(sn, checkpointer.getSNOfPendingCheckpoint());
Assert.assertEquals(sn, checkpointer.getPendingCheckpoint());
// nothing happens here
checkpointer.checkpoint();
}

View file

@ -168,7 +168,7 @@ public class RecordProcessorCheckpointerTest {
ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001");
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber1);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
Assert.assertEquals(sequenceNumber1, preparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(sequenceNumber1, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sequenceNumber1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Advance checkpoint
@ -176,7 +176,7 @@ public class RecordProcessorCheckpointerTest {
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber2);
preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
Assert.assertEquals(sequenceNumber2, preparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(sequenceNumber2, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sequenceNumber2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
@ -201,7 +201,7 @@ public class RecordProcessorCheckpointerTest {
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(record);
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
@ -227,7 +227,7 @@ public class RecordProcessorCheckpointerTest {
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(subRecord);
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
@ -251,7 +251,7 @@ public class RecordProcessorCheckpointerTest {
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5035");
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
@ -275,7 +275,7 @@ public class RecordProcessorCheckpointerTest {
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint("5040", 0);
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
@ -297,12 +297,12 @@ public class RecordProcessorCheckpointerTest {
ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("6010");
IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6010", 0);
Assert.assertEquals(sn1, firstPreparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("6020");
IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("6020", 0);
Assert.assertEquals(sn2, secondPreparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// checkpoint in order
@ -329,12 +329,12 @@ public class RecordProcessorCheckpointerTest {
ExtendedSequenceNumber sn1 = new ExtendedSequenceNumber("7010");
IPreparedCheckpointer firstPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7010", 0);
Assert.assertEquals(sn1, firstPreparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(sn1, firstPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn1, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
ExtendedSequenceNumber sn2 = new ExtendedSequenceNumber("7020");
IPreparedCheckpointer secondPreparedCheckpoint = processingCheckpointer.prepareCheckpoint("7020", 0);
Assert.assertEquals(sn2, secondPreparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(sn2, secondPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(sn2, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// checkpoint out of order
@ -503,7 +503,7 @@ public class RecordProcessorCheckpointerTest {
IPreparedCheckpointer doesNothingPreparedCheckpoint =
processingCheckpointer.prepareCheckpoint(firstSequenceNumber.getSequenceNumber(), firstSequenceNumber.getSubSequenceNumber());
Assert.assertTrue(doesNothingPreparedCheckpoint instanceof DoesNothingPreparedCheckpointer);
Assert.assertEquals(firstSequenceNumber, doesNothingPreparedCheckpoint.getSNOfPendingCheckpoint());
Assert.assertEquals(firstSequenceNumber, doesNothingPreparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(firstSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
@ -751,8 +751,8 @@ public class RecordProcessorCheckpointerTest {
case PREPARE_THEN_CHECKPOINTER:
preparedCheckpoint = processingCheckpointer.prepareCheckpoint();
processingCheckpointer.checkpoint(
preparedCheckpoint.getSNOfPendingCheckpoint().getSequenceNumber(),
preparedCheckpoint.getSNOfPendingCheckpoint().getSubSequenceNumber());
preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(),
preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber());
}
break;
case WITH_SEQUENCE_NUMBER:
@ -766,8 +766,8 @@ public class RecordProcessorCheckpointerTest {
case PREPARE_THEN_CHECKPOINTER:
preparedCheckpoint = processingCheckpointer.prepareCheckpoint(entry.getKey());
processingCheckpointer.checkpoint(
preparedCheckpoint.getSNOfPendingCheckpoint().getSequenceNumber(),
preparedCheckpoint.getSNOfPendingCheckpoint().getSubSequenceNumber());
preparedCheckpoint.getPendingCheckpoint().getSequenceNumber(),
preparedCheckpoint.getPendingCheckpoint().getSubSequenceNumber());
}
break;
}

View file

@ -47,9 +47,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@ -224,9 +226,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(new ExtendedSequenceNumber("123"), null));
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask
@ -240,7 +244,8 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
try {
// Checking the status of submitted InitializeTask from above should throw exception.
@ -251,14 +256,17 @@ public class ShardConsumerTest {
}
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
doNothing().when(processor).initialize(any(InitializationInput.class));
consumer.consumeShard(); // submit InitializeTask again.
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(2)).initialize(any(InitializationInput.class));
verify(processor, times(2)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args
// Checking the status of submitted InitializeTask from above should pass.
consumer.consumeShard();
@ -493,18 +501,9 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(argThat(new ArgumentMatcher<InitializationInput>() {
@Override
public boolean matches(Object argument) {
if (argument instanceof InitializationInput) {
InitializationInput initializationInput = (InitializationInput) argument;
return Objects.equals(checkpointSequenceNumber, initializationInput.getExtendedSequenceNumber())
&& Objects.equals(pendingCheckpointSequenceNumber,
initializationInput.getPendingCheckpointSequenceNumber());
}
return false;
}
}));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args
consumer.consumeShard();
Thread.sleep(50L);
@ -533,4 +532,21 @@ public class ShardConsumerTest {
}
return userRecords;
}
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
final ExtendedSequenceNumber pendingCheckpoint) {
return new TypeSafeMatcher<InitializationInput>() {
@Override
protected boolean matchesSafely(InitializationInput item) {
return Objects.equals(checkpoint, item.getExtendedSequenceNumber())
&& Objects.equals(pendingCheckpoint, item.getPendingCheckpointSequenceNumber());
}
@Override
public void describeTo(Description description) {
description.appendText(String.format("Checkpoint should be %s and pending checkpoint should be %s",
checkpoint, pendingCheckpoint));
}
};
}
}