();
SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324");
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
index 9e130c38..1a1abc0e 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
@@ -1592,7 +1592,7 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON);
}
- /**
+ /*
*
* Shard structure (x-axis is epochs):
* 0 3 6 9
@@ -1869,7 +1869,7 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP);
}
- /**
+ /*
*
* Shard structure (x-axis is epochs):
* 0 3 6 9
@@ -2325,12 +2325,16 @@ public class HierarchicalShardSyncerTest {
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
final List shardsWithIncompleteHashRange = Arrays.asList(
- ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
- ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
+ ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
+ ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
);
final List shardsWithCompleteHashRange = Arrays.asList(
- ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
- ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
+ ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
+ ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
@@ -2352,8 +2356,10 @@ public class HierarchicalShardSyncerTest {
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {
final List shardsWithCompleteHashRange = Arrays.asList(
- ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
- ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
+ ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
+ ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java
index 186fe290..72b48f16 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java
@@ -14,10 +14,11 @@
*/
package software.amazon.kinesis.leases;
-import java.awt.*;
+import java.awt.Button;
+import java.awt.Dimension;
+import java.awt.GridLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -25,7 +26,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.swing.*;
+import javax.swing.BoxLayout;
+import javax.swing.JFrame;
+import javax.swing.JLabel;
+import javax.swing.JPanel;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -54,9 +58,8 @@ public class LeaseCoordinatorExerciser {
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L;
- public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException,
- ProvisionedThroughputException, IOException {
-
+ public static void main(String[] args) throws DependencyException, InvalidStateException,
+ ProvisionedThroughputException {
int numCoordinators = 9;
int numLeases = 73;
int leaseDurationMillis = 10000;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java
index 9f7735f9..128d347a 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java
@@ -15,16 +15,8 @@
package software.amazon.kinesis.leases;
import lombok.extern.slf4j.Slf4j;
-import org.junit.Rule;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.mockito.Mock;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
-import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
-import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
@Slf4j
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java
index 276f6c25..4ccafe52 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java
@@ -26,11 +26,9 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class ShardInfoTest {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
index ee2504d8..cc03a203 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
@@ -56,7 +56,6 @@ public class ShardObjectHelper {
private ShardObjectHelper() {
}
-
/** Helper method to create a new shard object.
* @param shardId
* @param parentShardId
@@ -84,7 +83,9 @@ public class ShardObjectHelper {
String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange,
HashKeyRange hashKeyRange) {
- return Shard.builder().shardId(shardId).parentShardId(parentShardId).adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange).hashKeyRange(hashKeyRange).build();
+ return Shard.builder().shardId(shardId).parentShardId(parentShardId)
+ .adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange)
+ .hashKeyRange(hashKeyRange).build();
}
/** Helper method.
@@ -116,5 +117,4 @@ public class ShardObjectHelper {
return parentShardIds;
}
-
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java
index d89c010e..05d4ba74 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
index caa7a6c7..cf1c536b 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
@@ -1,6 +1,5 @@
package software.amazon.kinesis.leases.dynamodb;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -12,7 +11,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import java.util.UUID;
-import static org.mockito.Mockito.times;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -51,17 +50,34 @@ public class DynamoDBLeaseCoordinatorTest {
leaseCoordinator.initialize();
- verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
- verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
+ verify(leaseRefresher).createLeaseTableIfNotExists();
+ verify(leaseRefresher).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
}
- @Test
+ @Test(expected = DependencyException.class)
public void testInitialize_tableCreationFails() throws Exception {
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false);
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false);
- Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize());
- verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
- verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
+ try {
+ leaseCoordinator.initialize();
+ } finally {
+ verify(leaseRefresher).createLeaseTableIfNotExists();
+ verify(leaseRefresher).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
+ }
}
+
+ /**
+ * Validates a {@link NullPointerException} is not thrown when the lease taker
+ * is stopped before it starts/exists.
+ *
+ * @see issue #745
+ * @see issue #900
+ */
+ @Test
+ public void testStopLeaseTakerBeforeStart() {
+ leaseCoordinator.stopLeaseTaker();
+ assertTrue(leaseCoordinator.getAssignments().isEmpty());
+ }
+
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
index 643cc99c..102a9f17 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -158,7 +157,6 @@ public class DynamoDBLeaseRefresherTest {
verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class));
verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class));
-
}
@Test
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java
index 1dad013e..3f692da5 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java
@@ -37,7 +37,7 @@ import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends
LeaseIntegrationBillingModePayPerRequestTest {
- private final String TEST_METRIC = "TestOperation";
+ private static final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java
index 7c884fd6..f179a073 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java
@@ -36,7 +36,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
- private final String TEST_METRIC = "TestOperation";
+ private static final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java
index bfff4e92..72379e88 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java
@@ -86,7 +86,7 @@ public class DynamoDBLeaseRenewerTest {
*/
Lease lease1 = newLease("1");
Lease lease2 = newLease("2");
- leasesToRenew = Arrays.asList(lease1,lease2);
+ leasesToRenew = Arrays.asList(lease1, lease2);
renewer.addLeasesToRenew(leasesToRenew);
doReturn(true).when(leaseRefresher).renewLease(lease1);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
index 475f1940..772aa542 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
@@ -15,9 +15,7 @@
package software.amazon.kinesis.leases.dynamodb;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -153,7 +151,6 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
}
-
/**
* Sets the leaseDurationMillis to 0, ensuring a get request to update the existing lease after computing
* leases to take
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java
index 06a72230..61473833 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +42,7 @@ public class BlockOnParentShardTaskTest {
private final String shardId = "shardId-97";
private final String streamId = "123:stream:146";
private final String concurrencyToken = "testToken";
- private final List emptyParentShardIds = new ArrayList();
+ private final List emptyParentShardIds = new ArrayList<>();
private ShardInfo shardInfo;
@Before
@@ -77,7 +76,6 @@ public class BlockOnParentShardTaskTest {
@Test
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
-
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1ShardId = "shardId-1";
@@ -118,7 +116,6 @@ public class BlockOnParentShardTaskTest {
@Test
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMultiStream()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
-
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1LeaseKey = streamId + ":" + "shardId-1";
@@ -162,7 +159,6 @@ public class BlockOnParentShardTaskTest {
@Test
public final void testCallWhenParentsHaveNotFinished()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
-
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1ShardId = "shardId-1";
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
index 09ba6ec9..4299c163 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
@@ -35,7 +35,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -173,7 +172,6 @@ public class ShardConsumerSubscriberTest {
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
-
}
@Test
@@ -293,12 +291,10 @@ public class ShardConsumerSubscriberTest {
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
-
}
@Test
public void restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitialization() throws Exception {
-
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
@@ -347,12 +343,10 @@ public class ShardConsumerSubscriberTest {
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
-
}
@Test
public void restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected() throws Exception {
-
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
@@ -405,7 +399,6 @@ public class ShardConsumerSubscriberTest {
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
-
}
private Object directlyExecuteRunnable(InvocationOnMock invocation) {
@@ -623,8 +616,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
- *
- * @throws Exception
*/
@Test
public void noLoggingSuppressionNeededOnHappyPathTest() {
@@ -648,8 +639,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
- *
- * @throws Exception
*/
@Test
public void loggingNotSuppressedAfterTimeoutTest() {
@@ -677,8 +666,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is successfully supressed if we only have intermittant
* readTimeouts.
- *
- * @throws Exception
*/
@Test
public void loggingSuppressedAfterIntermittentTimeoutTest() {
@@ -705,8 +692,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is successfully logged if multiple sequential timeouts
* occur.
- *
- * @throws Exception
*/
@Test
public void loggingPartiallySuppressedAfterMultipleTimeoutTest() {
@@ -733,8 +718,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is successfully logged if sequential timeouts occur.
- *
- * @throws Exception
*/
@Test
public void loggingPartiallySuppressedAfterConsecutiveTimeoutTest() {
@@ -763,8 +746,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
* configuration of 0
- *
- * @throws Exception
*/
@Test
public void loggingNotSuppressedOnNonReadTimeoutExceptionNotIgnoringReadTimeoutsExceptionTest() {
@@ -792,12 +773,9 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
* ignore
- *
- * @throws Exception
*/
@Test
public void loggingNotSuppressedOnNonReadTimeoutExceptionIgnoringReadTimeoutsTest() {
-
// We're not throwing a ReadTimeout, so no suppression is expected.
// The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
// each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in,
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/EndingMetricsScopeTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/EndingMetricsScopeTest.java
index 2a32764d..a3d792ae 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/EndingMetricsScopeTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/EndingMetricsScopeTest.java
@@ -17,7 +17,6 @@ package software.amazon.kinesis.metrics;
import org.junit.Test;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
-import software.amazon.kinesis.metrics.EndingMetricsScope;
public class EndingMetricsScopeTest {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java
index 18bba742..0354a214 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/metrics/MetricAccumulatingQueueTest.java
@@ -47,8 +47,8 @@ public class MetricAccumulatingQueueTest {
*/
@Test
public void testAccumulation() {
- Collection dimensionsA = Collections.singleton(dim("name","a"));
- Collection dimensionsB = Collections.singleton(dim("name","b"));
+ Collection dimensionsA = Collections.singleton(dim("name", "a"));
+ Collection dimensionsB = Collections.singleton(dim("name", "b"));
String keyA = "a";
String keyB = "b";
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AWSExceptionManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AWSExceptionManagerTest.java
index 8319a0ac..030979df 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AWSExceptionManagerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AWSExceptionManagerTest.java
@@ -27,31 +27,28 @@ import static org.junit.Assert.assertThat;
@Slf4j
public class AWSExceptionManagerTest {
+ private static final String EXPECTED_HANDLING_MARKER = AWSExceptionManagerTest.class.getSimpleName();
+
+ private final AWSExceptionManager manager = new AWSExceptionManager();
+
@Test
public void testSpecificException() {
- AWSExceptionManager manager = new AWSExceptionManager();
- final String EXPECTED_HANDLING_MARKER = "Handled-TestException";
-
manager.add(TestException.class, t -> {
log.info("Handling test exception: {} -> {}", t.getMessage(), t.getAdditionalMessage());
return new RuntimeException(EXPECTED_HANDLING_MARKER, t);
});
- TestException te = new TestException("Main Mesage", "Sub Message");
-
+ TestException te = new TestException("Main Message", "Sub Message");
RuntimeException converted = manager.apply(te);
assertThat(converted, isA(RuntimeException.class));
assertThat(converted.getMessage(), equalTo(EXPECTED_HANDLING_MARKER));
assertThat(converted.getCause(), equalTo(te));
-
}
@Test
public void testParentException() {
- AWSExceptionManager manager = new AWSExceptionManager();
- final String EXPECTED_HANDLING_MARKER = "Handled-IllegalStateException";
manager.add(IllegalArgumentException.class, i -> new RuntimeException("IllegalArgument", i));
manager.add(Exception.class, i -> new RuntimeException("RawException", i));
manager.add(IllegalStateException.class, i -> new RuntimeException(EXPECTED_HANDLING_MARKER, i));
@@ -66,8 +63,7 @@ public class AWSExceptionManagerTest {
@Test
public void testDefaultHandler() {
- final String EXPECTED_HANDLING_MARKER = "Handled-Default";
- AWSExceptionManager manager = new AWSExceptionManager().defaultFunction(i -> new RuntimeException(EXPECTED_HANDLING_MARKER, i));
+ manager.defaultFunction(i -> new RuntimeException(EXPECTED_HANDLING_MARKER, i));
manager.add(IllegalArgumentException.class, i -> new RuntimeException("IllegalArgument", i));
manager.add(Exception.class, i -> new RuntimeException("RawException", i));
@@ -83,8 +79,6 @@ public class AWSExceptionManagerTest {
@Test
public void testIdHandler() {
- AWSExceptionManager manager = new AWSExceptionManager();
-
manager.add(IllegalArgumentException.class, i -> new RuntimeException("IllegalArgument", i));
manager.add(Exception.class, i -> new RuntimeException("RawException", i));
manager.add(IllegalStateException.class, i -> i);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/ThrottlingReporterTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/ThrottlingReporterTest.java
index eec5ea9e..f13f0ad0 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/ThrottlingReporterTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/ThrottlingReporterTest.java
@@ -24,7 +24,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;
-import software.amazon.kinesis.retrieval.ThrottlingReporter;
@RunWith(MockitoJUnitRunner.class)
public class ThrottlingReporterTest {
@@ -40,7 +39,6 @@ public class ThrottlingReporterTest {
reporter.throttled();
verify(throttleLog).warn(anyString());
verify(throttleLog, never()).error(anyString());
-
}
@Test
@@ -63,7 +61,6 @@ public class ThrottlingReporterTest {
reporter.throttled();
verify(throttleLog, times(2)).warn(anyString());
verify(throttleLog, times(3)).error(anyString());
-
}
private class LogTestingThrottingReporter extends ThrottlingReporter {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java
index 245e22d5..fca6799d 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistrationTest.java
@@ -19,7 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -28,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
-import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
index 40d86c49..9615794b 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
@@ -8,7 +8,6 @@ import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.SafeSubscriber;
import lombok.Data;
import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@@ -54,7 +53,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -77,7 +75,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
@@ -172,11 +169,10 @@ public class FanOutRecordsPublisherTest {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
-
}
@Test
- public void InvalidEventTest() throws Exception {
+ public void testInvalidEvent() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor
@@ -239,7 +235,6 @@ public class FanOutRecordsPublisherTest {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
-
}
@Test
@@ -317,11 +312,10 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo("3000"));
-
}
@Test
- public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() throws Exception {
+ public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor
@@ -395,7 +389,6 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo("1000"));
-
}
@Test
@@ -443,10 +436,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
- assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+ assertEquals("" + ++lastSeenSeqNum,
+ ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
- if(receivedInput.size() == totalServicePublisherEvents) {
+ if (receivedInput.size() == totalServicePublisherEvents) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@@ -488,12 +482,10 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
-
}
@Test
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
-
CountDownLatch onS2SCallLatch = new CountDownLatch(2);
doAnswer(new Answer() {
@@ -549,10 +541,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
- assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+ assertEquals("" + ++lastSeenSeqNum,
+ ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
- if(receivedInput.size() == triggerCompleteAtNthEvent) {
+ if (receivedInput.size() == triggerCompleteAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@@ -599,7 +592,6 @@ public class FanOutRecordsPublisherTest {
// Let's wait for sometime to allow the publisher to re-subscribe
onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS);
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
-
}
@Test
@@ -681,7 +673,7 @@ public class FanOutRecordsPublisherTest {
receivedInput.add(input.processRecordsInput());
subscription.request(1);
servicePublisher.request(1);
- if(receivedInput.size() == triggerCompleteAtNthEvent) {
+ if (receivedInput.size() == triggerCompleteAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@@ -728,7 +720,6 @@ public class FanOutRecordsPublisherTest {
// With shard end event, onComplete must be propagated to the subscriber.
onCompleteLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);
-
}
@Test
@@ -783,10 +774,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
- assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+ assertEquals("" + ++lastSeenSeqNum,
+ ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
- if(receivedInput.size() == triggerErrorAtNthEvent) {
+ if (receivedInput.size() == triggerErrorAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@@ -831,7 +823,6 @@ public class FanOutRecordsPublisherTest {
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnError should have been thrown", isOnErrorThrown[0]);
-
}
@Test
@@ -879,10 +870,11 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
- assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+ assertEquals("" + ++lastSeenSeqNum,
+ ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
- if(receivedInput.size() == totalServicePublisherEvents) {
+ if (receivedInput.size() == totalServicePublisherEvents) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@@ -924,7 +916,6 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
-
}
@Test
@@ -973,7 +964,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
- assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
+ assertEquals("" + ++lastSeenSeqNum,
+ ((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
}
@@ -1126,7 +1118,6 @@ public class FanOutRecordsPublisherTest {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
-
}
@Test
@@ -1242,7 +1233,6 @@ public class FanOutRecordsPublisherTest {
verifyRecords(nonFailingSubscriber.received.get(0).records(), matchers);
verifyRecords(nonFailingSubscriber.received.get(1).records(), nextMatchers);
-
}
@Test
@@ -1328,7 +1318,7 @@ public class FanOutRecordsPublisherTest {
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier());
// Send stale event periodically
- if(totalRecordsRetrieved[0] % 10 == 0) {
+ if (totalRecordsRetrieved[0] % 10 == 0) {
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
}
@@ -1368,7 +1358,7 @@ public class FanOutRecordsPublisherTest {
int count = 0;
// Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
// delivered as expected.
- while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
+ while (count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal);
@@ -1403,7 +1393,7 @@ public class FanOutRecordsPublisherTest {
int count = 0;
// Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
// delivered as expected.
- while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
+ while (count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier()));
@@ -1457,11 +1447,11 @@ public class FanOutRecordsPublisherTest {
flowCaptor.getValue().exceptionOccurred(exception);
- Optional onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent).map(e -> (OnErrorEvent)e).findFirst();
+ Optional onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent)
+ .map(e -> (OnErrorEvent) e).findFirst();
assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
assertThat(acquireTimeoutLogged.get(), equalTo(true));
-
}
private void verifyRecords(List clientRecordsList, List matchers) {
@@ -1587,8 +1577,8 @@ public class FanOutRecordsPublisherTest {
public void run() {
for (int i = 1; i <= numOfTimes; ) {
demandNotifier.acquireUninterruptibly();
- if(i == sendCompletionAt) {
- if(shardEndAction != null) {
+ if (i == sendCompletionAt) {
+ if (shardEndAction != null) {
shardEndAction.accept(i++);
} else {
action.accept(i++);
@@ -1596,7 +1586,7 @@ public class FanOutRecordsPublisherTest {
completeAction.run();
break;
}
- if(i == sendErrorAt) {
+ if (i == sendErrorAt) {
action.accept(i++);
errorAction.run();
break;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
index 2e09f34a..4ac8bbf7 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
@@ -331,7 +331,7 @@ public class KinesisDataFetcherTest {
private CompletableFuture makeGetRecordsResponse(String nextIterator, List records) {
List childShards = new ArrayList<>();
- if(nextIterator == null) {
+ if (nextIterator == null) {
childShards = createChildShards();
}
return CompletableFuture.completedFuture(GetRecordsResponse.builder().nextShardIterator(nextIterator)
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
index 5d757a6c..d9955da4 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -277,7 +276,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
@Override
public DataFetcherResult getRecords() {
- GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records)).nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
+ GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records))
+ .nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
return new AdvancingResult(getRecordsResult);
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
index 74707eb4..af02469a 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
@@ -327,7 +327,7 @@ public class PrefetchRecordsPublisherTest {
// TODO: fix this verification
// verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL);
// assertEquals(spyQueue.size(), callRate);
- assertTrue("Call Rate is "+callRate,callRate < MAX_SIZE);
+ assertTrue("Call Rate is " + callRate, callRate < MAX_SIZE);
}
@Test
@@ -422,8 +422,10 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testRetryableRetrievalExceptionContinues() {
- GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
- when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
+ GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L)
+ .records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
+ when(getRecordsRetrievalStrategy.getRecords(anyInt()))
+ .thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
@@ -638,7 +640,7 @@ public class PrefetchRecordsPublisherTest {
verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt());
- while(getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) {
+ while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) {
Thread.yield();
}
@@ -697,7 +699,7 @@ public class PrefetchRecordsPublisherTest {
public void resetIteratorTo(String nextIterator) {
Iterator newIterator = responses.iterator();
- while(newIterator.hasNext()) {
+ while (newIterator.hasNext()) {
GetRecordsResponse current = newIterator.next();
if (StringUtils.equals(nextIterator, current.nextShardIterator())) {
if (!newIterator.hasNext()) {
@@ -725,7 +727,7 @@ public class PrefetchRecordsPublisherTest {
private static final int LOSS_EVERY_NTH_RECORD = 50;
private static int recordCounter = 0;
- private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1);
+ private static final ScheduledExecutorService CONSUMER_HEALTH_CHECKER = Executors.newScheduledThreadPool(1);
public LossyNotificationSubscriber(Subscriber delegate, RecordsPublisher recordsPublisher) {
super(delegate, recordsPublisher);
@@ -738,7 +740,7 @@ public class PrefetchRecordsPublisherTest {
getDelegateSubscriber().onNext(recordsRetrieved);
} else {
log.info("Record Loss Triggered");
- consumerHealthChecker.schedule(() -> {
+ CONSUMER_HEALTH_CHECKER.schedule(() -> {
getRecordsPublisher().restartFrom(recordsRetrieved);
Flowable.fromPublisher(getRecordsPublisher()).subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation(), true, 8).subscribe(this);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java
index 3314f922..4a6bcfaf 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java
@@ -2,14 +2,8 @@ package software.amazon.kinesis.utils;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
-import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
-import software.amazon.kinesis.common.FutureUtils;
-import java.time.Duration;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -28,7 +22,7 @@ public abstract class AWSResourceManager {
/**
* Get a list of all the names of resources of a specified type
- * @return
+ *
* @throws Exception
*/
public abstract List getAllResourceNames() throws Exception;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java
index 0d68e51b..cd7ad8a6 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java
@@ -21,7 +21,7 @@ public class BlockingUtils {
public static Records blockUntilRecordsAvailable(Supplier recordsSupplier, long timeoutMillis) {
Records recordsRetrieved;
- while((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) {
+ while ((recordsRetrieved = recordsSupplier.get()) == null && timeoutMillis > 0 ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -29,7 +29,7 @@ public class BlockingUtils {
}
timeoutMillis -= 100;
}
- if(recordsRetrieved != null) {
+ if (recordsRetrieved != null) {
return recordsRetrieved;
} else {
throw new RuntimeException("No records found");
@@ -37,7 +37,7 @@ public class BlockingUtils {
}
public static boolean blockUntilConditionSatisfied(Supplier conditionSupplier, long timeoutMillis) {
- while(!conditionSupplier.get() && timeoutMillis > 0 ) {
+ while (!conditionSupplier.get() && timeoutMillis > 0 ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java
index e8d1cb05..40d711bd 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java
@@ -12,8 +12,6 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.kinesis.common.FutureUtils;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java
index d120d95a..43c887a3 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/SubscribeToShardRequestMatcher.java
@@ -12,7 +12,7 @@ public class SubscribeToShardRequestMatcher extends ArgumentMatcher
+
+
+
+
+
\ No newline at end of file
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
new file mode 100644
index 00000000..76c4b330
--- /dev/null
+++ b/checkstyle/checkstyle.xml
@@ -0,0 +1,50 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 7ebd6e54..dcebcb9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
amazon-kinesis-client-pom
pom
Amazon Kinesis Client Library
- 2.5.1-SNAPSHOT
+ 2.5.1
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
@@ -72,6 +72,28 @@
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 3.3.0
+
+ checkstyle/checkstyle.xml
+ true
+ true
+ true
+ checkstyle/checkstyle-suppressions.xml
+
+
+
+ validate
+
+ check
+
+
+
+
+