Checkstyle: added UnusedImports check. (#1153)
This commit is contained in:
parent
74d8f4b780
commit
768f6a36bb
27 changed files with 12 additions and 109 deletions
|
|
@ -19,7 +19,6 @@ import java.util.concurrent.ExecutorService;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
|
|
||||||
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
|
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
|
||||||
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
||||||
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,6 @@ import java.util.List;
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get AWSCredentialsProvider property.
|
* Get AWSCredentialsProvider property.
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.apache.commons.beanutils.ConvertUtilsBean;
|
import org.apache.commons.beanutils.ConvertUtilsBean;
|
||||||
|
|
@ -150,7 +149,6 @@ public class BuilderDynaBean implements DynaBean {
|
||||||
} else {
|
} else {
|
||||||
return expected.cast(dynaBeanCreateSupport.build());
|
return expected.cast(dynaBeanCreateSupport.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateResolvedEmptyHandler() {
|
private void validateResolvedEmptyHandler() {
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ import org.apache.commons.beanutils.ConvertUtilsBean;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
import software.amazon.awssdk.arns.Arn;
|
import software.amazon.awssdk.arns.Arn;
|
||||||
import software.amazon.awssdk.regions.Region;
|
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -42,7 +41,6 @@ public class KinesisClientLibConfigurator {
|
||||||
private final BeanUtilsBean utilsBean;
|
private final BeanUtilsBean utilsBean;
|
||||||
private final MultiLangDaemonConfiguration configuration;
|
private final MultiLangDaemonConfiguration configuration;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*/
|
*/
|
||||||
|
|
@ -78,7 +76,6 @@ public class KinesisClientLibConfigurator {
|
||||||
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
||||||
final String streamNameFromArn = streamArnObj.resource().resource();
|
final String streamNameFromArn = streamArnObj.resource().resource();
|
||||||
configuration.setStreamName(streamNameFromArn);
|
configuration.setStreamName(streamNameFromArn);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
@ -109,5 +106,4 @@ public class KinesisClientLibConfigurator {
|
||||||
return getConfiguration(properties);
|
return getConfiguration(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,6 @@ public class MultiLangDaemonConfiguration {
|
||||||
metricsEnabledDimensions = new HashSet<>(Arrays.asList(dimensions));
|
metricsEnabledDimensions = new HashSet<>(Arrays.asList(dimensions));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private RetrievalMode retrievalMode = RetrievalMode.DEFAULT;
|
private RetrievalMode retrievalMode = RetrievalMode.DEFAULT;
|
||||||
|
|
||||||
private final FanoutConfigBean fanoutConfig = new FanoutConfigBean();
|
private final FanoutConfigBean fanoutConfig = new FanoutConfigBean();
|
||||||
|
|
@ -170,7 +169,6 @@ public class MultiLangDaemonConfiguration {
|
||||||
private long shutdownGraceMillis;
|
private long shutdownGraceMillis;
|
||||||
private Integer timeoutInSeconds;
|
private Integer timeoutInSeconds;
|
||||||
|
|
||||||
|
|
||||||
private final BuilderDynaBean kinesisCredentialsProvider;
|
private final BuilderDynaBean kinesisCredentialsProvider;
|
||||||
|
|
||||||
public void setAWSCredentialsProvider(String providerString) {
|
public void setAWSCredentialsProvider(String providerString) {
|
||||||
|
|
|
||||||
|
|
@ -51,8 +51,6 @@ public class MessageWriterTest {
|
||||||
@Rule
|
@Rule
|
||||||
public final ExpectedException thrown = ExpectedException.none();
|
public final ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
// ExecutorService executor;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
stream = Mockito.mock(OutputStream.class);
|
stream = Mockito.mock(OutputStream.class);
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ public class MultiLangProtocolTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void initializeTest() throws InterruptedException, ExecutionException {
|
public void testInitialize() {
|
||||||
when(messageWriter
|
when(messageWriter
|
||||||
.writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder()
|
.writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder()
|
||||||
.shardId(shardId).build())))).thenReturn(buildFuture(true));
|
.shardId(shardId).build())))).thenReturn(buildFuture(true));
|
||||||
|
|
@ -113,7 +113,7 @@ public class MultiLangProtocolTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void processRecordsTest() throws InterruptedException, ExecutionException {
|
public void testProcessRecords() {
|
||||||
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
||||||
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(
|
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(
|
||||||
new StatusMessage("processRecords"), Message.class));
|
new StatusMessage("processRecords"), Message.class));
|
||||||
|
|
@ -128,7 +128,6 @@ public class MultiLangProtocolTest {
|
||||||
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage(LeaseLostMessage.ACTION), Message.class));
|
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage(LeaseLostMessage.ACTION), Message.class));
|
||||||
|
|
||||||
assertThat(protocol.leaseLost(LeaseLostInput.builder().build()), equalTo(true));
|
assertThat(protocol.leaseLost(LeaseLostInput.builder().build()), equalTo(true));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -174,7 +173,7 @@ public class MultiLangProtocolTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException,
|
public void testProcessRecordsWithCheckpoints() throws
|
||||||
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||||
|
|
||||||
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
||||||
|
|
@ -203,7 +202,7 @@ public class MultiLangProtocolTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException {
|
public void testProcessRecordsWithABadCheckpoint() {
|
||||||
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
||||||
when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false));
|
when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false));
|
||||||
when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {
|
when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,6 @@ import lombok.Setter;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
import static com.google.common.base.Verify.verifyNotNull;
|
import static com.google.common.base.Verify.verifyNotNull;
|
||||||
|
|
||||||
@Setter
|
@Setter
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,6 @@
|
||||||
package software.amazon.kinesis.metrics;
|
package software.amazon.kinesis.metrics;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
|
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
|
||||||
|
|
@ -29,11 +28,11 @@ import java.util.Objects;
|
||||||
* @param <KeyType> is a class that stores information about a MetricDatum. This is useful
|
* @param <KeyType> is a class that stores information about a MetricDatum. This is useful
|
||||||
* to compare MetricDatums, aggregate similar MetricDatums or store information about a datum
|
* to compare MetricDatums, aggregate similar MetricDatums or store information about a datum
|
||||||
* that may be relevant to the user (i.e. MetricName, CustomerId, TimeStamp, etc).
|
* that may be relevant to the user (i.e. MetricName, CustomerId, TimeStamp, etc).
|
||||||
*
|
*
|
||||||
* Example:
|
* Example:
|
||||||
*
|
*
|
||||||
* Let SampleMetricKey be a KeyType that takes in the time in which the datum was created.
|
* Let SampleMetricKey be a KeyType that takes in the time in which the datum was created.
|
||||||
*
|
*
|
||||||
* MetricDatumWithKey<SampleMetricKey> sampleDatumWithKey = new MetricDatumWithKey<SampleMetricKey>(new
|
* MetricDatumWithKey<SampleMetricKey> sampleDatumWithKey = new MetricDatumWithKey<SampleMetricKey>(new
|
||||||
* SampleMetricKey(System.currentTimeMillis()), datum)
|
* SampleMetricKey(System.currentTimeMillis()), datum)
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -27,14 +27,12 @@ import org.reactivestreams.Subscriber;
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
|
||||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
|
||||||
import software.amazon.awssdk.utils.Either;
|
import software.amazon.awssdk.utils.Either;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
|
@ -117,7 +115,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
|
this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
|
||||||
this.isFirstConnection = true;
|
this.isFirstConnection = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -276,7 +273,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) {
|
SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) {
|
||||||
this(subscriptionShutdownAction, eventIdentifier, null);
|
this(subscriptionShutdownAction, eventIdentifier, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasValidSubscriber() {
|
private boolean hasValidSubscriber() {
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,6 @@ package software.amazon.kinesis.checkpoint;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import software.amazon.kinesis.exceptions.KinesisClientLibException;
|
|
||||||
import software.amazon.kinesis.processor.Checkpointer;
|
import software.amazon.kinesis.processor.Checkpointer;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,6 @@ import java.io.IOException;
|
||||||
import java.net.Inet4Address;
|
import java.net.Inet4Address;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default configuration for a producer or consumer used in integration tests.
|
* Default configuration for a producer or consumer used in integration tests.
|
||||||
|
|
@ -81,7 +80,6 @@ public abstract class KCLAppConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
|
public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
|
||||||
|
|
||||||
if (kinesisAsyncClient == null) {
|
if (kinesisAsyncClient == null) {
|
||||||
// Setup H2 client config.
|
// Setup H2 client config.
|
||||||
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
|
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
|
||||||
|
|
|
||||||
|
|
@ -15,12 +15,10 @@
|
||||||
package software.amazon.kinesis.coordinator;
|
package software.amazon.kinesis.coordinator;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.CoreMatchers.not;
|
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
|
||||||
|
|
@ -14,8 +14,6 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.coordinator;
|
package software.amazon.kinesis.coordinator;
|
||||||
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests of Worker.
|
* Unit tests of Worker.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -15,16 +15,8 @@
|
||||||
package software.amazon.kinesis.leases;
|
package software.amazon.kinesis.leases;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.rules.TestWatcher;
|
|
||||||
import org.junit.runner.Description;
|
|
||||||
import org.mockito.Mock;
|
|
||||||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
|
|
||||||
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {
|
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {
|
||||||
|
|
|
||||||
|
|
@ -26,11 +26,9 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
public class ShardInfoTest {
|
public class ShardInfoTest {
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
@ -158,7 +157,6 @@ public class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class));
|
verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class));
|
||||||
verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class));
|
verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -15,9 +15,7 @@
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -153,7 +151,6 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
||||||
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
|
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the leaseDurationMillis to 0, ensuring a get request to update the existing lease after computing
|
* Sets the leaseDurationMillis to 0, ensuring a get request to update the existing lease after computing
|
||||||
* leases to take
|
* leases to take
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -43,7 +42,7 @@ public class BlockOnParentShardTaskTest {
|
||||||
private final String shardId = "shardId-97";
|
private final String shardId = "shardId-97";
|
||||||
private final String streamId = "123:stream:146";
|
private final String streamId = "123:stream:146";
|
||||||
private final String concurrencyToken = "testToken";
|
private final String concurrencyToken = "testToken";
|
||||||
private final List<String> emptyParentShardIds = new ArrayList<String>();
|
private final List<String> emptyParentShardIds = new ArrayList<>();
|
||||||
private ShardInfo shardInfo;
|
private ShardInfo shardInfo;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
|
@ -77,7 +76,6 @@ public class BlockOnParentShardTaskTest {
|
||||||
@Test
|
@Test
|
||||||
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished()
|
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
|
||||||
ShardInfo shardInfo = null;
|
ShardInfo shardInfo = null;
|
||||||
BlockOnParentShardTask task = null;
|
BlockOnParentShardTask task = null;
|
||||||
String parent1ShardId = "shardId-1";
|
String parent1ShardId = "shardId-1";
|
||||||
|
|
@ -118,7 +116,6 @@ public class BlockOnParentShardTaskTest {
|
||||||
@Test
|
@Test
|
||||||
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMultiStream()
|
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMultiStream()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
|
||||||
ShardInfo shardInfo = null;
|
ShardInfo shardInfo = null;
|
||||||
BlockOnParentShardTask task = null;
|
BlockOnParentShardTask task = null;
|
||||||
String parent1LeaseKey = streamId + ":" + "shardId-1";
|
String parent1LeaseKey = streamId + ":" + "shardId-1";
|
||||||
|
|
@ -162,7 +159,6 @@ public class BlockOnParentShardTaskTest {
|
||||||
@Test
|
@Test
|
||||||
public final void testCallWhenParentsHaveNotFinished()
|
public final void testCallWhenParentsHaveNotFinished()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
|
||||||
ShardInfo shardInfo = null;
|
ShardInfo shardInfo = null;
|
||||||
BlockOnParentShardTask task = null;
|
BlockOnParentShardTask task = null;
|
||||||
String parent1ShardId = "shardId-1";
|
String parent1ShardId = "shardId-1";
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,6 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
@ -173,7 +172,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
|
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
|
||||||
|
|
||||||
verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
|
verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -293,12 +291,10 @@ public class ShardConsumerSubscriberTest {
|
||||||
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
||||||
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
||||||
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
|
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitialization() throws Exception {
|
public void restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitialization() throws Exception {
|
||||||
|
|
||||||
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
|
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
|
||||||
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
|
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
|
||||||
|
|
||||||
|
|
@ -347,12 +343,10 @@ public class ShardConsumerSubscriberTest {
|
||||||
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
||||||
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
||||||
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
|
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected() throws Exception {
|
public void restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected() throws Exception {
|
||||||
|
|
||||||
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
|
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
|
||||||
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
|
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
|
||||||
|
|
||||||
|
|
@ -405,7 +399,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
||||||
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
||||||
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
|
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Object directlyExecuteRunnable(InvocationOnMock invocation) {
|
private Object directlyExecuteRunnable(InvocationOnMock invocation) {
|
||||||
|
|
@ -623,8 +616,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
|
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void noLoggingSuppressionNeededOnHappyPathTest() {
|
public void noLoggingSuppressionNeededOnHappyPathTest() {
|
||||||
|
|
@ -648,8 +639,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
|
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void loggingNotSuppressedAfterTimeoutTest() {
|
public void loggingNotSuppressedAfterTimeoutTest() {
|
||||||
|
|
@ -677,8 +666,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
/**
|
/**
|
||||||
* Test to validate the warning message from ShardConsumer is successfully supressed if we only have intermittant
|
* Test to validate the warning message from ShardConsumer is successfully supressed if we only have intermittant
|
||||||
* readTimeouts.
|
* readTimeouts.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void loggingSuppressedAfterIntermittentTimeoutTest() {
|
public void loggingSuppressedAfterIntermittentTimeoutTest() {
|
||||||
|
|
@ -705,8 +692,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
/**
|
/**
|
||||||
* Test to validate the warning message from ShardConsumer is successfully logged if multiple sequential timeouts
|
* Test to validate the warning message from ShardConsumer is successfully logged if multiple sequential timeouts
|
||||||
* occur.
|
* occur.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void loggingPartiallySuppressedAfterMultipleTimeoutTest() {
|
public void loggingPartiallySuppressedAfterMultipleTimeoutTest() {
|
||||||
|
|
@ -733,8 +718,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test to validate the warning message from ShardConsumer is successfully logged if sequential timeouts occur.
|
* Test to validate the warning message from ShardConsumer is successfully logged if sequential timeouts occur.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void loggingPartiallySuppressedAfterConsecutiveTimeoutTest() {
|
public void loggingPartiallySuppressedAfterConsecutiveTimeoutTest() {
|
||||||
|
|
@ -763,8 +746,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
/**
|
/**
|
||||||
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
|
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
|
||||||
* configuration of 0
|
* configuration of 0
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void loggingNotSuppressedOnNonReadTimeoutExceptionNotIgnoringReadTimeoutsExceptionTest() {
|
public void loggingNotSuppressedOnNonReadTimeoutExceptionNotIgnoringReadTimeoutsExceptionTest() {
|
||||||
|
|
@ -792,12 +773,9 @@ public class ShardConsumerSubscriberTest {
|
||||||
/**
|
/**
|
||||||
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
|
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
|
||||||
* ignore
|
* ignore
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void loggingNotSuppressedOnNonReadTimeoutExceptionIgnoringReadTimeoutsTest() {
|
public void loggingNotSuppressedOnNonReadTimeoutExceptionIgnoringReadTimeoutsTest() {
|
||||||
|
|
||||||
// We're not throwing a ReadTimeout, so no suppression is expected.
|
// We're not throwing a ReadTimeout, so no suppression is expected.
|
||||||
// The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
|
// The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
|
||||||
// each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in,
|
// each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in,
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.eq;
|
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
@ -28,7 +27,6 @@ import static org.mockito.Mockito.when;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.hamcrest.Matchers;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,6 @@ import io.reactivex.rxjava3.schedulers.Schedulers;
|
||||||
import io.reactivex.rxjava3.subscribers.SafeSubscriber;
|
import io.reactivex.rxjava3.subscribers.SafeSubscriber;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.Setter;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.hamcrest.Description;
|
import org.hamcrest.Description;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
|
|
@ -54,7 +53,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
@ -77,7 +75,6 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
|
@ -172,7 +169,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
assertThat(clientRecordsList.get(i), matchers.get(i));
|
assertThat(clientRecordsList.get(i), matchers.get(i));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -239,7 +235,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
assertThat(clientRecordsList.get(i), matchers.get(i));
|
assertThat(clientRecordsList.get(i), matchers.get(i));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -317,11 +312,10 @@ public class FanOutRecordsPublisherTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
assertThat(source.getCurrentSequenceNumber(), equalTo("3000"));
|
assertThat(source.getCurrentSequenceNumber(), equalTo("3000"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() throws Exception {
|
public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() {
|
||||||
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
|
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
|
||||||
|
|
||||||
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
|
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
|
||||||
|
|
@ -395,7 +389,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
assertThat(source.getCurrentSequenceNumber(), equalTo("1000"));
|
assertThat(source.getCurrentSequenceNumber(), equalTo("1000"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -489,12 +482,10 @@ public class FanOutRecordsPublisherTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
|
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
|
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
|
||||||
|
|
||||||
CountDownLatch onS2SCallLatch = new CountDownLatch(2);
|
CountDownLatch onS2SCallLatch = new CountDownLatch(2);
|
||||||
|
|
||||||
doAnswer(new Answer() {
|
doAnswer(new Answer() {
|
||||||
|
|
@ -601,7 +592,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
// Let's wait for sometime to allow the publisher to re-subscribe
|
// Let's wait for sometime to allow the publisher to re-subscribe
|
||||||
onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS);
|
onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS);
|
||||||
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -730,7 +720,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
// With shard end event, onComplete must be propagated to the subscriber.
|
// With shard end event, onComplete must be propagated to the subscriber.
|
||||||
onCompleteLatch.await(5000, TimeUnit.MILLISECONDS);
|
onCompleteLatch.await(5000, TimeUnit.MILLISECONDS);
|
||||||
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);
|
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -834,7 +823,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
|
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
|
||||||
onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS);
|
onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS);
|
||||||
assertTrue("OnError should have been thrown", isOnErrorThrown[0]);
|
assertTrue("OnError should have been thrown", isOnErrorThrown[0]);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -928,7 +916,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
|
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -1131,7 +1118,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
assertThat(clientRecordsList.get(i), matchers.get(i));
|
assertThat(clientRecordsList.get(i), matchers.get(i));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -1247,7 +1233,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
|
|
||||||
verifyRecords(nonFailingSubscriber.received.get(0).records(), matchers);
|
verifyRecords(nonFailingSubscriber.received.get(0).records(), matchers);
|
||||||
verifyRecords(nonFailingSubscriber.received.get(1).records(), nextMatchers);
|
verifyRecords(nonFailingSubscriber.received.get(1).records(), nextMatchers);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -1467,7 +1452,6 @@ public class FanOutRecordsPublisherTest {
|
||||||
|
|
||||||
assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
|
assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
|
||||||
assertThat(acquireTimeoutLogged.get(), equalTo(true));
|
assertThat(acquireTimeoutLogged.get(), equalTo(true));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {
|
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {
|
||||||
|
|
|
||||||
|
|
@ -2,14 +2,8 @@ package software.amazon.kinesis.utils;
|
||||||
|
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
|
|
||||||
import software.amazon.kinesis.common.FutureUtils;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|
@ -28,7 +22,7 @@ public abstract class AWSResourceManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of all the names of resources of a specified type
|
* Get a list of all the names of resources of a specified type
|
||||||
* @return
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public abstract List<String> getAllResourceNames() throws Exception;
|
public abstract List<String> getAllResourceNames() throws Exception;
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,6 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
||||||
import software.amazon.kinesis.common.FutureUtils;
|
import software.amazon.kinesis.common.FutureUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,6 @@ package software.amazon.kinesis.utils;
|
||||||
|
|
||||||
import lombok.Value;
|
import lombok.Value;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
|
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
|
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
|
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@
|
||||||
<module name="OneTopLevelClass"/>
|
<module name="OneTopLevelClass"/>
|
||||||
<module name="OuterTypeFilename"/>
|
<module name="OuterTypeFilename"/>
|
||||||
<module name="ParameterName"/>
|
<module name="ParameterName"/>
|
||||||
|
<module name="UnusedImports"/>
|
||||||
<module name="WhitespaceAfter"/>
|
<module name="WhitespaceAfter"/>
|
||||||
</module>
|
</module>
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue