Enabled Checkstyle validation of test resources. (#1150)

No functional change.
This commit is contained in:
stair 2023-06-26 15:25:10 -04:00 committed by GitHub
parent 5105317eb4
commit 74d8f4b780
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 183 additions and 175 deletions

View file

@ -34,9 +34,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class MessageReaderTest { public class MessageReaderTest {
private static final String shardId = "shard-123"; private static final String SHARD_ID = "shard-123";
/* /**
* This line is based on the definition of the protocol for communication between the KCL record processor and * This line is based on the definition of the protocol for communication between the KCL record processor and
* the client's process. * the client's process.
*/ */
@ -44,7 +44,7 @@ public class MessageReaderTest {
return String.format("{\"action\":\"checkpoint\", \"checkpoint\":\"%s\"}", sequenceNumber); return String.format("{\"action\":\"checkpoint\", \"checkpoint\":\"%s\"}", sequenceNumber);
} }
/* /**
* This line is based on the definition of the protocol for communication between the KCL record processor and * This line is based on the definition of the protocol for communication between the KCL record processor and
* the client's process. * the client's process.
*/ */
@ -79,10 +79,9 @@ public class MessageReaderTest {
String[] responseFors = new String[] { "initialize", "processRecords", "processRecords", "shutdown" }; String[] responseFors = new String[] { "initialize", "processRecords", "processRecords", "shutdown" };
InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors); InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors);
MessageReader reader = MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
for (String responseFor : responseFors) { for (String responseFor : responseFors) {
StatusMessage statusMessage = null;
try { try {
Message message = reader.getNextMessageFromSTDOUT().get(); Message message = reader.getNextMessageFromSTDOUT().get();
if (message instanceof StatusMessage) { if (message instanceof StatusMessage) {
@ -102,14 +101,14 @@ public class MessageReaderTest {
InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors); InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors);
MessageReader reader = MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Future<Boolean> drainFuture = reader.drainSTDOUT(); Future<Boolean> drainFuture = reader.drainSTDOUT();
Boolean drainResult = drainFuture.get(); Boolean drainResult = drainFuture.get();
Assert.assertNotNull(drainResult); Assert.assertNotNull(drainResult);
Assert.assertTrue(drainResult); Assert.assertTrue(drainResult);
} }
/* /**
* readValue should fail safely and just continue looping * readValue should fail safely and just continue looping
*/ */
@Test @Test
@ -134,7 +133,7 @@ public class MessageReaderTest {
} }
MessageReader reader = MessageReader reader =
new MessageReader().initialize(bufferReader, shardId, new ObjectMapper(), new MessageReader().initialize(bufferReader, SHARD_ID, new ObjectMapper(),
Executors.newCachedThreadPool()); Executors.newCachedThreadPool());
try { try {
@ -149,7 +148,7 @@ public class MessageReaderTest {
public void messageReaderBuilderTest() { public void messageReaderBuilderTest() {
InputStream stream = new ByteArrayInputStream("".getBytes()); InputStream stream = new ByteArrayInputStream("".getBytes());
MessageReader reader = MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Assert.assertNotNull(reader); Assert.assertNotNull(reader);
} }
@ -158,7 +157,7 @@ public class MessageReaderTest {
BufferedReader input = Mockito.mock(BufferedReader.class); BufferedReader input = Mockito.mock(BufferedReader.class);
Mockito.doThrow(IOException.class).when(input).readLine(); Mockito.doThrow(IOException.class).when(input).readLine();
MessageReader reader = MessageReader reader =
new MessageReader().initialize(input, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); new MessageReader().initialize(input, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Future<Message> readTask = reader.getNextMessageFromSTDOUT(); Future<Message> readTask = reader.getNextMessageFromSTDOUT();
@ -176,7 +175,7 @@ public class MessageReaderTest {
public void noMoreMessagesTest() throws InterruptedException { public void noMoreMessagesTest() throws InterruptedException {
InputStream stream = new ByteArrayInputStream("".getBytes()); InputStream stream = new ByteArrayInputStream("".getBytes());
MessageReader reader = MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Future<Message> future = reader.getNextMessageFromSTDOUT(); Future<Message> future = reader.getNextMessageFromSTDOUT();
try { try {

View file

@ -44,7 +44,7 @@ import static org.mockito.Mockito.verify;
public class MessageWriterTest { public class MessageWriterTest {
private static final String shardId = "shard-123"; private static final String SHARD_ID = "shard-123";
MessageWriter messageWriter; MessageWriter messageWriter;
OutputStream stream; OutputStream stream;
@ -57,7 +57,7 @@ public class MessageWriterTest {
public void setup() { public void setup() {
stream = Mockito.mock(OutputStream.class); stream = Mockito.mock(OutputStream.class);
messageWriter = messageWriter =
new MessageWriter().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); new MessageWriter().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
} }
/* /*
@ -83,7 +83,7 @@ public class MessageWriterTest {
@Test @Test
public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException { public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build()); Future<Boolean> future = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build());
future.get(); future.get();
verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt()); Mockito.anyInt());
@ -128,20 +128,20 @@ public class MessageWriterTest {
@Test @Test
public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException {
Mockito.doThrow(IOException.class).when(stream).flush(); Mockito.doThrow(IOException.class).when(stream).flush();
Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build()); Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build());
Boolean result = initializeTask.get(); Boolean result = initializeTask.get();
Assert.assertNotNull(result); Assert.assertNotNull(result);
Assert.assertFalse(result); Assert.assertFalse(result);
} }
@Test @Test
public void objectMapperFails() throws JsonProcessingException, InterruptedException, ExecutionException { public void objectMapperFails() throws JsonProcessingException {
thrown.expect(RuntimeException.class); thrown.expect(RuntimeException.class);
thrown.expectMessage("Encountered I/O error while writing LeaseLostMessage action to subprocess"); thrown.expectMessage("Encountered I/O error while writing LeaseLostMessage action to subprocess");
ObjectMapper mapper = Mockito.mock(ObjectMapper.class); ObjectMapper mapper = Mockito.mock(ObjectMapper.class);
Mockito.doThrow(JsonProcessingException.class).when(mapper).writeValueAsString(Mockito.any(Message.class)); Mockito.doThrow(JsonProcessingException.class).when(mapper).writeValueAsString(Mockito.any(Message.class));
messageWriter = new MessageWriter().initialize(stream, shardId, mapper, Executors.newCachedThreadPool()); messageWriter = new MessageWriter().initialize(stream, SHARD_ID, mapper, Executors.newCachedThreadPool());
messageWriter.writeLeaseLossMessage(LeaseLostInput.builder().build()); messageWriter.writeLeaseLossMessage(LeaseLostInput.builder().build());
} }
@ -154,7 +154,7 @@ public class MessageWriterTest {
Assert.assertFalse(this.messageWriter.isOpen()); Assert.assertFalse(this.messageWriter.isOpen());
try { try {
// Any message should fail // Any message should fail
this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build()); this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build());
Assert.fail("MessageWriter should be closed and unable to write."); Assert.fail("MessageWriter should be closed and unable to write.");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// This should happen. // This should happen.

View file

@ -164,7 +164,7 @@ public class MultiLangDaemonConfigTest {
/** /**
* Verify the daemonConfig properties are what we expect them to be. * Verify the daemonConfig properties are what we expect them to be.
* @param deamonConfig *
* @param expectedStreamName * @param expectedStreamName
*/ */
private void assertConfigurationsMatch(String expectedStreamName, String expectedStreamArn) { private void assertConfigurationsMatch(String expectedStreamName, String expectedStreamArn) {
@ -184,16 +184,15 @@ public class MultiLangDaemonConfigTest {
@Test @Test
public void testPropertyValidation() { public void testPropertyValidation() {
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n" String propertiesNoExecutableName = "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class); ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())).when(classLoader) Mockito.doReturn(new ByteArrayInputStream(propertiesNoExecutableName.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME); .getResourceAsStream(FILENAME);
MultiLangDaemonConfig config;
try { try {
config = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
Assert.fail("Construction of the config should have failed due to property validation failing."); Assert.fail("Construction of the config should have failed due to property validation failing.");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// Good // Good

View file

@ -30,7 +30,7 @@ import org.mockito.Mockito;
public class ReadSTDERRTaskTest { public class ReadSTDERRTaskTest {
private static final String shardId = "shard-123"; private static final String SHARD_ID = "shard-123";
private BufferedReader mockBufferReader; private BufferedReader mockBufferReader;
@Before @Before
@ -43,7 +43,7 @@ public class ReadSTDERRTaskTest {
String errorMessages = "OMG\nThis is test message\n blah blah blah \n"; String errorMessages = "OMG\nThis is test message\n blah blah blah \n";
InputStream stream = new ByteArrayInputStream(errorMessages.getBytes()); InputStream stream = new ByteArrayInputStream(errorMessages.getBytes());
LineReaderTask<Boolean> reader = new DrainChildSTDERRTask().initialize(stream, shardId, ""); LineReaderTask<Boolean> reader = new DrainChildSTDERRTask().initialize(stream, SHARD_ID, "");
Assert.assertNotNull(reader); Assert.assertNotNull(reader);
} }
@ -52,7 +52,7 @@ public class ReadSTDERRTaskTest {
String errorMessages = "OMG\nThis is test message\n blah blah blah \n"; String errorMessages = "OMG\nThis is test message\n blah blah blah \n";
BufferedReader bufferReader = BufferedReader bufferReader =
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(errorMessages.getBytes()))); new BufferedReader(new InputStreamReader(new ByteArrayInputStream(errorMessages.getBytes())));
LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(bufferReader, shardId, ""); LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(bufferReader, SHARD_ID, "");
Assert.assertNotNull(errorReader); Assert.assertNotNull(errorReader);
Boolean result = errorReader.call(); Boolean result = errorReader.call();
@ -65,7 +65,7 @@ public class ReadSTDERRTaskTest {
} catch (IOException e) { } catch (IOException e) {
Assert.fail("Not supposed to get an exception when we're just building our mock."); Assert.fail("Not supposed to get an exception when we're just building our mock.");
} }
LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(mockBufferReader, shardId, ""); LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(mockBufferReader, SHARD_ID, "");
Assert.assertNotNull(errorReader); Assert.assertNotNull(errorReader);
Future<Boolean> result = Executors.newCachedThreadPool().submit(errorReader); Future<Boolean> result = Executors.newCachedThreadPool().submit(errorReader);
Boolean finishedCleanly = null; Boolean finishedCleanly = null;

View file

@ -23,7 +23,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Collections; import java.util.Collections;
@ -46,9 +45,7 @@ import org.mockito.stubbing.Answer;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException; import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
@ -67,7 +64,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class StreamingShardRecordProcessorTest { public class StreamingShardRecordProcessorTest {
private static final String shardId = "shard-123"; private static final String SHARD_ID = "shard-123";
private int systemExitCount = 0; private int systemExitCount = 0;
@ -79,77 +76,73 @@ public class StreamingShardRecordProcessorTest {
private RecordProcessorCheckpointer unimplementedCheckpointer = new RecordProcessorCheckpointer() { private RecordProcessorCheckpointer unimplementedCheckpointer = new RecordProcessorCheckpointer() {
@Override @Override
public void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, public void checkpoint() throws KinesisClientLibDependencyException, ThrottlingException {
ThrottlingException, ShutdownException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public void checkpoint(String sequenceNumber) throws KinesisClientLibDependencyException, public void checkpoint(String sequenceNumber) throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public void checkpoint(Record record) public void checkpoint(Record record)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException, ThrottlingException {
InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public void checkpoint(String sequenceNumber, long subSequenceNumber) public void checkpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint() public PreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException, ThrottlingException {
InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { public PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(Record record) public PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException, ThrottlingException {
InvalidStateException, ThrottlingException, ShutdownException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber) public PreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
return null; return null;
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException, throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -171,7 +164,7 @@ public class StreamingShardRecordProcessorTest {
private MultiLangDaemonConfiguration configuration; private MultiLangDaemonConfiguration configuration;
@Before @Before
public void prepare() throws IOException, InterruptedException, ExecutionException { public void prepare() throws InterruptedException, ExecutionException {
// Fake command // Fake command
systemExitCount = 0; systemExitCount = 0;
@ -230,7 +223,7 @@ public class StreamingShardRecordProcessorTest {
List<KinesisClientRecord> testRecords = Collections.emptyList(); List<KinesisClientRecord> testRecords = Collections.emptyList();
recordProcessor.initialize(InitializationInput.builder().shardId(shardId).build()); recordProcessor.initialize(InitializationInput.builder().shardId(SHARD_ID).build());
recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords) recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords)
.checkpointer(unimplementedCheckpointer).build()); .checkpointer(unimplementedCheckpointer).build());
recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords) recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords)
@ -240,7 +233,6 @@ public class StreamingShardRecordProcessorTest {
@Test @Test
public void processorPhasesTest() throws InterruptedException, ExecutionException { public void processorPhasesTest() throws InterruptedException, ExecutionException {
Answer<StatusMessage> answer = new Answer<StatusMessage>() { Answer<StatusMessage> answer = new Answer<StatusMessage>() {
StatusMessage[] answers = new StatusMessage[] { new StatusMessage(InitializeMessage.ACTION), StatusMessage[] answers = new StatusMessage[] { new StatusMessage(InitializeMessage.ACTION),
@ -263,7 +255,7 @@ public class StreamingShardRecordProcessorTest {
verify(messageWriter) verify(messageWriter)
.writeInitializeMessage(argThat(Matchers.withInit( .writeInitializeMessage(argThat(Matchers.withInit(
InitializationInput.builder().shardId(shardId).build()))); InitializationInput.builder().shardId(SHARD_ID).build())));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter).writeLeaseLossMessage(any(LeaseLostInput.class)); verify(messageWriter).writeLeaseLossMessage(any(LeaseLostInput.class));
} }
@ -295,7 +287,7 @@ public class StreamingShardRecordProcessorTest {
phases(answer); phases(answer);
verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder() verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder()
.shardId(shardId).build()))); .shardId(SHARD_ID).build())));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter, never()).writeLeaseLossMessage(any(LeaseLostInput.class)); verify(messageWriter, never()).writeLeaseLossMessage(any(LeaseLostInput.class));
Assert.assertEquals(1, systemExitCount); Assert.assertEquals(1, systemExitCount);

View file

@ -39,8 +39,7 @@ public class InMemoryCheckpointer implements Checkpointer {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) {
throws KinesisClientLibException {
checkpoints.put(leaseKey, checkpointValue); checkpoints.put(leaseKey, checkpointValue);
flushpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue);
pendingCheckpoints.remove(leaseKey); pendingCheckpoints.remove(leaseKey);
@ -49,33 +48,32 @@ public class InMemoryCheckpointer implements Checkpointer {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue); log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
} }
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException { public ExtendedSequenceNumber getCheckpoint(String leaseKey) {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
log.debug("checkpoint shardId: {} checkpoint: {}", leaseKey, checkpoint); log.debug("checkpoint shardId: {} checkpoint: {}", leaseKey, checkpoint);
return checkpoint; return checkpoint;
} }
@Override @Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) {
throws KinesisClientLibException {
prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
} }
@Override @Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken,
byte[] pendingCheckpointState) {
pendingCheckpoints.put(leaseKey, pendingCheckpoint); pendingCheckpoints.put(leaseKey, pendingCheckpoint);
pendingCheckpointStates.put(leaseKey, pendingCheckpointState); pendingCheckpointStates.put(leaseKey, pendingCheckpointState);
} }
@Override @Override
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { public Checkpoint getCheckpointObject(String leaseKey) {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey); byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey);

View file

@ -105,7 +105,7 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest {
@Test @Test
public void testElectedLeadersAsPerExpectedShufflingOrder() public void testElectedLeadersAsPerExpectedShufflingOrder()
throws Exception { throws Exception {
List<Lease> leases = getLeases(5, false /*emptyLeaseOwner */,false /* duplicateLeaseOwner */, true /* activeLeases */); List<Lease> leases = getLeases(5, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */);
when(leaseRefresher.listLeases()).thenReturn(leases); when(leaseRefresher.listLeases()).thenReturn(leases);
Set<String> expectedLeaders = getExpectedLeaders(leases); Set<String> expectedLeaders = getExpectedLeaders(leases);
for (String leader : expectedLeaders) { for (String leader : expectedLeaders) {

View file

@ -86,7 +86,7 @@ public class DiagnosticEventsTest {
assertEquals(event.getLargestPoolSize(), largestPoolSize); assertEquals(event.getLargestPoolSize(), largestPoolSize);
assertEquals(event.getMaximumPoolSize(), maximumPoolSize); assertEquals(event.getMaximumPoolSize(), maximumPoolSize);
assertEquals(event.getLeasesOwned(), leaseAssignments.size()); assertEquals(event.getLeasesOwned(), leaseAssignments.size());
assertEquals(event.getCurrentQueueSize(),0); assertEquals(0, event.getCurrentQueueSize());
verify(defaultHandler, times(1)).visit(event); verify(defaultHandler, times(1)).visit(event);
} }
@ -110,7 +110,7 @@ public class DiagnosticEventsTest {
assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize);
assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize);
assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size());
assertEquals(event.getExecutorStateEvent().getCurrentQueueSize(),0); assertEquals(0, event.getExecutorStateEvent().getCurrentQueueSize());
assertTrue(event.getThrowable() instanceof TestRejectedTaskException); assertTrue(event.getThrowable() instanceof TestRejectedTaskException);
verify(defaultHandler, times(1)).visit(event); verify(defaultHandler, times(1)).visit(event);
@ -136,7 +136,7 @@ public class DiagnosticEventsTest {
assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize); assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize);
assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize); assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize);
assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size()); assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size());
assertEquals(executorStateEvent.getCurrentQueueSize(),0); assertEquals(0, executorStateEvent.getCurrentQueueSize());
RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent, RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent,
new TestRejectedTaskException()); new TestRejectedTaskException());
@ -145,7 +145,7 @@ public class DiagnosticEventsTest {
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size());
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize(),0); assertEquals(0, rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize());
assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException); assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException);
} }

View file

@ -210,7 +210,7 @@ public class PeriodicShardSyncManagerTest {
}}.stream().map(hashKeyRangeForLease -> { }}.stream().map(hashKeyRangeForLease -> {
MultiStreamLease lease = new MultiStreamLease(); MultiStreamLease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease); lease.hashKeyRange(hashKeyRangeForLease);
if(lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) { if (lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) {
lease.checkpoint(ExtendedSequenceNumber.SHARD_END); lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
} else { } else {
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
@ -342,7 +342,7 @@ public class PeriodicShardSyncManagerTest {
lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0])));
lease.shardId("shard-"+(leaseCounter[0])); lease.shardId("shard-"+(leaseCounter[0]));
// Setting the hashrange only for last two leases // Setting the hashrange only for last two leases
if(leaseCounter[0] >= 3) { if (leaseCounter[0] >= 3) {
lease.hashKeyRange(hashKeyRangeForLease); lease.hashKeyRange(hashKeyRangeForLease);
} }
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
@ -355,7 +355,7 @@ public class PeriodicShardSyncManagerTest {
Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
// Assert that all the leases now has hashRanges set. // Assert that all the leases now has hashRanges set.
for(Lease lease : multiStreamLeases) { for (Lease lease : multiStreamLeases) {
Assert.assertNotNull(lease.hashKeyRangeForLease()); Assert.assertNotNull(lease.hashKeyRangeForLease());
} }
} }
@ -390,7 +390,7 @@ public class PeriodicShardSyncManagerTest {
lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0])));
lease.shardId("shard-"+(leaseCounter[0])); lease.shardId("shard-"+(leaseCounter[0]));
// Setting the hashrange only for last two leases // Setting the hashrange only for last two leases
if(leaseCounter[0] >= 3) { if (leaseCounter[0] >= 3) {
lease.hashKeyRange(hashKeyRangeForLease); lease.hashKeyRange(hashKeyRangeForLease);
} }
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
@ -403,14 +403,14 @@ public class PeriodicShardSyncManagerTest {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
// Assert that all the leases now has hashRanges set. // Assert that all the leases now has hashRanges set.
for(Lease lease : multiStreamLeases) { for (Lease lease : multiStreamLeases) {
Assert.assertNotNull(lease.hashKeyRangeForLease()); Assert.assertNotNull(lease.hashKeyRangeForLease());
} }
} }
@Test @Test
public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() { public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() {
for(int i=0; i < 1000; i++) { for (int i=0; i < 1000; i++) {
int maxInitialLeaseCount = 100; int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount); List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false); reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false);
@ -514,7 +514,7 @@ public class PeriodicShardSyncManagerTest {
for (int i = 0; i < leasesToMerge; i += 2) { for (int i = 0; i < leasesToMerge; i += 2) {
Lease parent1 = leasesEligibleForMerge.get(i); Lease parent1 = leasesEligibleForMerge.get(i);
Lease parent2 = leasesEligibleForMerge.get(i + 1); Lease parent2 = leasesEligibleForMerge.get(i + 1);
if(parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE)) if (parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE))
{ {
parent1.checkpoint(ExtendedSequenceNumber.SHARD_END); parent1.checkpoint(ExtendedSequenceNumber.SHARD_END);
if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) { if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) {

View file

@ -35,7 +35,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.atMost; import static org.mockito.internal.verification.VerificationModeFactory.atMost;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.*;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -104,6 +103,9 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.ProvidedStreamsDeferredDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@ -727,8 +729,8 @@ public class SchedulerTest {
boolean expectPendingStreamsForDeletion, boolean expectPendingStreamsForDeletion,
boolean onlyStreamsNoLeasesDeletion) boolean onlyStreamsNoLeasesDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,5); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1, 5);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(3,7); List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(3, 7);
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
@ -742,7 +744,7 @@ public class SchedulerTest {
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new)); .collect(Collectors.toCollection(HashSet::new));
if(onlyStreamsNoLeasesDeletion) { if (onlyStreamsNoLeasesDeletion) {
expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance( .mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
@ -756,7 +758,7 @@ public class SchedulerTest {
Assert.assertEquals(expectedSyncedStreams, syncedStreams); Assert.assertEquals(expectedSyncedStreams, syncedStreams);
List<StreamConfig> expectedCurrentStreamConfigs; List<StreamConfig> expectedCurrentStreamConfigs;
if(onlyStreamsNoLeasesDeletion) { if (onlyStreamsNoLeasesDeletion) {
expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance( StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
@ -778,8 +780,8 @@ public class SchedulerTest {
@Test @Test
public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1, 6);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4); List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1, 4);
prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2); prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2);
@ -820,7 +822,7 @@ public class SchedulerTest {
@Test @Test
public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream()
throws ProvisionedThroughputException, InvalidStateException, DependencyException { throws ProvisionedThroughputException, InvalidStateException, DependencyException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1, 6);
prepareForStaleDeletedStreamCleanupTests(streamConfigList1); prepareForStaleDeletedStreamCleanupTests(streamConfigList1);
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier());
@ -1243,7 +1245,7 @@ public class SchedulerTest {
@Override @Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) { StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) {
if(shouldReturnDefaultShardSyncTaskmanager) { if (shouldReturnDefaultShardSyncTaskmanager) {
return shardSyncTaskManager; return shardSyncTaskManager;
} }
final ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); final ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class);
@ -1255,7 +1257,7 @@ public class SchedulerTest {
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(hierarchicalShardSyncer); when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(hierarchicalShardSyncer);
when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier());
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
if(shardSyncFirstAttemptFailure) { if (shardSyncFirstAttemptFailure) {
when(shardDetector.listShards()) when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception")) .thenThrow(new RuntimeException("Service Exception"))
.thenReturn(Collections.EMPTY_LIST); .thenReturn(Collections.EMPTY_LIST);

View file

@ -118,8 +118,7 @@ public class WorkerTest {
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = SAMPLE_RECORD_PROCESSOR_FACTORY; private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = SAMPLE_RECORD_PROCESSOR_FACTORY;
*//*
*//**
* Test method for {@link Worker#getApplicationName()}. * Test method for {@link Worker#getApplicationName()}.
*//* *//*
@Test @Test
@ -346,7 +345,7 @@ public class WorkerTest {
Assert.assertTrue(count > 0); Assert.assertTrue(count > 0);
} }
*//** *//*
* Runs worker with threadPoolSize == numShards * Runs worker with threadPoolSize == numShards
* Test method for {@link Worker#run()}. * Test method for {@link Worker#run()}.
*//* *//*
@ -357,7 +356,7 @@ public class WorkerTest {
runAndTestWorker(numShards, threadPoolSize); runAndTestWorker(numShards, threadPoolSize);
} }
*//** *//*
* Runs worker with threadPoolSize < numShards * Runs worker with threadPoolSize < numShards
* Test method for {@link Worker#run()}. * Test method for {@link Worker#run()}.
*//* *//*
@ -368,7 +367,7 @@ public class WorkerTest {
runAndTestWorker(numShards, threadPoolSize); runAndTestWorker(numShards, threadPoolSize);
} }
*//** *//*
* Runs worker with threadPoolSize > numShards * Runs worker with threadPoolSize > numShards
* Test method for {@link Worker#run()}. * Test method for {@link Worker#run()}.
*//* *//*
@ -379,7 +378,7 @@ public class WorkerTest {
runAndTestWorker(numShards, threadPoolSize); runAndTestWorker(numShards, threadPoolSize);
} }
*//** *//*
* Runs worker with threadPoolSize < numShards * Runs worker with threadPoolSize < numShards
* Test method for {@link Worker#run()}. * Test method for {@link Worker#run()}.
*//* *//*
@ -395,7 +394,7 @@ public class WorkerTest {
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
} }
*//** *//*
* Runs worker with threadPoolSize < numShards * Runs worker with threadPoolSize < numShards
* Test method for {@link Worker#run()}. * Test method for {@link Worker#run()}.
*//* *//*
@ -557,7 +556,7 @@ public class WorkerTest {
verify(v2RecordProcessor, times(1)).shutdown(any(ShutdownInput.class)); verify(v2RecordProcessor, times(1)).shutdown(any(ShutdownInput.class));
} }
*//** *//*
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
* This behavior makes the test a bit racy, since we need to ensure a specific order of events. * This behavior makes the test a bit racy, since we need to ensure a specific order of events.
@ -1734,7 +1733,8 @@ public class WorkerTest {
return new ReflectionFieldMatcher<>(itemClass, fieldName, fieldMatcher); return new ReflectionFieldMatcher<>(itemClass, fieldName, fieldMatcher);
} }
} }
*//**
*//*
* Returns executor service that will be owned by the worker. This is useful to test the scenario * Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow. * where worker shuts down the executor service also during shutdown flow.
* *
@ -1756,9 +1756,6 @@ public class WorkerTest {
return shards; return shards;
} }
*//**
* @return
*//*
private List<Shard> createShardListWithOneSplit() { private List<Shard> createShardListWithOneSplit() {
List<Shard> shards = new ArrayList<Shard>(); List<Shard> shards = new ArrayList<Shard>();
SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324"); SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324");

View file

@ -1592,7 +1592,7 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON);
} }
/** /*
* <pre> * <pre>
* Shard structure (x-axis is epochs): * Shard structure (x-axis is epochs):
* 0 3 6 9 * 0 3 6 9
@ -1869,7 +1869,7 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP); assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP);
} }
/** /*
* <pre> * <pre>
* Shard structure (x-axis is epochs): * Shard structure (x-axis is epochs):
* 0 3 6 9 * 0 3 6 9
@ -2325,12 +2325,16 @@ public class HierarchicalShardSyncerTest {
@Test @Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception { public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
final List<Shard> shardsWithIncompleteHashRange = Arrays.asList( final List<Shard> shardsWithIncompleteHashRange = Arrays.asList(
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")), ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY)) ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
); );
final List<Shard> shardsWithCompleteHashRange = Arrays.asList( final List<Shard> shardsWithCompleteHashRange = Arrays.asList(
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY)) ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
); );
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
@ -2352,8 +2356,10 @@ public class HierarchicalShardSyncerTest {
@Test @Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception { public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {
final List<Shard> shardsWithCompleteHashRange = Arrays.asList( final List<Shard> shardsWithCompleteHashRange = Arrays.asList(
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY)) ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
); );
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);

View file

@ -14,10 +14,11 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.awt.*; import java.awt.Button;
import java.awt.Dimension;
import java.awt.GridLayout;
import java.awt.event.ActionEvent; import java.awt.event.ActionEvent;
import java.awt.event.ActionListener; import java.awt.event.ActionListener;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -25,7 +26,10 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.swing.*; import javax.swing.BoxLayout;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@ -54,9 +58,8 @@ public class LeaseCoordinatorExerciser {
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L; private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L;
public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException, public static void main(String[] args) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException { ProvisionedThroughputException {
int numCoordinators = 9; int numCoordinators = 9;
int numLeases = 73; int numLeases = 73;
int leaseDurationMillis = 10000; int leaseDurationMillis = 10000;

View file

@ -56,7 +56,6 @@ public class ShardObjectHelper {
private ShardObjectHelper() { private ShardObjectHelper() {
} }
/** Helper method to create a new shard object. /** Helper method to create a new shard object.
* @param shardId * @param shardId
* @param parentShardId * @param parentShardId
@ -84,7 +83,9 @@ public class ShardObjectHelper {
String adjacentParentShardId, String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange, SequenceNumberRange sequenceNumberRange,
HashKeyRange hashKeyRange) { HashKeyRange hashKeyRange) {
return Shard.builder().shardId(shardId).parentShardId(parentShardId).adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange).hashKeyRange(hashKeyRange).build(); return Shard.builder().shardId(shardId).parentShardId(parentShardId)
.adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange)
.hashKeyRange(hashKeyRange).build();
} }
/** Helper method. /** Helper method.
@ -116,5 +117,4 @@ public class ShardObjectHelper {
return parentShardIds; return parentShardIds;
} }
} }

View file

@ -37,7 +37,7 @@ import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends
LeaseIntegrationBillingModePayPerRequestTest { LeaseIntegrationBillingModePayPerRequestTest {
private final String TEST_METRIC = "TestOperation"; private static final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds // This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L; private static final long LEASE_DURATION_MILLIS = 2000L;

View file

@ -36,7 +36,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest { public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
private final String TEST_METRIC = "TestOperation"; private static final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds // This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L; private static final long LEASE_DURATION_MILLIS = 2000L;

View file

@ -86,7 +86,7 @@ public class DynamoDBLeaseRenewerTest {
*/ */
Lease lease1 = newLease("1"); Lease lease1 = newLease("1");
Lease lease2 = newLease("2"); Lease lease2 = newLease("2");
leasesToRenew = Arrays.asList(lease1,lease2); leasesToRenew = Arrays.asList(lease1, lease2);
renewer.addLeasesToRenew(leasesToRenew); renewer.addLeasesToRenew(leasesToRenew);
doReturn(true).when(leaseRefresher).renewLease(lease1); doReturn(true).when(leaseRefresher).renewLease(lease1);

View file

@ -47,8 +47,8 @@ public class MetricAccumulatingQueueTest {
*/ */
@Test @Test
public void testAccumulation() { public void testAccumulation() {
Collection<Dimension> dimensionsA = Collections.singleton(dim("name","a")); Collection<Dimension> dimensionsA = Collections.singleton(dim("name", "a"));
Collection<Dimension> dimensionsB = Collections.singleton(dim("name","b")); Collection<Dimension> dimensionsB = Collections.singleton(dim("name", "b"));
String keyA = "a"; String keyA = "a";
String keyB = "b"; String keyB = "b";

View file

@ -176,7 +176,7 @@ public class FanOutRecordsPublisherTest {
} }
@Test @Test
public void InvalidEventTest() throws Exception { public void testInvalidEvent() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
@ -443,10 +443,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) { @Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput()); receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1); subscription.request(1);
servicePublisher.request(1); servicePublisher.request(1);
if(receivedInput.size() == totalServicePublisherEvents) { if (receivedInput.size() == totalServicePublisherEvents) {
servicePublisherTaskCompletionLatch.countDown(); servicePublisherTaskCompletionLatch.countDown();
} }
} }
@ -549,10 +550,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) { @Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput()); receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1); subscription.request(1);
servicePublisher.request(1); servicePublisher.request(1);
if(receivedInput.size() == triggerCompleteAtNthEvent) { if (receivedInput.size() == triggerCompleteAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown(); servicePublisherTaskCompletionLatch.countDown();
} }
} }
@ -681,7 +683,7 @@ public class FanOutRecordsPublisherTest {
receivedInput.add(input.processRecordsInput()); receivedInput.add(input.processRecordsInput());
subscription.request(1); subscription.request(1);
servicePublisher.request(1); servicePublisher.request(1);
if(receivedInput.size() == triggerCompleteAtNthEvent) { if (receivedInput.size() == triggerCompleteAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown(); servicePublisherTaskCompletionLatch.countDown();
} }
} }
@ -783,10 +785,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) { @Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput()); receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1); subscription.request(1);
servicePublisher.request(1); servicePublisher.request(1);
if(receivedInput.size() == triggerErrorAtNthEvent) { if (receivedInput.size() == triggerErrorAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown(); servicePublisherTaskCompletionLatch.countDown();
} }
} }
@ -879,10 +882,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) { @Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput()); receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1); subscription.request(1);
servicePublisher.request(1); servicePublisher.request(1);
if(receivedInput.size() == totalServicePublisherEvents) { if (receivedInput.size() == totalServicePublisherEvents) {
servicePublisherTaskCompletionLatch.countDown(); servicePublisherTaskCompletionLatch.countDown();
} }
} }
@ -973,7 +977,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) { @Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput()); receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1); subscription.request(1);
servicePublisher.request(1); servicePublisher.request(1);
} }
@ -1328,7 +1333,7 @@ public class FanOutRecordsPublisherTest {
fanOutRecordsPublisher fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier()); .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier());
// Send stale event periodically // Send stale event periodically
if(totalRecordsRetrieved[0] % 10 == 0) { if (totalRecordsRetrieved[0] % 10 == 0) {
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow")); () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
} }
@ -1368,7 +1373,7 @@ public class FanOutRecordsPublisherTest {
int count = 0; int count = 0;
// Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
// delivered as expected. // delivered as expected.
while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) { while (count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal); .evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal);
@ -1403,7 +1408,7 @@ public class FanOutRecordsPublisherTest {
int count = 0; int count = 0;
// Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
// delivered as expected. // delivered as expected.
while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) { while (count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier())); () -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier()));
@ -1457,7 +1462,8 @@ public class FanOutRecordsPublisherTest {
flowCaptor.getValue().exceptionOccurred(exception); flowCaptor.getValue().exceptionOccurred(exception);
Optional<OnErrorEvent> onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent).map(e -> (OnErrorEvent)e).findFirst(); Optional<OnErrorEvent> onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent)
.map(e -> (OnErrorEvent) e).findFirst();
assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception)))); assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
assertThat(acquireTimeoutLogged.get(), equalTo(true)); assertThat(acquireTimeoutLogged.get(), equalTo(true));
@ -1587,8 +1593,8 @@ public class FanOutRecordsPublisherTest {
public void run() { public void run() {
for (int i = 1; i <= numOfTimes; ) { for (int i = 1; i <= numOfTimes; ) {
demandNotifier.acquireUninterruptibly(); demandNotifier.acquireUninterruptibly();
if(i == sendCompletionAt) { if (i == sendCompletionAt) {
if(shardEndAction != null) { if (shardEndAction != null) {
shardEndAction.accept(i++); shardEndAction.accept(i++);
} else { } else {
action.accept(i++); action.accept(i++);
@ -1596,7 +1602,7 @@ public class FanOutRecordsPublisherTest {
completeAction.run(); completeAction.run();
break; break;
} }
if(i == sendErrorAt) { if (i == sendErrorAt) {
action.accept(i++); action.accept(i++);
errorAction.run(); errorAction.run();
break; break;

View file

@ -331,7 +331,7 @@ public class KinesisDataFetcherTest {
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records) { private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records) {
List<ChildShard> childShards = new ArrayList<>(); List<ChildShard> childShards = new ArrayList<>();
if(nextIterator == null) { if (nextIterator == null) {
childShards = createChildShards(); childShards = createChildShards();
} }
return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator) return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator)

View file

@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -277,7 +276,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
@Override @Override
public DataFetcherResult getRecords() { public DataFetcherResult getRecords() {
GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records)).nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build(); GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records))
.nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
return new AdvancingResult(getRecordsResult); return new AdvancingResult(getRecordsResult);
} }

View file

@ -327,7 +327,7 @@ public class PrefetchRecordsPublisherTest {
// TODO: fix this verification // TODO: fix this verification
// verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL); // verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL);
// assertEquals(spyQueue.size(), callRate); // assertEquals(spyQueue.size(), callRate);
assertTrue("Call Rate is "+callRate,callRate < MAX_SIZE); assertTrue("Call Rate is " + callRate, callRate < MAX_SIZE);
} }
@Test @Test
@ -422,8 +422,10 @@ public class PrefetchRecordsPublisherTest {
@Test @Test
public void testRetryableRetrievalExceptionContinues() { public void testRetryableRetrievalExceptionContinues() {
GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build(); GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L)
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response); .records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition); getRecordsCache.start(sequenceNumber, initialPosition);
@ -638,7 +640,7 @@ public class PrefetchRecordsPublisherTest {
verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt()); verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt());
while(getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) { while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) {
Thread.yield(); Thread.yield();
} }
@ -697,7 +699,7 @@ public class PrefetchRecordsPublisherTest {
public void resetIteratorTo(String nextIterator) { public void resetIteratorTo(String nextIterator) {
Iterator<GetRecordsResponse> newIterator = responses.iterator(); Iterator<GetRecordsResponse> newIterator = responses.iterator();
while(newIterator.hasNext()) { while (newIterator.hasNext()) {
GetRecordsResponse current = newIterator.next(); GetRecordsResponse current = newIterator.next();
if (StringUtils.equals(nextIterator, current.nextShardIterator())) { if (StringUtils.equals(nextIterator, current.nextShardIterator())) {
if (!newIterator.hasNext()) { if (!newIterator.hasNext()) {
@ -725,7 +727,7 @@ public class PrefetchRecordsPublisherTest {
private static final int LOSS_EVERY_NTH_RECORD = 50; private static final int LOSS_EVERY_NTH_RECORD = 50;
private static int recordCounter = 0; private static int recordCounter = 0;
private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1); private static final ScheduledExecutorService CONSUMER_HEALTH_CHECKER = Executors.newScheduledThreadPool(1);
public LossyNotificationSubscriber(Subscriber<RecordsRetrieved> delegate, RecordsPublisher recordsPublisher) { public LossyNotificationSubscriber(Subscriber<RecordsRetrieved> delegate, RecordsPublisher recordsPublisher) {
super(delegate, recordsPublisher); super(delegate, recordsPublisher);
@ -738,7 +740,7 @@ public class PrefetchRecordsPublisherTest {
getDelegateSubscriber().onNext(recordsRetrieved); getDelegateSubscriber().onNext(recordsRetrieved);
} else { } else {
log.info("Record Loss Triggered"); log.info("Record Loss Triggered");
consumerHealthChecker.schedule(() -> { CONSUMER_HEALTH_CHECKER.schedule(() -> {
getRecordsPublisher().restartFrom(recordsRetrieved); getRecordsPublisher().restartFrom(recordsRetrieved);
Flowable.fromPublisher(getRecordsPublisher()).subscribeOn(Schedulers.computation()) Flowable.fromPublisher(getRecordsPublisher()).subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation(), true, 8).subscribe(this); .observeOn(Schedulers.computation(), true, 8).subscribe(this);

View file

@ -21,7 +21,7 @@ public class BlockingUtils {
public static <Records> Records blockUntilRecordsAvailable(Supplier<Records> recordsSupplier, long timeoutMillis) { public static <Records> Records blockUntilRecordsAvailable(Supplier<Records> recordsSupplier, long timeoutMillis) {
Records recordsRetrieved; Records recordsRetrieved;
while((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) { while ((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) {
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -29,7 +29,7 @@ public class BlockingUtils {
} }
timeoutMillis -= 100; timeoutMillis -= 100;
} }
if(recordsRetrieved != null) { if (recordsRetrieved != null) {
return recordsRetrieved; return recordsRetrieved;
} else { } else {
throw new RuntimeException("No records found"); throw new RuntimeException("No records found");
@ -37,7 +37,7 @@ public class BlockingUtils {
} }
public static boolean blockUntilConditionSatisfied(Supplier<Boolean> conditionSupplier, long timeoutMillis) { public static boolean blockUntilConditionSatisfied(Supplier<Boolean> conditionSupplier, long timeoutMillis) {
while(!conditionSupplier.get() && timeoutMillis > 0 ) { while (!conditionSupplier.get() && timeoutMillis > 0 ) {
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View file

@ -12,7 +12,7 @@ public class SubscribeToShardRequestMatcher extends ArgumentMatcher<SubscribeToS
} }
public boolean matches(Object rightObject) { public boolean matches(Object rightObject) {
SubscribeToShardRequest right = (SubscribeToShardRequest)rightObject; SubscribeToShardRequest right = (SubscribeToShardRequest) rightObject;
return left.shardId().equals(right.shardId()) && return left.shardId().equals(right.shardId()) &&
left.consumerARN().equals(right.consumerARN()) && left.consumerARN().equals(right.consumerARN()) &&
left.startingPosition().equals(right.startingPosition()); left.startingPosition().equals(right.startingPosition());

View file

@ -27,7 +27,10 @@
<module name="InvalidJavadocPosition"/> <module name="InvalidJavadocPosition"/>
<module name="LocalVariableName"/> <module name="LocalVariableName"/>
<module name="MemberName"/> <module name="MemberName"/>
<module name="MethodName"/> <module name="MethodName">
<!-- Method names must start with a lowercase letter. -->
<property name="format" value="^[a-z]\w*$"/>
</module>
<module name="NeedBraces"/> <module name="NeedBraces"/>
<module name="OneStatementPerLine"/> <module name="OneStatementPerLine"/>
<module name="OneTopLevelClass"/> <module name="OneTopLevelClass"/>

View file

@ -81,6 +81,7 @@
<configLocation>checkstyle/checkstyle.xml</configLocation> <configLocation>checkstyle/checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput> <consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation> <failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<suppressionsLocation>checkstyle/checkstyle-suppressions.xml</suppressionsLocation> <suppressionsLocation>checkstyle/checkstyle-suppressions.xml</suppressionsLocation>
</configuration> </configuration>
<executions> <executions>