From 74d8f4b780a53af09830acc957248902ede44248 Mon Sep 17 00:00:00 2001 From: stair <123031771+stair-aws@users.noreply.github.com> Date: Mon, 26 Jun 2023 15:25:10 -0400 Subject: [PATCH] Enabled Checkstyle validation of test resources. (#1150) No functional change. --- .../kinesis/multilang/MessageReaderTest.java | 21 ++++---- .../kinesis/multilang/MessageWriterTest.java | 14 +++--- .../multilang/MultiLangDaemonConfigTest.java | 9 ++-- .../kinesis/multilang/ReadSTDERRTaskTest.java | 8 +-- .../StreamingShardRecordProcessorTest.java | 50 ++++++++----------- .../kinesis/checkpoint/CheckpointerTest.java | 10 ++-- .../checkpoint/InMemoryCheckpointer.java | 14 +++--- ...dShardRecordProcessorCheckpointerTest.java | 34 ++++++------- ...sticShuffleShardSyncLeaderDeciderTest.java | 2 +- .../coordinator/DiagnosticEventsTest.java | 8 +-- .../PeriodicShardSyncManagerTest.java | 14 +++--- .../kinesis/coordinator/SchedulerTest.java | 22 ++++---- .../kinesis/coordinator/WorkerTest.java | 21 ++++---- .../leases/HierarchicalShardSyncerTest.java | 22 +++++--- .../leases/LeaseCoordinatorExerciser.java | 15 +++--- .../kinesis/leases/ShardObjectHelper.java | 6 +-- ...llingModePayPerRequestIntegrationTest.java | 2 +- .../DynamoDBLeaseRenewerIntegrationTest.java | 2 +- .../dynamodb/DynamoDBLeaseRenewerTest.java | 2 +- .../metrics/MetricAccumulatingQueueTest.java | 4 +- .../fanout/FanOutRecordsPublisherTest.java | 42 +++++++++------- .../polling/KinesisDataFetcherTest.java | 2 +- ...efetchRecordsPublisherIntegrationTest.java | 4 +- .../polling/PrefetchRecordsPublisherTest.java | 16 +++--- .../amazon/kinesis/utils/BlockingUtils.java | 6 +-- .../utils/SubscribeToShardRequestMatcher.java | 2 +- checkstyle/checkstyle.xml | 5 +- pom.xml | 1 + 28 files changed, 183 insertions(+), 175 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java index 14ac357c..f6fab4c1 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java @@ -34,9 +34,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; public class MessageReaderTest { - private static final String shardId = "shard-123"; + private static final String SHARD_ID = "shard-123"; - /* + /** * This line is based on the definition of the protocol for communication between the KCL record processor and * the client's process. */ @@ -44,7 +44,7 @@ public class MessageReaderTest { return String.format("{\"action\":\"checkpoint\", \"checkpoint\":\"%s\"}", sequenceNumber); } - /* + /** * This line is based on the definition of the protocol for communication between the KCL record processor and * the client's process. */ @@ -79,10 +79,9 @@ public class MessageReaderTest { String[] responseFors = new String[] { "initialize", "processRecords", "processRecords", "shutdown" }; InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors); MessageReader reader = - new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); + new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); for (String responseFor : responseFors) { - StatusMessage statusMessage = null; try { Message message = reader.getNextMessageFromSTDOUT().get(); if (message instanceof StatusMessage) { @@ -102,14 +101,14 @@ public class MessageReaderTest { InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors); MessageReader reader = - new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); + new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); Future drainFuture = reader.drainSTDOUT(); Boolean drainResult = drainFuture.get(); Assert.assertNotNull(drainResult); Assert.assertTrue(drainResult); } - /* + /** * readValue should fail safely and just continue looping */ @Test @@ -134,7 +133,7 @@ public class MessageReaderTest { } MessageReader reader = - new MessageReader().initialize(bufferReader, shardId, new ObjectMapper(), + new MessageReader().initialize(bufferReader, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); try { @@ -149,7 +148,7 @@ public class MessageReaderTest { public void messageReaderBuilderTest() { InputStream stream = new ByteArrayInputStream("".getBytes()); MessageReader reader = - new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); + new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); Assert.assertNotNull(reader); } @@ -158,7 +157,7 @@ public class MessageReaderTest { BufferedReader input = Mockito.mock(BufferedReader.class); Mockito.doThrow(IOException.class).when(input).readLine(); MessageReader reader = - new MessageReader().initialize(input, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); + new MessageReader().initialize(input, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); Future readTask = reader.getNextMessageFromSTDOUT(); @@ -176,7 +175,7 @@ public class MessageReaderTest { public void noMoreMessagesTest() throws InterruptedException { InputStream stream = new ByteArrayInputStream("".getBytes()); MessageReader reader = - new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); + new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); Future future = reader.getNextMessageFromSTDOUT(); try { diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java index eaf6be7b..c997c193 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java @@ -44,7 +44,7 @@ import static org.mockito.Mockito.verify; public class MessageWriterTest { - private static final String shardId = "shard-123"; + private static final String SHARD_ID = "shard-123"; MessageWriter messageWriter; OutputStream stream; @@ -57,7 +57,7 @@ public class MessageWriterTest { public void setup() { stream = Mockito.mock(OutputStream.class); messageWriter = - new MessageWriter().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool()); + new MessageWriter().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool()); } /* @@ -83,7 +83,7 @@ public class MessageWriterTest { @Test public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException { - Future future = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build()); + Future future = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build()); future.get(); verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); @@ -128,20 +128,20 @@ public class MessageWriterTest { @Test public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { Mockito.doThrow(IOException.class).when(stream).flush(); - Future initializeTask = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build()); + Future initializeTask = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build()); Boolean result = initializeTask.get(); Assert.assertNotNull(result); Assert.assertFalse(result); } @Test - public void objectMapperFails() throws JsonProcessingException, InterruptedException, ExecutionException { + public void objectMapperFails() throws JsonProcessingException { thrown.expect(RuntimeException.class); thrown.expectMessage("Encountered I/O error while writing LeaseLostMessage action to subprocess"); ObjectMapper mapper = Mockito.mock(ObjectMapper.class); Mockito.doThrow(JsonProcessingException.class).when(mapper).writeValueAsString(Mockito.any(Message.class)); - messageWriter = new MessageWriter().initialize(stream, shardId, mapper, Executors.newCachedThreadPool()); + messageWriter = new MessageWriter().initialize(stream, SHARD_ID, mapper, Executors.newCachedThreadPool()); messageWriter.writeLeaseLossMessage(LeaseLostInput.builder().build()); } @@ -154,7 +154,7 @@ public class MessageWriterTest { Assert.assertFalse(this.messageWriter.isOpen()); try { // Any message should fail - this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build()); + this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build()); Assert.fail("MessageWriter should be closed and unable to write."); } catch (IllegalStateException e) { // This should happen. diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index c6be1157..c5740a2f 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -164,7 +164,7 @@ public class MultiLangDaemonConfigTest { /** * Verify the daemonConfig properties are what we expect them to be. - * @param deamonConfig + * * @param expectedStreamName */ private void assertConfigurationsMatch(String expectedStreamName, String expectedStreamArn) { @@ -184,16 +184,15 @@ public class MultiLangDaemonConfigTest { @Test public void testPropertyValidation() { - String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n" + String propertiesNoExecutableName = "applicationName = testApp \n" + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; ClassLoader classLoader = Mockito.mock(ClassLoader.class); - Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())).when(classLoader) + Mockito.doReturn(new ByteArrayInputStream(propertiesNoExecutableName.getBytes())).when(classLoader) .getResourceAsStream(FILENAME); - MultiLangDaemonConfig config; try { - config = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + new MultiLangDaemonConfig(FILENAME, classLoader, configurator); Assert.fail("Construction of the config should have failed due to property validation failing."); } catch (IllegalArgumentException e) { // Good diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java index bffd431d..45ff3052 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java @@ -30,7 +30,7 @@ import org.mockito.Mockito; public class ReadSTDERRTaskTest { - private static final String shardId = "shard-123"; + private static final String SHARD_ID = "shard-123"; private BufferedReader mockBufferReader; @Before @@ -43,7 +43,7 @@ public class ReadSTDERRTaskTest { String errorMessages = "OMG\nThis is test message\n blah blah blah \n"; InputStream stream = new ByteArrayInputStream(errorMessages.getBytes()); - LineReaderTask reader = new DrainChildSTDERRTask().initialize(stream, shardId, ""); + LineReaderTask reader = new DrainChildSTDERRTask().initialize(stream, SHARD_ID, ""); Assert.assertNotNull(reader); } @@ -52,7 +52,7 @@ public class ReadSTDERRTaskTest { String errorMessages = "OMG\nThis is test message\n blah blah blah \n"; BufferedReader bufferReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(errorMessages.getBytes()))); - LineReaderTask errorReader = new DrainChildSTDERRTask().initialize(bufferReader, shardId, ""); + LineReaderTask errorReader = new DrainChildSTDERRTask().initialize(bufferReader, SHARD_ID, ""); Assert.assertNotNull(errorReader); Boolean result = errorReader.call(); @@ -65,7 +65,7 @@ public class ReadSTDERRTaskTest { } catch (IOException e) { Assert.fail("Not supposed to get an exception when we're just building our mock."); } - LineReaderTask errorReader = new DrainChildSTDERRTask().initialize(mockBufferReader, shardId, ""); + LineReaderTask errorReader = new DrainChildSTDERRTask().initialize(mockBufferReader, SHARD_ID, ""); Assert.assertNotNull(errorReader); Future result = Executors.newCachedThreadPool().submit(errorReader); Boolean finishedCleanly = null; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java index e3368e07..caa925b0 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; @@ -46,9 +45,7 @@ import org.mockito.stubbing.Answer; import com.fasterxml.jackson.databind.ObjectMapper; import software.amazon.awssdk.services.kinesis.model.Record; -import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; -import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.exceptions.ThrottlingException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; @@ -67,7 +64,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; @RunWith(MockitoJUnitRunner.class) public class StreamingShardRecordProcessorTest { - private static final String shardId = "shard-123"; + private static final String SHARD_ID = "shard-123"; private int systemExitCount = 0; @@ -79,77 +76,73 @@ public class StreamingShardRecordProcessorTest { private RecordProcessorCheckpointer unimplementedCheckpointer = new RecordProcessorCheckpointer() { @Override - public void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, - ThrottlingException, ShutdownException { + public void checkpoint() throws KinesisClientLibDependencyException, ThrottlingException { throw new UnsupportedOperationException(); } @Override public void checkpoint(String sequenceNumber) throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + ThrottlingException, IllegalArgumentException { throw new UnsupportedOperationException(); } @Override public void checkpoint(Record record) - throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException { + throws KinesisClientLibDependencyException, ThrottlingException { throw new UnsupportedOperationException(); } @Override public void checkpoint(String sequenceNumber, long subSequenceNumber) - throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException, - IllegalArgumentException { + throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException { throw new UnsupportedOperationException(); } @Override public PreparedCheckpointer prepareCheckpoint() - throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException { + throws KinesisClientLibDependencyException, ThrottlingException { throw new UnsupportedOperationException(); } @Override - public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) + throws KinesisClientLibDependencyException, ThrottlingException { throw new UnsupportedOperationException(); } @Override public PreparedCheckpointer prepareCheckpoint(Record record) - throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException { + throws KinesisClientLibDependencyException, ThrottlingException { throw new UnsupportedOperationException(); } @Override - public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) + throws KinesisClientLibDependencyException, ThrottlingException { throw new UnsupportedOperationException(); } @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber) - throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException { throw new UnsupportedOperationException(); } @Override - public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException { return null; } @Override public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber) - throws KinesisClientLibDependencyException, - InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException { throw new UnsupportedOperationException(); } @Override - public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException { + public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) + throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException { throw new UnsupportedOperationException(); } @@ -171,7 +164,7 @@ public class StreamingShardRecordProcessorTest { private MultiLangDaemonConfiguration configuration; @Before - public void prepare() throws IOException, InterruptedException, ExecutionException { + public void prepare() throws InterruptedException, ExecutionException { // Fake command systemExitCount = 0; @@ -230,7 +223,7 @@ public class StreamingShardRecordProcessorTest { List testRecords = Collections.emptyList(); - recordProcessor.initialize(InitializationInput.builder().shardId(shardId).build()); + recordProcessor.initialize(InitializationInput.builder().shardId(SHARD_ID).build()); recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords) .checkpointer(unimplementedCheckpointer).build()); recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords) @@ -240,7 +233,6 @@ public class StreamingShardRecordProcessorTest { @Test public void processorPhasesTest() throws InterruptedException, ExecutionException { - Answer answer = new Answer() { StatusMessage[] answers = new StatusMessage[] { new StatusMessage(InitializeMessage.ACTION), @@ -263,7 +255,7 @@ public class StreamingShardRecordProcessorTest { verify(messageWriter) .writeInitializeMessage(argThat(Matchers.withInit( - InitializationInput.builder().shardId(shardId).build()))); + InitializationInput.builder().shardId(SHARD_ID).build()))); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); verify(messageWriter).writeLeaseLossMessage(any(LeaseLostInput.class)); } @@ -295,7 +287,7 @@ public class StreamingShardRecordProcessorTest { phases(answer); verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder() - .shardId(shardId).build()))); + .shardId(SHARD_ID).build()))); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); verify(messageWriter, never()).writeLeaseLossMessage(any(LeaseLostInput.class)); Assert.assertEquals(1, systemExitCount); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java index b823c8e3..eb341238 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/CheckpointerTest.java @@ -37,9 +37,9 @@ public class CheckpointerTest { @Test public final void testInitialSetCheckpoint() throws Exception { - String sequenceNumber = "1"; + String sequenceNumber = "1"; String shardId = "myShardId"; - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken); ExtendedSequenceNumber registeredCheckpoint = checkpoint.getCheckpoint(shardId); Assert.assertEquals(extendedSequenceNumber, registeredCheckpoint); @@ -49,8 +49,8 @@ public class CheckpointerTest { public final void testAdvancingSetCheckpoint() throws Exception { String shardId = "myShardId"; for (Integer i = 0; i < 10; i++) { - String sequenceNumber = i.toString(); - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); + String sequenceNumber = i.toString(); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(sequenceNumber); checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(sequenceNumber), testConcurrencyToken); ExtendedSequenceNumber registeredCheckpoint = checkpoint.getCheckpoint(shardId); Assert.assertEquals(extendedSequenceNumber, registeredCheckpoint); @@ -67,7 +67,7 @@ public class CheckpointerTest { String checkpointValue = "12345"; String shardId = "testShardId-1"; String concurrencyToken = "token-1"; - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(checkpointValue); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(checkpointValue); checkpoint.setCheckpoint(shardId, new ExtendedSequenceNumber(checkpointValue), concurrencyToken); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).checkpoint()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index 8f6e165d..a2d83568 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -39,8 +39,7 @@ public class InMemoryCheckpointer implements Checkpointer { * {@inheritDoc} */ @Override - public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) - throws KinesisClientLibException { + public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) { checkpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue); pendingCheckpoints.remove(leaseKey); @@ -49,33 +48,32 @@ public class InMemoryCheckpointer implements Checkpointer { if (log.isDebugEnabled()) { log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue); } - } /** * {@inheritDoc} */ @Override - public ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException { + public ExtendedSequenceNumber getCheckpoint(String leaseKey) { ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); log.debug("checkpoint shardId: {} checkpoint: {}", leaseKey, checkpoint); return checkpoint; } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) - throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) { prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null); } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, + byte[] pendingCheckpointState) { pendingCheckpoints.put(leaseKey, pendingCheckpoint); pendingCheckpointStates.put(leaseKey, pendingCheckpointState); } @Override - public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException { + public Checkpoint getCheckpointObject(String leaseKey) { ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey); ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey); byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java index 2ff82004..37a40b6b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java @@ -91,11 +91,11 @@ public class ShardShardRecordProcessorCheckpointerTest { */ @Test public final void testCheckpointRecord() throws Exception { - ShardRecordProcessorCheckpointer processingCheckpointer = + ShardRecordProcessorCheckpointer processingCheckpointer = new ShardRecordProcessorCheckpointer(shardInfo, checkpoint); - processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); - Record record = makeRecord("5025"); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025"); + Record record = makeRecord("5025"); processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.checkpoint(record); assertThat(checkpoint.getCheckpoint(shardId), equalTo(extendedSequenceNumber)); @@ -107,13 +107,13 @@ public class ShardShardRecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSubRecord() throws Exception { - ShardRecordProcessorCheckpointer processingCheckpointer = + ShardRecordProcessorCheckpointer processingCheckpointer = new ShardRecordProcessorCheckpointer(shardInfo, checkpoint); - processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); - Record record = makeRecord("5030"); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030"); + Record record = makeRecord("5030"); //UserRecord subRecord = new UserRecord(record); - processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); + processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.checkpoint(record); assertThat(checkpoint.getCheckpoint(shardId), equalTo(extendedSequenceNumber)); } @@ -124,11 +124,11 @@ public class ShardShardRecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSequenceNumber() throws Exception { - ShardRecordProcessorCheckpointer processingCheckpointer = + ShardRecordProcessorCheckpointer processingCheckpointer = new ShardRecordProcessorCheckpointer(shardInfo, checkpoint); - processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); - processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035"); + processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.checkpoint("5035"); assertThat(checkpoint.getCheckpoint(shardId), equalTo(extendedSequenceNumber)); } @@ -139,11 +139,11 @@ public class ShardShardRecordProcessorCheckpointerTest { */ @Test public final void testCheckpointExtendedSequenceNumber() throws Exception { - ShardRecordProcessorCheckpointer processingCheckpointer = + ShardRecordProcessorCheckpointer processingCheckpointer = new ShardRecordProcessorCheckpointer(shardInfo, checkpoint); - processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); - ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); - processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040"); + processingCheckpointer.largestPermittedCheckpointValue(extendedSequenceNumber); processingCheckpointer.checkpoint("5040", 0); assertThat(checkpoint.getCheckpoint(shardId), equalTo(extendedSequenceNumber)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java index dff2a8cb..9508903b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -105,7 +105,7 @@ public class DeterministicShuffleShardSyncLeaderDeciderTest { @Test public void testElectedLeadersAsPerExpectedShufflingOrder() throws Exception { - List leases = getLeases(5, false /*emptyLeaseOwner */,false /* duplicateLeaseOwner */, true /* activeLeases */); + List leases = getLeases(5, false /*emptyLeaseOwner */, false /* duplicateLeaseOwner */, true /* activeLeases */); when(leaseRefresher.listLeases()).thenReturn(leases); Set expectedLeaders = getExpectedLeaders(leases); for (String leader : expectedLeaders) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index d6098cca..08ed8abb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -86,7 +86,7 @@ public class DiagnosticEventsTest { assertEquals(event.getLargestPoolSize(), largestPoolSize); assertEquals(event.getMaximumPoolSize(), maximumPoolSize); assertEquals(event.getLeasesOwned(), leaseAssignments.size()); - assertEquals(event.getCurrentQueueSize(),0); + assertEquals(0, event.getCurrentQueueSize()); verify(defaultHandler, times(1)).visit(event); } @@ -110,7 +110,7 @@ public class DiagnosticEventsTest { assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); - assertEquals(event.getExecutorStateEvent().getCurrentQueueSize(),0); + assertEquals(0, event.getExecutorStateEvent().getCurrentQueueSize()); assertTrue(event.getThrowable() instanceof TestRejectedTaskException); verify(defaultHandler, times(1)).visit(event); @@ -136,7 +136,7 @@ public class DiagnosticEventsTest { assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize); assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize); assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size()); - assertEquals(executorStateEvent.getCurrentQueueSize(),0); + assertEquals(0, executorStateEvent.getCurrentQueueSize()); RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent, new TestRejectedTaskException()); @@ -145,7 +145,7 @@ public class DiagnosticEventsTest { assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); - assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize(),0); + assertEquals(0, rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize()); assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index f7492d8d..71375c3d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -210,7 +210,7 @@ public class PeriodicShardSyncManagerTest { }}.stream().map(hashKeyRangeForLease -> { MultiStreamLease lease = new MultiStreamLease(); lease.hashKeyRange(hashKeyRangeForLease); - if(lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) { + if (lease.hashKeyRangeForLease().startingHashKey().toString().equals("4")) { lease.checkpoint(ExtendedSequenceNumber.SHARD_END); } else { lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); @@ -342,7 +342,7 @@ public class PeriodicShardSyncManagerTest { lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); lease.shardId("shard-"+(leaseCounter[0])); // Setting the hashrange only for last two leases - if(leaseCounter[0] >= 3) { + if (leaseCounter[0] >= 3) { lease.hashKeyRange(hashKeyRangeForLease); } lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); @@ -355,7 +355,7 @@ public class PeriodicShardSyncManagerTest { Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); // Assert that all the leases now has hashRanges set. - for(Lease lease : multiStreamLeases) { + for (Lease lease : multiStreamLeases) { Assert.assertNotNull(lease.hashKeyRangeForLease()); } } @@ -390,7 +390,7 @@ public class PeriodicShardSyncManagerTest { lease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), "shard-"+(++leaseCounter[0]))); lease.shardId("shard-"+(leaseCounter[0])); // Setting the hashrange only for last two leases - if(leaseCounter[0] >= 3) { + if (leaseCounter[0] >= 3) { lease.hashKeyRange(hashKeyRangeForLease); } lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); @@ -403,14 +403,14 @@ public class PeriodicShardSyncManagerTest { Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); // Assert that all the leases now has hashRanges set. - for(Lease lease : multiStreamLeases) { + for (Lease lease : multiStreamLeases) { Assert.assertNotNull(lease.hashKeyRangeForLease()); } } @Test public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() { - for(int i=0; i < 1000; i++) { + for (int i=0; i < 1000; i++) { int maxInitialLeaseCount = 100; List leases = generateInitialLeases(maxInitialLeaseCount); reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false); @@ -514,7 +514,7 @@ public class PeriodicShardSyncManagerTest { for (int i = 0; i < leasesToMerge; i += 2) { Lease parent1 = leasesEligibleForMerge.get(i); Lease parent2 = leasesEligibleForMerge.get(i + 1); - if(parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE)) + if (parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE)) { parent1.checkpoint(ExtendedSequenceNumber.SHARD_END); if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 46918f62..3b5bfec9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -35,7 +35,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.atMost; -import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.*; import java.time.Duration; import java.util.ArrayList; @@ -104,6 +103,9 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.ProvidedStreamsDeferredDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -727,8 +729,8 @@ public class SchedulerTest { boolean expectPendingStreamsForDeletion, boolean onlyStreamsNoLeasesDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { - List streamConfigList1 = createDummyStreamConfigList(1,5); - List streamConfigList2 = createDummyStreamConfigList(3,7); + List streamConfigList1 = createDummyStreamConfigList(1, 5); + List streamConfigList2 = createDummyStreamConfigList(3, 7); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); @@ -742,7 +744,7 @@ public class SchedulerTest { Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(HashSet::new)); - if(onlyStreamsNoLeasesDeletion) { + if (onlyStreamsNoLeasesDeletion) { expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) @@ -756,7 +758,7 @@ public class SchedulerTest { Assert.assertEquals(expectedSyncedStreams, syncedStreams); List expectedCurrentStreamConfigs; - if(onlyStreamsNoLeasesDeletion) { + if (onlyStreamsNoLeasesDeletion) { expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), @@ -778,8 +780,8 @@ public class SchedulerTest { @Test public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { - List streamConfigList1 = createDummyStreamConfigList(1,6); - List streamConfigList2 = createDummyStreamConfigList(1,4); + List streamConfigList1 = createDummyStreamConfigList(1, 6); + List streamConfigList2 = createDummyStreamConfigList(1, 4); prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2); @@ -820,7 +822,7 @@ public class SchedulerTest { @Test public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() throws ProvisionedThroughputException, InvalidStateException, DependencyException { - List streamConfigList1 = createDummyStreamConfigList(1,6); + List streamConfigList1 = createDummyStreamConfigList(1, 6); prepareForStaleDeletedStreamCleanupTests(streamConfigList1); scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); @@ -1243,7 +1245,7 @@ public class SchedulerTest { @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) { - if(shouldReturnDefaultShardSyncTaskmanager) { + if (shouldReturnDefaultShardSyncTaskmanager) { return shardSyncTaskManager; } final ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class); @@ -1255,7 +1257,7 @@ public class SchedulerTest { when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(hierarchicalShardSyncer); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); - if(shardSyncFirstAttemptFailure) { + if (shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception")) .thenReturn(Collections.EMPTY_LIST); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java index 11d17368..17cad629 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/WorkerTest.java @@ -118,8 +118,7 @@ public class WorkerTest { private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = SAMPLE_RECORD_PROCESSOR_FACTORY; - - *//** + *//* * Test method for {@link Worker#getApplicationName()}. *//* @Test @@ -346,7 +345,7 @@ public class WorkerTest { Assert.assertTrue(count > 0); } - *//** + *//* * Runs worker with threadPoolSize == numShards * Test method for {@link Worker#run()}. *//* @@ -357,7 +356,7 @@ public class WorkerTest { runAndTestWorker(numShards, threadPoolSize); } - *//** + *//* * Runs worker with threadPoolSize < numShards * Test method for {@link Worker#run()}. *//* @@ -368,7 +367,7 @@ public class WorkerTest { runAndTestWorker(numShards, threadPoolSize); } - *//** + *//* * Runs worker with threadPoolSize > numShards * Test method for {@link Worker#run()}. *//* @@ -379,7 +378,7 @@ public class WorkerTest { runAndTestWorker(numShards, threadPoolSize); } - *//** + *//* * Runs worker with threadPoolSize < numShards * Test method for {@link Worker#run()}. *//* @@ -395,7 +394,7 @@ public class WorkerTest { runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } - *//** + *//* * Runs worker with threadPoolSize < numShards * Test method for {@link Worker#run()}. *//* @@ -557,7 +556,7 @@ public class WorkerTest { verify(v2RecordProcessor, times(1)).shutdown(any(ShutdownInput.class)); } - *//** + *//* * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * This behavior makes the test a bit racy, since we need to ensure a specific order of events. @@ -1734,7 +1733,8 @@ public class WorkerTest { return new ReflectionFieldMatcher<>(itemClass, fieldName, fieldMatcher); } } - *//** + + *//* * Returns executor service that will be owned by the worker. This is useful to test the scenario * where worker shuts down the executor service also during shutdown flow. * @@ -1756,9 +1756,6 @@ public class WorkerTest { return shards; } - *//** - * @return - *//* private List createShardListWithOneSplit() { List shards = new ArrayList(); SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324"); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 9e130c38..1a1abc0e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1592,7 +1592,7 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); } - /** + /* *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
@@ -1869,7 +1869,7 @@ public class HierarchicalShardSyncerTest {
         assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP);
     }
 
-    /**
+    /*
      * 
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
@@ -2325,12 +2325,16 @@ public class HierarchicalShardSyncerTest {
     @Test
     public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
         final List shardsWithIncompleteHashRange = Arrays.asList(
-                ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
-                ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
+                ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+                        ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
+                ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+                        ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
         );
         final List shardsWithCompleteHashRange = Arrays.asList(
-                ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
-                ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
+                ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+                        ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
+                ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+                        ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
         );
 
         when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
@@ -2352,8 +2356,10 @@ public class HierarchicalShardSyncerTest {
     @Test
     public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {
         final List shardsWithCompleteHashRange = Arrays.asList(
-                ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
-                ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
+                ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+                        ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
+                ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+                        ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
         );
 
         when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java
index 186fe290..72b48f16 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java
@@ -14,10 +14,11 @@
  */
 package software.amazon.kinesis.leases;
 
-import java.awt.*;
+import java.awt.Button;
+import java.awt.Dimension;
+import java.awt.GridLayout;
 import java.awt.event.ActionEvent;
 import java.awt.event.ActionListener;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -25,7 +26,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.swing.*;
+import javax.swing.BoxLayout;
+import javax.swing.JFrame;
+import javax.swing.JLabel;
+import javax.swing.JPanel;
 
 import lombok.extern.slf4j.Slf4j;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -54,9 +58,8 @@ public class LeaseCoordinatorExerciser {
     private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
     private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L;
 
-    public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException,
-            ProvisionedThroughputException, IOException {
-
+    public static void main(String[] args) throws DependencyException, InvalidStateException,
+            ProvisionedThroughputException {
         int numCoordinators = 9;
         int numLeases = 73;
         int leaseDurationMillis = 10000;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
index ee2504d8..cc03a203 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
@@ -56,7 +56,6 @@ public class ShardObjectHelper {
     private ShardObjectHelper() {
     }
 
-
     /** Helper method to create a new shard object.
      * @param shardId
      * @param parentShardId
@@ -84,7 +83,9 @@ public class ShardObjectHelper {
                                  String adjacentParentShardId,
                                  SequenceNumberRange sequenceNumberRange,
                                  HashKeyRange hashKeyRange) {
-        return Shard.builder().shardId(shardId).parentShardId(parentShardId).adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange).hashKeyRange(hashKeyRange).build();
+        return Shard.builder().shardId(shardId).parentShardId(parentShardId)
+                .adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange)
+                .hashKeyRange(hashKeyRange).build();
     }
 
     /** Helper method.
@@ -116,5 +117,4 @@ public class ShardObjectHelper {
         return parentShardIds;
     }
 
-
 }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java
index 1dad013e..3f692da5 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java
@@ -37,7 +37,7 @@ import static org.junit.Assert.assertThat;
 @RunWith(MockitoJUnitRunner.class)
 public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends
         LeaseIntegrationBillingModePayPerRequestTest {
-    private final String TEST_METRIC = "TestOperation";
+    private static final String TEST_METRIC = "TestOperation";
 
     // This test case's leases last 2 seconds
     private static final long LEASE_DURATION_MILLIS = 2000L;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java
index 7c884fd6..f179a073 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java
@@ -36,7 +36,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
 @RunWith(MockitoJUnitRunner.class)
 public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
-    private final String TEST_METRIC = "TestOperation";
+    private static final String TEST_METRIC = "TestOperation";
 
     // This test case's leases last 2 seconds
     private static final long LEASE_DURATION_MILLIS = 2000L;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java
index bfff4e92..72379e88 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java
@@ -86,7 +86,7 @@ public class DynamoDBLeaseRenewerTest {
          */
         Lease lease1 = newLease("1");
         Lease lease2 = newLease("2");
-        leasesToRenew = Arrays.asList(lease1,lease2);
+        leasesToRenew = Arrays.asList(lease1, lease2);
         renewer.addLeasesToRenew(leasesToRenew);
 
         doReturn(true).when(leaseRefresher).renewLease(lease1);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java
index 18bba742..0354a214 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java
@@ -47,8 +47,8 @@ public class MetricAccumulatingQueueTest {
      */
     @Test
     public void testAccumulation() {
-        Collection dimensionsA = Collections.singleton(dim("name","a"));
-        Collection dimensionsB = Collections.singleton(dim("name","b"));
+        Collection dimensionsA = Collections.singleton(dim("name", "a"));
+        Collection dimensionsB = Collections.singleton(dim("name", "b"));
         String keyA = "a";
         String keyB = "b";
 
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
index 40d86c49..0f8e628e 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
@@ -176,7 +176,7 @@ public class FanOutRecordsPublisherTest {
     }
 
     @Test
-    public void InvalidEventTest() throws Exception {
+    public void testInvalidEvent() {
         FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
 
         ArgumentCaptor captor = ArgumentCaptor
@@ -443,10 +443,11 @@ public class FanOutRecordsPublisherTest {
 
                     @Override public void onNext(RecordsRetrieved input) {
                         receivedInput.add(input.processRecordsInput());
-                        assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+                        assertEquals("" + ++lastSeenSeqNum,
+                                ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
                         subscription.request(1);
                         servicePublisher.request(1);
-                        if(receivedInput.size() == totalServicePublisherEvents) {
+                        if (receivedInput.size() == totalServicePublisherEvents) {
                             servicePublisherTaskCompletionLatch.countDown();
                         }
                     }
@@ -549,10 +550,11 @@ public class FanOutRecordsPublisherTest {
 
                     @Override public void onNext(RecordsRetrieved input) {
                         receivedInput.add(input.processRecordsInput());
-                        assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+                        assertEquals("" + ++lastSeenSeqNum,
+                                ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
                         subscription.request(1);
                         servicePublisher.request(1);
-                        if(receivedInput.size() == triggerCompleteAtNthEvent) {
+                        if (receivedInput.size() == triggerCompleteAtNthEvent) {
                             servicePublisherTaskCompletionLatch.countDown();
                         }
                     }
@@ -681,7 +683,7 @@ public class FanOutRecordsPublisherTest {
                         receivedInput.add(input.processRecordsInput());
                         subscription.request(1);
                         servicePublisher.request(1);
-                        if(receivedInput.size() == triggerCompleteAtNthEvent) {
+                        if (receivedInput.size() == triggerCompleteAtNthEvent) {
                             servicePublisherTaskCompletionLatch.countDown();
                         }
                     }
@@ -783,10 +785,11 @@ public class FanOutRecordsPublisherTest {
 
                     @Override public void onNext(RecordsRetrieved input) {
                         receivedInput.add(input.processRecordsInput());
-                        assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+                        assertEquals("" + ++lastSeenSeqNum,
+                                ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
                         subscription.request(1);
                         servicePublisher.request(1);
-                        if(receivedInput.size() == triggerErrorAtNthEvent) {
+                        if (receivedInput.size() == triggerErrorAtNthEvent) {
                             servicePublisherTaskCompletionLatch.countDown();
                         }
                     }
@@ -879,10 +882,11 @@ public class FanOutRecordsPublisherTest {
 
                     @Override public void onNext(RecordsRetrieved input) {
                         receivedInput.add(input.processRecordsInput());
-                        assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+                        assertEquals("" + ++lastSeenSeqNum,
+                                ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
                         subscription.request(1);
                         servicePublisher.request(1);
-                        if(receivedInput.size() == totalServicePublisherEvents) {
+                        if (receivedInput.size() == totalServicePublisherEvents) {
                             servicePublisherTaskCompletionLatch.countDown();
                         }
                     }
@@ -973,7 +977,8 @@ public class FanOutRecordsPublisherTest {
 
                     @Override public void onNext(RecordsRetrieved input) {
                         receivedInput.add(input.processRecordsInput());
-                        assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+                        assertEquals("" + ++lastSeenSeqNum,
+                                ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
                         subscription.request(1);
                         servicePublisher.request(1);
                     }
@@ -1328,7 +1333,7 @@ public class FanOutRecordsPublisherTest {
                 fanOutRecordsPublisher
                         .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier());
                 // Send stale event periodically
-                if(totalRecordsRetrieved[0] % 10 == 0) {
+                if (totalRecordsRetrieved[0] % 10 == 0) {
                     fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
                             () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
                 }
@@ -1368,7 +1373,7 @@ public class FanOutRecordsPublisherTest {
         int count = 0;
         // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
         // delivered as expected.
-        while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
+        while (count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
             final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
             fanOutRecordsPublisher
                     .evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal);
@@ -1403,7 +1408,7 @@ public class FanOutRecordsPublisherTest {
         int count = 0;
         // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
         // delivered as expected.
-        while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
+        while (count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
             final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
             fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
                     () -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier()));
@@ -1457,7 +1462,8 @@ public class FanOutRecordsPublisherTest {
 
         flowCaptor.getValue().exceptionOccurred(exception);
 
-        Optional onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent).map(e -> (OnErrorEvent)e).findFirst();
+        Optional onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent)
+                .map(e -> (OnErrorEvent) e).findFirst();
 
         assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
         assertThat(acquireTimeoutLogged.get(), equalTo(true));
@@ -1587,8 +1593,8 @@ public class FanOutRecordsPublisherTest {
         public void run() {
             for (int i = 1; i <= numOfTimes; ) {
                 demandNotifier.acquireUninterruptibly();
-                if(i == sendCompletionAt) {
-                    if(shardEndAction != null) {
+                if (i == sendCompletionAt) {
+                    if (shardEndAction != null) {
                         shardEndAction.accept(i++);
                     } else {
                         action.accept(i++);
@@ -1596,7 +1602,7 @@ public class FanOutRecordsPublisherTest {
                     completeAction.run();
                     break;
                 }
-                if(i == sendErrorAt) {
+                if (i == sendErrorAt) {
                     action.accept(i++);
                     errorAction.run();
                     break;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
index 2e09f34a..4ac8bbf7 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
@@ -331,7 +331,7 @@ public class KinesisDataFetcherTest {
 
     private CompletableFuture makeGetRecordsResponse(String nextIterator, List records) {
         List childShards = new ArrayList<>();
-        if(nextIterator == null) {
+        if (nextIterator == null) {
             childShards = createChildShards();
         }
         return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator)
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
index 5d757a6c..d9955da4 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -277,7 +276,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
 
         @Override
         public DataFetcherResult getRecords() {
-            GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records)).nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
+            GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records))
+                    .nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
 
             return new AdvancingResult(getRecordsResult);
         }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
index 74707eb4..af02469a 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
@@ -327,7 +327,7 @@ public class PrefetchRecordsPublisherTest {
         //        TODO: fix this verification
         //        verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL);
         //        assertEquals(spyQueue.size(), callRate);
-        assertTrue("Call Rate is "+callRate,callRate < MAX_SIZE);
+        assertTrue("Call Rate is " + callRate, callRate < MAX_SIZE);
     }
 
     @Test
@@ -422,8 +422,10 @@ public class PrefetchRecordsPublisherTest {
 
     @Test
     public void testRetryableRetrievalExceptionContinues() {
-        GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
-        when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
+        GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L)
+                .records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
+        when(getRecordsRetrievalStrategy.getRecords(anyInt()))
+                .thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
 
         getRecordsCache.start(sequenceNumber, initialPosition);
 
@@ -638,7 +640,7 @@ public class PrefetchRecordsPublisherTest {
 
         verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt());
 
-        while(getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) {
+        while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) {
             Thread.yield();
         }
 
@@ -697,7 +699,7 @@ public class PrefetchRecordsPublisherTest {
 
         public void resetIteratorTo(String nextIterator) {
             Iterator newIterator = responses.iterator();
-            while(newIterator.hasNext()) {
+            while (newIterator.hasNext()) {
                 GetRecordsResponse current = newIterator.next();
                 if (StringUtils.equals(nextIterator, current.nextShardIterator())) {
                     if (!newIterator.hasNext()) {
@@ -725,7 +727,7 @@ public class PrefetchRecordsPublisherTest {
 
         private static final int LOSS_EVERY_NTH_RECORD = 50;
         private static int recordCounter = 0;
-        private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1);
+        private static final ScheduledExecutorService CONSUMER_HEALTH_CHECKER = Executors.newScheduledThreadPool(1);
 
         public LossyNotificationSubscriber(Subscriber delegate, RecordsPublisher recordsPublisher) {
             super(delegate, recordsPublisher);
@@ -738,7 +740,7 @@ public class PrefetchRecordsPublisherTest {
                 getDelegateSubscriber().onNext(recordsRetrieved);
             } else {
                 log.info("Record Loss Triggered");
-                consumerHealthChecker.schedule(() ->  {
+                CONSUMER_HEALTH_CHECKER.schedule(() ->  {
                     getRecordsPublisher().restartFrom(recordsRetrieved);
                     Flowable.fromPublisher(getRecordsPublisher()).subscribeOn(Schedulers.computation())
                             .observeOn(Schedulers.computation(), true, 8).subscribe(this);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java
index 0d68e51b..cd7ad8a6 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java
@@ -21,7 +21,7 @@ public class BlockingUtils {
 
     public static  Records blockUntilRecordsAvailable(Supplier recordsSupplier, long timeoutMillis) {
         Records recordsRetrieved;
-        while((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) {
+        while ((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) {
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
@@ -29,7 +29,7 @@ public class BlockingUtils {
             }
             timeoutMillis -= 100;
         }
-        if(recordsRetrieved != null) {
+        if (recordsRetrieved != null) {
             return recordsRetrieved;
         } else {
             throw new RuntimeException("No records found");
@@ -37,7 +37,7 @@ public class BlockingUtils {
     }
 
     public static boolean blockUntilConditionSatisfied(Supplier conditionSupplier, long timeoutMillis) {
-        while(!conditionSupplier.get() && timeoutMillis > 0 ) {
+        while (!conditionSupplier.get() && timeoutMillis > 0 ) {
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java
index d120d95a..43c887a3 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java
@@ -12,7 +12,7 @@ public class SubscribeToShardRequestMatcher extends ArgumentMatcher
         
         
-        
+        
+            
+            
+        
         
         
         
diff --git a/pom.xml b/pom.xml
index 82d7857d..90cec514 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
           checkstyle/checkstyle.xml
           true
           true
+          true
           checkstyle/checkstyle-suppressions.xml