Merge branch 'awslabs:master' into KCL2XReshardTests

This commit is contained in:
Meher M 2023-06-28 10:51:21 -07:00 committed by GitHub
commit a207c3760c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
88 changed files with 634 additions and 475 deletions

View file

@ -3,6 +3,17 @@
For **1.x** release notes, please see [v1.x/CHANGELOG.md](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/CHANGELOG.md)
---
### Release 2.5.1 (June 27, 2023)
* [#1143](https://github.com/awslabs/amazon-kinesis-client/pull/1143) Upgrade MultiLangDaemon to support StreamARN
* [#1145](https://github.com/awslabs/amazon-kinesis-client/pull/1145) Introduced GitHub actions to trigger Maven builds during merge/pull requests
* [#1136](https://github.com/awslabs/amazon-kinesis-client/pull/1136) Added testing architecture and KCL 2.x basic polling/streaming tests
* [#1153](https://github.com/awslabs/amazon-kinesis-client/pull/1153) Checkstyle: added `UnusedImports` check.
* [#1150](https://github.com/awslabs/amazon-kinesis-client/pull/1150) Enabled Checkstyle validation of test resources.
* [#1149](https://github.com/awslabs/amazon-kinesis-client/pull/1149) Bound Checkstyle to `validate` goal for automated enforcement.
* [#1148](https://github.com/awslabs/amazon-kinesis-client/pull/1148) Code cleanup to faciliate Checkstyle enforcement.
* [#1142](https://github.com/awslabs/amazon-kinesis-client/pull/1142) Upgrade Google Guava dependency version from 31.1-jre to 32.0.0-jre
* [#1115](https://github.com/awslabs/amazon-kinesis-client/pull/1115) Update KCL version from 2.5.0 to 2.5.1-SNAPSHOT
### Release 2.5.0 (May 19, 2023)
* **[#1109](https://github.com/awslabs/amazon-kinesis-client/pull/1109) Add support for stream ARNs**
* **[#1065](https://github.com/awslabs/amazon-kinesis-client/pull/1065) Allow tags to be added when lease table is created**

View file

@ -58,7 +58,7 @@ The recommended way to use the KCL for Java is to consume it from Maven.
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>2.4.8</version>
<version>2.5.1</version>
</dependency>
```

View file

@ -21,7 +21,7 @@
<parent>
<artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId>
<version>2.5.1-SNAPSHOT</version>
<version>2.5.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View file

@ -19,7 +19,6 @@ import java.util.concurrent.ExecutorService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShardRecordProcessor;

View file

@ -22,8 +22,6 @@ import java.util.List;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
/**
* Get AWSCredentialsProvider property.

View file

@ -20,7 +20,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Getter;
import org.apache.commons.beanutils.ConvertUtilsBean;
@ -150,7 +149,6 @@ public class BuilderDynaBean implements DynaBean {
} else {
return expected.cast(dynaBeanCreateSupport.build());
}
}
private void validateResolvedEmptyHandler() {

View file

@ -23,8 +23,9 @@ import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.arns.Arn;
import software.amazon.kinesis.common.StreamIdentifier;
/**
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
@ -40,7 +41,6 @@ public class KinesisClientLibConfigurator {
private final BeanUtilsBean utilsBean;
private final MultiLangDaemonConfiguration configuration;
/**
* Constructor.
*/
@ -69,8 +69,18 @@ public class KinesisClientLibConfigurator {
});
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
Validate.notBlank(configuration.getStreamName(), "Stream name is required");
if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) {
final Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
StreamIdentifier.validateArn(streamArnObj);
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
final String streamNameFromArn = streamArnObj.resource().resource();
configuration.setStreamName(streamNameFromArn);
}
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
return configuration;
}
@ -96,5 +106,4 @@ public class KinesisClientLibConfigurator {
return getConfiguration(properties);
}
}

View file

@ -28,7 +28,6 @@ import java.util.UUID;
import java.util.function.Function;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.beanutils.Converter;
import org.apache.commons.beanutils.converters.ArrayConverter;
@ -73,6 +72,8 @@ public class MultiLangDaemonConfiguration {
private String applicationName;
private String streamName;
private String streamArn;
@ConfigurationSettable(configurationClass = ConfigsBuilder.class)
private String tableName;
@ -157,7 +158,6 @@ public class MultiLangDaemonConfiguration {
metricsEnabledDimensions = new HashSet<>(Arrays.asList(dimensions));
}
private RetrievalMode retrievalMode = RetrievalMode.DEFAULT;
private final FanoutConfigBean fanoutConfig = new FanoutConfigBean();
@ -169,7 +169,6 @@ public class MultiLangDaemonConfiguration {
private long shutdownGraceMillis;
private Integer timeoutInSeconds;
private final BuilderDynaBean kinesisCredentialsProvider;
public void setAWSCredentialsProvider(String providerString) {

View file

@ -28,16 +28,15 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import software.amazon.kinesis.multilang.MessageReader;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MessageReaderTest {
private static final String shardId = "shard-123";
private static final String SHARD_ID = "shard-123";
/*
/**
* This line is based on the definition of the protocol for communication between the KCL record processor and
* the client's process.
*/
@ -45,7 +44,7 @@ public class MessageReaderTest {
return String.format("{\"action\":\"checkpoint\", \"checkpoint\":\"%s\"}", sequenceNumber);
}
/*
/**
* This line is based on the definition of the protocol for communication between the KCL record processor and
* the client's process.
*/
@ -80,10 +79,9 @@ public class MessageReaderTest {
String[] responseFors = new String[] { "initialize", "processRecords", "processRecords", "shutdown" };
InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors);
MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool());
new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
for (String responseFor : responseFors) {
StatusMessage statusMessage = null;
try {
Message message = reader.getNextMessageFromSTDOUT().get();
if (message instanceof StatusMessage) {
@ -103,14 +101,14 @@ public class MessageReaderTest {
InputStream stream = buildInputStreamOfGoodInput(sequenceNumbers, responseFors);
MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool());
new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Future<Boolean> drainFuture = reader.drainSTDOUT();
Boolean drainResult = drainFuture.get();
Assert.assertNotNull(drainResult);
Assert.assertTrue(drainResult);
}
/*
/**
* readValue should fail safely and just continue looping
*/
@Test
@ -135,7 +133,7 @@ public class MessageReaderTest {
}
MessageReader reader =
new MessageReader().initialize(bufferReader, shardId, new ObjectMapper(),
new MessageReader().initialize(bufferReader, SHARD_ID, new ObjectMapper(),
Executors.newCachedThreadPool());
try {
@ -150,7 +148,7 @@ public class MessageReaderTest {
public void messageReaderBuilderTest() {
InputStream stream = new ByteArrayInputStream("".getBytes());
MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool());
new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Assert.assertNotNull(reader);
}
@ -159,7 +157,7 @@ public class MessageReaderTest {
BufferedReader input = Mockito.mock(BufferedReader.class);
Mockito.doThrow(IOException.class).when(input).readLine();
MessageReader reader =
new MessageReader().initialize(input, shardId, new ObjectMapper(), Executors.newCachedThreadPool());
new MessageReader().initialize(input, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Future<Message> readTask = reader.getNextMessageFromSTDOUT();
@ -177,7 +175,7 @@ public class MessageReaderTest {
public void noMoreMessagesTest() throws InterruptedException {
InputStream stream = new ByteArrayInputStream("".getBytes());
MessageReader reader =
new MessageReader().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool());
new MessageReader().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
Future<Message> future = reader.getNextMessageFromSTDOUT();
try {

View file

@ -32,35 +32,30 @@ import org.mockito.Mockito;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.multilang.MessageWriter;
import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
import software.amazon.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import static org.mockito.Mockito.verify;
public class MessageWriterTest {
private static final String shardId = "shard-123";
private static final String SHARD_ID = "shard-123";
MessageWriter messageWriter;
OutputStream stream;
@Rule
public final ExpectedException thrown = ExpectedException.none();
// ExecutorService executor;
@Before
public void setup() {
stream = Mockito.mock(OutputStream.class);
messageWriter =
new MessageWriter().initialize(stream, shardId, new ObjectMapper(), Executors.newCachedThreadPool());
new MessageWriter().initialize(stream, SHARD_ID, new ObjectMapper(), Executors.newCachedThreadPool());
}
/*
@ -86,7 +81,7 @@ public class MessageWriterTest {
@Test
public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build());
Future<Boolean> future = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build());
future.get();
verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@ -131,20 +126,20 @@ public class MessageWriterTest {
@Test
public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException {
Mockito.doThrow(IOException.class).when(stream).flush();
Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build());
Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build());
Boolean result = initializeTask.get();
Assert.assertNotNull(result);
Assert.assertFalse(result);
}
@Test
public void objectMapperFails() throws JsonProcessingException, InterruptedException, ExecutionException {
public void objectMapperFails() throws JsonProcessingException {
thrown.expect(RuntimeException.class);
thrown.expectMessage("Encountered I/O error while writing LeaseLostMessage action to subprocess");
ObjectMapper mapper = Mockito.mock(ObjectMapper.class);
Mockito.doThrow(JsonProcessingException.class).when(mapper).writeValueAsString(Mockito.any(Message.class));
messageWriter = new MessageWriter().initialize(stream, shardId, mapper, Executors.newCachedThreadPool());
messageWriter = new MessageWriter().initialize(stream, SHARD_ID, mapper, Executors.newCachedThreadPool());
messageWriter.writeLeaseLossMessage(LeaseLostInput.builder().build());
}
@ -157,7 +152,7 @@ public class MessageWriterTest {
Assert.assertFalse(this.messageWriter.isOpen());
try {
// Any message should fail
this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(shardId).build());
this.messageWriter.writeInitializeMessage(InitializationInput.builder().shardId(SHARD_ID).build());
Assert.fail("MessageWriter should be closed and unable to write.");
} catch (IllegalStateException e) {
// This should happen.

View file

@ -14,17 +14,14 @@
*/
package software.amazon.kinesis.multilang;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.junit.Before;
import software.amazon.awssdk.regions.Region;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@ -39,58 +36,163 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
@RunWith(MockitoJUnitRunner.class)
public class MultiLangDaemonConfigTest {
private static String FILENAME = "some.properties";
private static final String FILENAME = "some.properties";
private static final String EXE = "TestExe.exe";
private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName();
private static final String STREAM_NAME = "fakeStream";
private static final String STREAM_NAME_IN_ARN = "FAKE_STREAM_NAME";
private static final Region REGION = Region.US_EAST_1;
private static final String STREAM_ARN = "arn:aws:kinesis:us-east-2:012345678987:stream/" + STREAM_NAME_IN_ARN;
@Mock
private ClassLoader classLoader;
@Mock
private AwsCredentialsProvider credentialsProvider;
@Mock
private AwsCredentials creds;
@Mock
private KinesisClientLibConfigurator configurator;
private MultiLangDaemonConfig deamonConfig;
@Before
public void setup() {
ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean);
MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean,
convertUtilsBean);
multiLangDaemonConfiguration.setApplicationName("cool-app");
multiLangDaemonConfiguration.setStreamName("cool-stream");
multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker");
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration);
/**
* Instantiate a MultiLangDaemonConfig object
* @param streamName
* @param streamArn
* @throws IOException
*/
public void setup(String streamName, String streamArn) throws IOException {
String properties = String.format("executableName = %s\n"
+ "applicationName = %s\n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge\n"
+ "regionName = %s\n",
EXE,
APPLICATION_NAME,
"us-east-1");
if (streamName != null) {
properties += String.format("streamName = %s\n", streamName);
}
if (streamArn != null) {
properties += String.format("streamArn = %s\n", streamArn);
}
classLoader = Mockito.mock(ClassLoader.class);
@Test
public void constructorTest() throws IOException {
String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n"
+ "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader)
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
configurator = new KinesisClientLibConfigurator();
assertNotNull(deamonConfig.getExecutorService());
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
assertNotNull(deamonConfig.getRecordProcessorFactory());
deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamArnIsInvalid() throws Exception {
setup("", "this_is_not_a_valid_arn");
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamArnIsInvalid2() throws Exception {
setup("", "arn:aws:kinesis:us-east-2:ACCOUNT_ID:BadFormatting:stream/" + STREAM_NAME_IN_ARN);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws Exception {
setup("", "");
}
@Test(expected = NullPointerException.class)
public void testConstructorFailsBecauseStreamNameAndArnAreNull() throws Exception {
setup(null, null);
}
@Test(expected = NullPointerException.class)
public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() throws Exception {
setup(null, "");
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() throws Exception {
setup("", null);
}
@Test
public void propertyValidation() {
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
public void testConstructorUsingStreamName() throws IOException {
setup(STREAM_NAME, null);
assertConfigurationsMatch(STREAM_NAME, null);
}
@Test
public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException {
setup(STREAM_NAME, "");
assertConfigurationsMatch(STREAM_NAME, "");
}
@Test
public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException {
setup(STREAM_NAME, " ");
assertConfigurationsMatch(STREAM_NAME, "");
}
@Test
public void testConstructorUsingStreamArn() throws IOException {
setup(null, STREAM_ARN);
assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN);
}
@Test
public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException {
setup("", STREAM_ARN);
assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN);
}
@Test
public void testConstructorUsingStreamArnOverStreamName() throws IOException {
setup(STREAM_NAME, STREAM_ARN);
assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN);
}
/**
* Verify the daemonConfig properties are what we expect them to be.
*
* @param expectedStreamName
*/
private void assertConfigurationsMatch(String expectedStreamName, String expectedStreamArn) {
final MultiLangDaemonConfiguration multiLangConfiguration = deamonConfig.getMultiLangDaemonConfiguration();
assertNotNull(deamonConfig.getExecutorService());
assertNotNull(multiLangConfiguration);
assertNotNull(deamonConfig.getRecordProcessorFactory());
assertEquals(EXE, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]);
assertEquals(APPLICATION_NAME, multiLangConfiguration.getApplicationName());
assertEquals(expectedStreamName, multiLangConfiguration.getStreamName());
assertEquals(REGION, multiLangConfiguration.getDynamoDbClient().get("region"));
assertEquals(REGION, multiLangConfiguration.getCloudWatchClient().get("region"));
assertEquals(REGION, multiLangConfiguration.getKinesisClient().get("region"));
assertEquals(expectedStreamArn, multiLangConfiguration.getStreamArn());
}
@Test
public void testPropertyValidation() {
String propertiesNoExecutableName = "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())).when(classLoader)
Mockito.doReturn(new ByteArrayInputStream(propertiesNoExecutableName.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
MultiLangDaemonConfig config;
try {
config = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
Assert.fail("Construction of the config should have failed due to property validation failing.");
} catch (IllegalArgumentException e) {
// Good

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.multilang;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyObject;

View file

@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -61,10 +60,8 @@ import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import com.google.common.util.concurrent.SettableFuture;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
@ -106,7 +103,7 @@ public class MultiLangProtocolTest {
}
@Test
public void initializeTest() throws InterruptedException, ExecutionException {
public void testInitialize() {
when(messageWriter
.writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder()
.shardId(shardId).build())))).thenReturn(buildFuture(true));
@ -116,7 +113,7 @@ public class MultiLangProtocolTest {
}
@Test
public void processRecordsTest() throws InterruptedException, ExecutionException {
public void testProcessRecords() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(
new StatusMessage("processRecords"), Message.class));
@ -131,7 +128,6 @@ public class MultiLangProtocolTest {
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage(LeaseLostMessage.ACTION), Message.class));
assertThat(protocol.leaseLost(LeaseLostInput.builder().build()), equalTo(true));
}
@Test
@ -177,7 +173,7 @@ public class MultiLangProtocolTest {
}
@Test
public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException,
public void testProcessRecordsWithCheckpoints() throws
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
@ -206,7 +202,7 @@ public class MultiLangProtocolTest {
}
@Test
public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException {
public void testProcessRecordsWithABadCheckpoint() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false));
when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {

View file

@ -27,12 +27,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import software.amazon.kinesis.multilang.DrainChildSTDERRTask;
import software.amazon.kinesis.multilang.LineReaderTask;
public class ReadSTDERRTaskTest {
private static final String shardId = "shard-123";
private static final String SHARD_ID = "shard-123";
private BufferedReader mockBufferReader;
@Before
@ -45,7 +43,7 @@ public class ReadSTDERRTaskTest {
String errorMessages = "OMG\nThis is test message\n blah blah blah \n";
InputStream stream = new ByteArrayInputStream(errorMessages.getBytes());
LineReaderTask<Boolean> reader = new DrainChildSTDERRTask().initialize(stream, shardId, "");
LineReaderTask<Boolean> reader = new DrainChildSTDERRTask().initialize(stream, SHARD_ID, "");
Assert.assertNotNull(reader);
}
@ -54,7 +52,7 @@ public class ReadSTDERRTaskTest {
String errorMessages = "OMG\nThis is test message\n blah blah blah \n";
BufferedReader bufferReader =
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(errorMessages.getBytes())));
LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(bufferReader, shardId, "");
LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(bufferReader, SHARD_ID, "");
Assert.assertNotNull(errorReader);
Boolean result = errorReader.call();
@ -67,7 +65,7 @@ public class ReadSTDERRTaskTest {
} catch (IOException e) {
Assert.fail("Not supposed to get an exception when we're just building our mock.");
}
LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(mockBufferReader, shardId, "");
LineReaderTask<Boolean> errorReader = new DrainChildSTDERRTask().initialize(mockBufferReader, SHARD_ID, "");
Assert.assertNotNull(errorReader);
Future<Boolean> result = Executors.newCachedThreadPool().submit(errorReader);
Boolean finishedCleanly = null;

View file

@ -14,12 +14,9 @@
*/
package software.amazon.kinesis.multilang;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.kinesis.multilang.MultiLangRecordProcessorFactory;
import software.amazon.kinesis.multilang.MultiLangShardRecordProcessor;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import org.junit.runner.RunWith;

View file

@ -23,7 +23,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
@ -46,9 +45,7 @@ import org.mockito.stubbing.Answer;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
@ -67,7 +64,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
@RunWith(MockitoJUnitRunner.class)
public class StreamingShardRecordProcessorTest {
private static final String shardId = "shard-123";
private static final String SHARD_ID = "shard-123";
private int systemExitCount = 0;
@ -79,77 +76,73 @@ public class StreamingShardRecordProcessorTest {
private RecordProcessorCheckpointer unimplementedCheckpointer = new RecordProcessorCheckpointer() {
@Override
public void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException,
ThrottlingException, ShutdownException {
public void checkpoint() throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException();
}
@Override
public void checkpoint(String sequenceNumber) throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public void checkpoint(Record record)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException {
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException();
}
@Override
public void checkpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException {
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint()
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException {
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(Record record)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException {
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
return null;
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber)
throws KinesisClientLibDependencyException,
InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
public PreparedCheckpointer prepareCheckpoint(String sequenceNumber, long subSequenceNumber, byte[] applicationState)
throws KinesisClientLibDependencyException, ThrottlingException, IllegalArgumentException {
throw new UnsupportedOperationException();
}
@ -171,7 +164,7 @@ public class StreamingShardRecordProcessorTest {
private MultiLangDaemonConfiguration configuration;
@Before
public void prepare() throws IOException, InterruptedException, ExecutionException {
public void prepare() throws InterruptedException, ExecutionException {
// Fake command
systemExitCount = 0;
@ -230,7 +223,7 @@ public class StreamingShardRecordProcessorTest {
List<KinesisClientRecord> testRecords = Collections.emptyList();
recordProcessor.initialize(InitializationInput.builder().shardId(shardId).build());
recordProcessor.initialize(InitializationInput.builder().shardId(SHARD_ID).build());
recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords)
.checkpointer(unimplementedCheckpointer).build());
recordProcessor.processRecords(ProcessRecordsInput.builder().records(testRecords)
@ -240,7 +233,6 @@ public class StreamingShardRecordProcessorTest {
@Test
public void processorPhasesTest() throws InterruptedException, ExecutionException {
Answer<StatusMessage> answer = new Answer<StatusMessage>() {
StatusMessage[] answers = new StatusMessage[] { new StatusMessage(InitializeMessage.ACTION),
@ -263,7 +255,7 @@ public class StreamingShardRecordProcessorTest {
verify(messageWriter)
.writeInitializeMessage(argThat(Matchers.withInit(
InitializationInput.builder().shardId(shardId).build())));
InitializationInput.builder().shardId(SHARD_ID).build())));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter).writeLeaseLossMessage(any(LeaseLostInput.class));
}
@ -295,7 +287,7 @@ public class StreamingShardRecordProcessorTest {
phases(answer);
verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(InitializationInput.builder()
.shardId(shardId).build())));
.shardId(SHARD_ID).build())));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter, never()).writeLeaseLossMessage(any(LeaseLostInput.class));
Assert.assertEquals(1, systemExitCount);

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.multilang.config;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
@ -32,11 +31,6 @@ import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
public class AWSCredentialsProviderPropertyValueDecoderTest {
private static final String TEST_ACCESS_KEY_ID = "123";

View file

@ -278,10 +278,8 @@ public class KinesisClientLibConfiguratorTest {
}
}
@Test
@Test(expected = IllegalArgumentException.class)
public void testWithMissingCredentialsProvider() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("A basic set of AWS credentials must be provided");
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "workerId = 123",
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n');
@ -305,22 +303,37 @@ public class KinesisClientLibConfiguratorTest {
assertFalse(config.getWorkerIdentifier().isEmpty());
}
@Test
public void testWithMissingStreamName() {
thrown.expect(NullPointerException.class);
thrown.expectMessage("Stream name is required");
String test = StringUtils.join(new String[] { "applicationName = b",
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n');
@Test(expected = NullPointerException.class)
public void testWithMissingStreamNameAndMissingStreamArn() {
String test = StringUtils.join(new String[] {
"applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
"workerId = 123",
"failoverTimeMillis = 100" },
'\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
configurator.getConfiguration(input);
}
@Test
@Test(expected = IllegalArgumentException.class)
public void testWithEmptyStreamNameAndMissingStreamArn() {
String test = StringUtils.join(new String[] {
"applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
"workerId = 123",
"failoverTimeMillis = 100",
"streamName = ",
"streamArn = "},
'\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
configurator.getConfiguration(input);
}
@Test(expected = NullPointerException.class)
public void testWithMissingApplicationName() {
thrown.expect(NullPointerException.class);
thrown.expectMessage("Application name is required");
String test = StringUtils.join(new String[] { "streamName = a", "AWSCredentialsProvider = " + credentialName1,
"workerId = 123", "failoverTimeMillis = 100" }, '\n');

View file

@ -26,13 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.multilang.messages.InitializeMessage;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
import software.amazon.kinesis.multilang.messages.ShutdownMessage;
import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
public class MessageTest {
@ -56,7 +49,7 @@ public class MessageTest {
new ProcessRecordsMessage(),
new ShutdownRequestedMessage(),
new LeaseLostMessage(),
new ShardEndedMessage()
new ShardEndedMessage(),
};
// TODO: fix this

View file

@ -22,7 +22,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.5.1-SNAPSHOT</version>
<version>2.5.1</version>
</parent>
<artifactId>amazon-kinesis-client</artifactId>
@ -89,7 +89,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
<version>32.0.0-jre</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>

View file

@ -144,7 +144,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
}
@ -152,7 +153,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
//
// TODO: UserRecord Deprecation
//

View file

@ -103,7 +103,8 @@ public class DynamoDBCheckpointer implements Checkpointer {
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken,
byte[] pendingCheckpointState) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState);

View file

@ -23,10 +23,11 @@ import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import java.math.BigInteger;
@Value @Accessors(fluent = true)
/**
* Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards.
*/
@Accessors(fluent = true)
@Value
public class HashKeyRangeForLease {
private final BigInteger startingHashKey;

View file

@ -167,12 +167,22 @@ public class StreamIdentifier {
.build();
}
private static void validateArn(Arn streamArn) {
/**
* Verify the streamArn follows the appropriate formatting.
* Throw an exception if it does not.
* @param streamArn
*/
public static void validateArn(Arn streamArn) {
if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches() || !streamArn.region().isPresent()) {
throw new IllegalArgumentException("Unable to create a StreamIdentifier from " + streamArn);
throw new IllegalArgumentException("Invalid streamArn " + streamArn);
}
}
/**
* Verify creationEpoch is greater than 0.
* Throw an exception if it is not.
* @param creationEpoch
*/
private static void validateCreationEpoch(long creationEpoch) {
if (creationEpoch <= 0) {
throw new IllegalArgumentException(

View file

@ -417,8 +417,9 @@ class PeriodicShardSyncManager {
@VisibleForTesting
static List<Lease> sortLeasesByHashRange(List<Lease> leasesWithHashKeyRanges) {
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1)
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) {
return leasesWithHashKeyRanges;
}
Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
return leasesWithHashKeyRanges;
}

View file

@ -544,7 +544,8 @@ public class Scheduler implements Runnable {
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
.collect(Collectors.toSet());
// These are the streams which are deleted in Kinesis and we encounter resource not found during
// shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
// not have any data.

View file

@ -80,7 +80,7 @@ public class HierarchicalShardSyncer {
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
private static final int retriesForCompleteHashRange = 3;
private static final int RETRIES_FOR_COMPLETE_HASH_RANGE = 3;
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
@ -98,7 +98,7 @@ public class HierarchicalShardSyncer {
this.deletedStreamListProvider = deletedStreamListProvider;
}
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
private static final BiFunction<Lease, MultiStreamArgs, String> SHARD_ID_FROM_LEASE_DEDUCER =
(lease, multiStreamArgs) ->
multiStreamArgs.isMultiStreamMode() ?
((MultiStreamLease) lease).shardId() :
@ -129,7 +129,9 @@ public class HierarchicalShardSyncer {
isLeaseTableEmpty);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
/**
* Provide a pre-collected list of shards to avoid calling ListShards API
*/
public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
@ -268,7 +270,7 @@ public class HierarchicalShardSyncer {
List<Shard> shards;
for (int i = 0; i < retriesForCompleteHashRange; i++) {
for (int i = 0; i < RETRIES_FOR_COMPLETE_HASH_RANGE; i++) {
shards = shardDetector.listShardsWithFilter(shardFilter);
if (shards == null) {
@ -284,7 +286,7 @@ public class HierarchicalShardSyncer {
}
throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after "
+ retriesForCompleteHashRange + " retries.");
+ RETRIES_FOR_COMPLETE_HASH_RANGE + " retries.");
}
private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
@ -365,7 +367,8 @@ public class HierarchicalShardSyncer {
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,final Set<String> inconsistentShardIds) {
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds,
new MultiStreamArgs(false, null));
}
@ -499,11 +502,13 @@ public class HierarchicalShardSyncer {
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint());
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}",
lease.leaseKey(), lease.checkpoint());
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition);
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint());
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}",
lease.leaseKey(), newCheckpoint, lease.checkpoint());
lease.checkpoint(newCheckpoint);
}
}
@ -728,8 +733,8 @@ public class HierarchicalShardSyncer {
@Override
public int compare(final Lease lease1, final Lease lease2) {
int result = 0;
final String shardId1 = shardIdFromLeaseDeducer.apply(lease1, multiStreamArgs);
final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
final String shardId1 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease1, multiStreamArgs);
final String shardId2 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease2, multiStreamArgs);
final Shard shard1 = shardIdToShardMap.get(shardId1);
final Shard shard2 = shardIdToShardMap.get(shardId2);
@ -802,7 +807,7 @@ public class HierarchicalShardSyncer {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier);
@ -908,7 +913,7 @@ public class HierarchicalShardSyncer {
.map(streamId -> streamId.serialize()).orElse("");
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards, streamIdentifier);

View file

@ -23,8 +23,6 @@ import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import java.util.Objects;
import static com.google.common.base.Verify.verifyNotNull;
@Setter

View file

@ -37,7 +37,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
@Slf4j
@KinesisClientInternalApi
public class ShardSyncTask implements ConsumerTask {
private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
private static final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
@NonNull
private final ShardDetector shardDetector;

View file

@ -329,8 +329,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
@Override
public void stopLeaseTaker() {
if (takerFuture != null) {
takerFuture.cancel(false);
}
}
@Override

View file

@ -212,8 +212,10 @@ public class ProcessTask implements ConsumerTask {
log.debug("Calling application processRecords() with {} records from {}", records.size(),
shardInfoId);
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records)
.cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer)
.millisBehindLatest(input.millisBehindLatest()).build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt()

View file

@ -74,7 +74,6 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo());
}
void startSubscriptions() {
synchronized (lockObject) {
// Setting the lastRequestTime to allow for health checks to restart subscriptions if they failed to
@ -131,7 +130,9 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error(
// CHECKSTYLE.OFF: LineLength
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
shardInfoId, lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
cancel();

View file

@ -20,9 +20,7 @@ import java.util.Objects;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
/*
/**
* A representation of a key of a MetricDatum. This class is useful when wanting to compare
* whether 2 keys have the same MetricDatum. This feature will be used in MetricAccumulatingQueue
* where we aggregate metrics across multiple MetricScopes.
@ -48,12 +46,15 @@ public class CloudWatchMetricKey {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
CloudWatchMetricKey other = (CloudWatchMetricKey) obj;
return Objects.equals(other.dimensions, dimensions) && Objects.equals(other.metricName, metricName);
}

View file

@ -15,7 +15,6 @@
package software.amazon.kinesis.metrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Setter;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
@ -36,7 +35,6 @@ import java.util.Objects;
*
* MetricDatumWithKey<SampleMetricKey> sampleDatumWithKey = new MetricDatumWithKey<SampleMetricKey>(new
* SampleMetricKey(System.currentTimeMillis()), datum)
*
*/
@AllArgsConstructor
@Setter
@ -59,12 +57,15 @@ public class MetricDatumWithKey<KeyType> {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
MetricDatumWithKey<?> other = (MetricDatumWithKey<?>) obj;
return Objects.equals(other.key, key) && Objects.equals(other.datum, datum);
}

View file

@ -49,7 +49,7 @@ public class RetrievalConfig {
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.5.1-SNAPSHOT";
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.5.1";
/**
* Client used to make calls to Kinesis for records retrieval
@ -152,7 +152,7 @@ public class RetrievalConfig {
if (streamTracker().isMultiStream()) {
throw new IllegalArgumentException(
"Cannot set initialPositionInStreamExtended when multiStreamTracker is set");
};
}
final StreamIdentifier streamIdentifier = getSingleStreamIdentifier();
final StreamConfig updatedConfig = new StreamConfig(streamIdentifier, initialPositionInStreamExtended);

View file

@ -27,14 +27,12 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -117,7 +115,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
this.isFirstConnection = true;
}
}
@Override
@ -206,7 +203,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
.equals(flow.getSubscribeToShardId())) {
log.error(
"{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
"{}: Received unexpected ack for the active subscription {}. Throwing.",
streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
throw new IllegalStateException("Unexpected ack for the active subscription");
}
// Otherwise publisher received a stale ack.
@ -275,7 +273,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) {
this(subscriptionShutdownAction, eventIdentifier, null);
}
}
private boolean hasValidSubscriber() {
@ -335,7 +332,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
" Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt,
flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
@ -367,7 +365,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} else {
if (triggeringFlow != null) {
log.debug(
// CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error",
// CHECKSTYLE.ON: LineLength
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
category.throwableTypeString);
triggeringFlow.cancel();
@ -603,7 +603,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
streamAndShardId, n, lastSuccessfulRequestDetails);
return;
}
@ -630,13 +632,17 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
streamAndShardId, lastSuccessfulRequestDetails);
return;
}
if (!hasValidSubscriber()) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
streamAndShardId, lastSuccessfulRequestDetails);
}
subscriber = null;
@ -778,7 +784,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeExceptionOccurred(throwable);
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable);
() -> {
parent.recordsDeliveryQueue.poll();
executeExceptionOccurred(throwable);
},
"onError", throwable);
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
@ -786,7 +796,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void executeExceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
@ -803,7 +812,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
isErrorDispatched = true;
} else {
log.debug(
// CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}",
// CHECKSTYLE.OFF: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
}
@ -817,7 +828,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeComplete();
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete");
() -> {
parent.recordsDeliveryQueue.poll();
executeComplete();
},
"onComplete");
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
@ -830,7 +845,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
parent.streamAndShardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
@ -854,7 +871,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
if (this.isDisposed) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
return;
}

View file

@ -145,7 +145,9 @@ public class KinesisDataFetcher implements DataFetcher {
}
}
// CHECKSTYLE.OFF: MemberName
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
// CHECKSTYLE.ON: MemberName
@Override
public GetRecordsResponse getResult() {
return GetRecordsResponse.builder()

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.checkpoint;
import java.util.HashMap;
import java.util.Map;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -39,8 +38,7 @@ public class InMemoryCheckpointer implements Checkpointer {
* {@inheritDoc}
*/
@Override
public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken)
throws KinesisClientLibException {
public void setCheckpoint(String leaseKey, ExtendedSequenceNumber checkpointValue, String concurrencyToken) {
checkpoints.put(leaseKey, checkpointValue);
flushpoints.put(leaseKey, checkpointValue);
pendingCheckpoints.remove(leaseKey);
@ -49,33 +47,32 @@ public class InMemoryCheckpointer implements Checkpointer {
if (log.isDebugEnabled()) {
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
}
}
/**
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getCheckpoint(String leaseKey) throws KinesisClientLibException {
public ExtendedSequenceNumber getCheckpoint(String leaseKey) {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
log.debug("checkpoint shardId: {} checkpoint: {}", leaseKey, checkpoint);
return checkpoint;
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken)
throws KinesisClientLibException {
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken) {
prepareCheckpoint(leaseKey, pendingCheckpoint, concurrencyToken, null);
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken,
byte[] pendingCheckpointState) {
pendingCheckpoints.put(leaseKey, pendingCheckpoint);
pendingCheckpointStates.put(leaseKey, pendingCheckpointState);
}
@Override
public Checkpoint getCheckpointObject(String leaseKey) throws KinesisClientLibException {
public Checkpoint getCheckpointObject(String leaseKey) {
ExtendedSequenceNumber checkpoint = flushpoints.get(leaseKey);
ExtendedSequenceNumber pendingCheckpoint = pendingCheckpoints.get(leaseKey);
byte[] pendingCheckpointState = pendingCheckpointStates.get(leaseKey);

View file

@ -397,7 +397,7 @@ public class ShardShardRecordProcessorCheckpointerTest {
assertThat(checkpointer.largestPermittedCheckpointValue(), equalTo(sequenceNumber));
}
/*
/**
* This test is a mixed test of checking some basic functionality of checkpointing at a sequence number and making
* sure certain bounds checks and validations are being performed inside the checkpointer to prevent clients from
* checkpointing out of order/too big/non-numeric values that aren't valid strings for them to be checkpointing
@ -444,7 +444,7 @@ public class ShardShardRecordProcessorCheckpointerTest {
new ExtendedSequenceNumber("bogus-checkpoint-value"), // Can't checkpoint at non-numeric string
ExtendedSequenceNumber.SHARD_END, // Can't go to the end unless it is set as the max
ExtendedSequenceNumber.TRIM_HORIZON, // Can't go back to an initial sentinel value
ExtendedSequenceNumber.LATEST // Can't go back to an initial sentinel value
ExtendedSequenceNumber.LATEST, // Can't go back to an initial sentinel value
};
for (ExtendedSequenceNumber badCheckpointValue : valuesWeShouldNotBeAbleToCheckpointAt) {
try {
@ -477,7 +477,7 @@ public class ShardShardRecordProcessorCheckpointerTest {
processingCheckpointer.lastCheckpointValue(), equalTo(ExtendedSequenceNumber.SHARD_END));
}
/*
/**
* This test is a mixed test of checking some basic functionality of two phase checkpointing at a sequence number
* and making sure certain bounds checks and validations are being performed inside the checkpointer to prevent
* clients from checkpointing out of order/too big/non-numeric values that aren't valid strings for them to be
@ -548,7 +548,7 @@ public class ShardShardRecordProcessorCheckpointerTest {
new ExtendedSequenceNumber("bogus-checkpoint-value"), // Can't checkpoint at non-numeric string
ExtendedSequenceNumber.SHARD_END, // Can't go to the end unless it is set as the max
ExtendedSequenceNumber.TRIM_HORIZON, // Can't go back to an initial sentinel value
ExtendedSequenceNumber.LATEST // Can't go back to an initial sentinel value
ExtendedSequenceNumber.LATEST, // Can't go back to an initial sentinel value
};
for (ExtendedSequenceNumber badCheckpointValue : valuesWeShouldNotBeAbleToCheckpointAt) {
try {
@ -566,7 +566,6 @@ public class ShardShardRecordProcessorCheckpointerTest {
assertThat("Largest sequence number should not have changed",
processingCheckpointer.largestPermittedCheckpointValue(), equalTo(thirdSequenceNumber));
assertThat(checkpoint.getCheckpointObject(shardId).pendingCheckpoint(), nullValue());
}
// advance to third number
@ -601,7 +600,6 @@ public class ShardShardRecordProcessorCheckpointerTest {
*
* @throws Exception
*/
@SuppressWarnings("serial")
@Test
public final void testMixedCheckpointCalls() throws Exception {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
@ -617,7 +615,6 @@ public class ShardShardRecordProcessorCheckpointerTest {
*
* @throws Exception
*/
@SuppressWarnings("serial")
@Test
public final void testMixedTwoPhaseCheckpointCalls() throws Exception {
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {

View file

@ -30,7 +30,6 @@ import java.io.IOException;
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Optional;
/**
* Default configuration for a producer or consumer used in integration tests.
@ -82,7 +81,6 @@ public abstract class KCLAppConfig {
}
public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
if (kinesisAsyncClient == null) {
// Setup H2 client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()

View file

@ -86,7 +86,7 @@ public class DiagnosticEventsTest {
assertEquals(event.getLargestPoolSize(), largestPoolSize);
assertEquals(event.getMaximumPoolSize(), maximumPoolSize);
assertEquals(event.getLeasesOwned(), leaseAssignments.size());
assertEquals(event.getCurrentQueueSize(),0);
assertEquals(0, event.getCurrentQueueSize());
verify(defaultHandler, times(1)).visit(event);
}
@ -110,7 +110,7 @@ public class DiagnosticEventsTest {
assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize);
assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize);
assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size());
assertEquals(event.getExecutorStateEvent().getCurrentQueueSize(),0);
assertEquals(0, event.getExecutorStateEvent().getCurrentQueueSize());
assertTrue(event.getThrowable() instanceof TestRejectedTaskException);
verify(defaultHandler, times(1)).visit(event);
@ -136,7 +136,7 @@ public class DiagnosticEventsTest {
assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize);
assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize);
assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size());
assertEquals(executorStateEvent.getCurrentQueueSize(),0);
assertEquals(0, executorStateEvent.getCurrentQueueSize());
RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent,
new TestRejectedTaskException());
@ -145,7 +145,7 @@ public class DiagnosticEventsTest {
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize);
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size());
assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize(),0);
assertEquals(0, rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize());
assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException);
}

View file

@ -15,12 +15,10 @@
package software.amazon.kinesis.coordinator;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

View file

@ -35,7 +35,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.atMost;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.*;
import java.time.Duration;
import java.util.ArrayList;
@ -104,6 +103,9 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.ProvidedStreamsDeferredDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

View file

@ -14,8 +14,6 @@
*/
package software.amazon.kinesis.coordinator;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Unit tests of Worker.
*/
@ -118,8 +116,7 @@ public class WorkerTest {
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = SAMPLE_RECORD_PROCESSOR_FACTORY;
*//**
*//*
* Test method for {@link Worker#getApplicationName()}.
*//*
@Test
@ -346,7 +343,7 @@ public class WorkerTest {
Assert.assertTrue(count > 0);
}
*//**
*//*
* Runs worker with threadPoolSize == numShards
* Test method for {@link Worker#run()}.
*//*
@ -357,7 +354,7 @@ public class WorkerTest {
runAndTestWorker(numShards, threadPoolSize);
}
*//**
*//*
* Runs worker with threadPoolSize < numShards
* Test method for {@link Worker#run()}.
*//*
@ -368,7 +365,7 @@ public class WorkerTest {
runAndTestWorker(numShards, threadPoolSize);
}
*//**
*//*
* Runs worker with threadPoolSize > numShards
* Test method for {@link Worker#run()}.
*//*
@ -379,7 +376,7 @@ public class WorkerTest {
runAndTestWorker(numShards, threadPoolSize);
}
*//**
*//*
* Runs worker with threadPoolSize < numShards
* Test method for {@link Worker#run()}.
*//*
@ -395,7 +392,7 @@ public class WorkerTest {
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
}
*//**
*//*
* Runs worker with threadPoolSize < numShards
* Test method for {@link Worker#run()}.
*//*
@ -557,7 +554,7 @@ public class WorkerTest {
verify(v2RecordProcessor, times(1)).shutdown(any(ShutdownInput.class));
}
*//**
*//*
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
@ -1734,7 +1731,8 @@ public class WorkerTest {
return new ReflectionFieldMatcher<>(itemClass, fieldName, fieldMatcher);
}
}
*//**
*//*
* Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow.
*
@ -1756,9 +1754,6 @@ public class WorkerTest {
return shards;
}
*//**
* @return
*//*
private List<Shard> createShardListWithOneSplit() {
List<Shard> shards = new ArrayList<Shard>();
SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("39428", "987324");

View file

@ -1592,7 +1592,7 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON);
}
/**
/*
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
@ -1869,7 +1869,7 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP);
}
/**
/*
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
@ -2325,12 +2325,16 @@ public class HierarchicalShardSyncerTest {
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
final List<Shard> shardsWithIncompleteHashRange = Arrays.asList(
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
);
final List<Shard> shardsWithCompleteHashRange = Arrays.asList(
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
@ -2352,8 +2356,10 @@ public class HierarchicalShardSyncerTest {
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {
final List<Shard> shardsWithCompleteHashRange = Arrays.asList(
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);

View file

@ -14,10 +14,11 @@
*/
package software.amazon.kinesis.leases;
import java.awt.*;
import java.awt.Button;
import java.awt.Dimension;
import java.awt.GridLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@ -25,7 +26,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.swing.*;
import javax.swing.BoxLayout;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@ -54,9 +58,8 @@ public class LeaseCoordinatorExerciser {
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L;
public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException {
public static void main(String[] args) throws DependencyException, InvalidStateException,
ProvisionedThroughputException {
int numCoordinators = 9;
int numLeases = 73;
int leaseDurationMillis = 10000;

View file

@ -15,16 +15,8 @@
package software.amazon.kinesis.leases;
import lombok.extern.slf4j.Slf4j;
import org.junit.Rule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mock;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
@Slf4j
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {

View file

@ -26,11 +26,9 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class ShardInfoTest {

View file

@ -56,7 +56,6 @@ public class ShardObjectHelper {
private ShardObjectHelper() {
}
/** Helper method to create a new shard object.
* @param shardId
* @param parentShardId
@ -84,7 +83,9 @@ public class ShardObjectHelper {
String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange,
HashKeyRange hashKeyRange) {
return Shard.builder().shardId(shardId).parentShardId(parentShardId).adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange).hashKeyRange(hashKeyRange).build();
return Shard.builder().shardId(shardId).parentShardId(parentShardId)
.adjacentParentShardId(adjacentParentShardId).sequenceNumberRange(sequenceNumberRange)
.hashKeyRange(hashKeyRange).build();
}
/** Helper method.
@ -116,5 +117,4 @@ public class ShardObjectHelper {
return parentShardIds;
}
}

View file

@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

View file

@ -1,6 +1,5 @@
package software.amazon.kinesis.leases.dynamodb;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -12,7 +11,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import java.util.UUID;
import static org.mockito.Mockito.times;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -51,17 +50,34 @@ public class DynamoDBLeaseCoordinatorTest {
leaseCoordinator.initialize();
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
verify(leaseRefresher).createLeaseTableIfNotExists();
verify(leaseRefresher).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
}
@Test
@Test(expected = DependencyException.class)
public void testInitialize_tableCreationFails() throws Exception {
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false);
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false);
Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize());
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
try {
leaseCoordinator.initialize();
} finally {
verify(leaseRefresher).createLeaseTableIfNotExists();
verify(leaseRefresher).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
}
}
/**
* Validates a {@link NullPointerException} is not thrown when the lease taker
* is stopped before it starts/exists.
*
* @see <a href="https://github.com/awslabs/amazon-kinesis-client/issues/745">issue #745</a>
* @see <a href="https://github.com/awslabs/amazon-kinesis-client/issues/900">issue #900</a>
*/
@Test
public void testStopLeaseTakerBeforeStart() {
leaseCoordinator.stopLeaseTaker();
assertTrue(leaseCoordinator.getAssignments().isEmpty());
}
}

View file

@ -26,7 +26,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -158,7 +157,6 @@ public class DynamoDBLeaseRefresherTest {
verify(mockScanFuture, times(2)).get(anyLong(), any(TimeUnit.class));
verify(dynamoDbClient, times(2)).scan(any(ScanRequest.class));
}
@Test

View file

@ -37,7 +37,7 @@ import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends
LeaseIntegrationBillingModePayPerRequestTest {
private final String TEST_METRIC = "TestOperation";
private static final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L;

View file

@ -36,7 +36,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
private final String TEST_METRIC = "TestOperation";
private static final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L;

View file

@ -15,9 +15,7 @@
package software.amazon.kinesis.leases.dynamodb;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -153,7 +151,6 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
}
/**
* Sets the leaseDurationMillis to 0, ensuring a get request to update the existing lease after computing
* leases to take

View file

@ -22,7 +22,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
@ -43,7 +42,7 @@ public class BlockOnParentShardTaskTest {
private final String shardId = "shardId-97";
private final String streamId = "123:stream:146";
private final String concurrencyToken = "testToken";
private final List<String> emptyParentShardIds = new ArrayList<String>();
private final List<String> emptyParentShardIds = new ArrayList<>();
private ShardInfo shardInfo;
@Before
@ -77,7 +76,6 @@ public class BlockOnParentShardTaskTest {
@Test
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1ShardId = "shardId-1";
@ -118,7 +116,6 @@ public class BlockOnParentShardTaskTest {
@Test
public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMultiStream()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1LeaseKey = streamId + ":" + "shardId-1";
@ -162,7 +159,6 @@ public class BlockOnParentShardTaskTest {
@Test
public final void testCallWhenParentsHaveNotFinished()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
ShardInfo shardInfo = null;
BlockOnParentShardTask task = null;
String parent1ShardId = "shardId-1";

View file

@ -35,7 +35,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -173,7 +172,6 @@ public class ShardConsumerSubscriberTest {
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
@Test
@ -293,12 +291,10 @@ public class ShardConsumerSubscriberTest {
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
}
@Test
public void restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitialization() throws Exception {
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
@ -347,12 +343,10 @@ public class ShardConsumerSubscriberTest {
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
}
@Test
public void restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected() throws Exception {
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
@ -405,7 +399,6 @@ public class ShardConsumerSubscriberTest {
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));
}
private Object directlyExecuteRunnable(InvocationOnMock invocation) {
@ -623,8 +616,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
*
* @throws Exception
*/
@Test
public void noLoggingSuppressionNeededOnHappyPathTest() {
@ -648,8 +639,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is not suppressed with the default configuration of 0
*
* @throws Exception
*/
@Test
public void loggingNotSuppressedAfterTimeoutTest() {
@ -677,8 +666,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is successfully supressed if we only have intermittant
* readTimeouts.
*
* @throws Exception
*/
@Test
public void loggingSuppressedAfterIntermittentTimeoutTest() {
@ -705,8 +692,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is successfully logged if multiple sequential timeouts
* occur.
*
* @throws Exception
*/
@Test
public void loggingPartiallySuppressedAfterMultipleTimeoutTest() {
@ -733,8 +718,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the warning message from ShardConsumer is successfully logged if sequential timeouts occur.
*
* @throws Exception
*/
@Test
public void loggingPartiallySuppressedAfterConsecutiveTimeoutTest() {
@ -763,8 +746,6 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
* configuration of 0
*
* @throws Exception
*/
@Test
public void loggingNotSuppressedOnNonReadTimeoutExceptionNotIgnoringReadTimeoutsExceptionTest() {
@ -792,12 +773,9 @@ public class ShardConsumerSubscriberTest {
/**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
* ignore
*
* @throws Exception
*/
@Test
public void loggingNotSuppressedOnNonReadTimeoutExceptionIgnoringReadTimeoutsTest() {
// We're not throwing a ReadTimeout, so no suppression is expected.
// The test expects a non-ReadTimeout exception to be thrown on requests 3 and 5, and we expect logs on
// each Non-ReadTimeout Exception, no matter what the number of ReadTimeoutsToIgnore we pass in,

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.metrics;
import org.junit.Test;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.metrics.EndingMetricsScope;
public class EndingMetricsScopeTest {

View file

@ -27,31 +27,28 @@ import static org.junit.Assert.assertThat;
@Slf4j
public class AWSExceptionManagerTest {
private static final String EXPECTED_HANDLING_MARKER = AWSExceptionManagerTest.class.getSimpleName();
private final AWSExceptionManager manager = new AWSExceptionManager();
@Test
public void testSpecificException() {
AWSExceptionManager manager = new AWSExceptionManager();
final String EXPECTED_HANDLING_MARKER = "Handled-TestException";
manager.add(TestException.class, t -> {
log.info("Handling test exception: {} -> {}", t.getMessage(), t.getAdditionalMessage());
return new RuntimeException(EXPECTED_HANDLING_MARKER, t);
});
TestException te = new TestException("Main Mesage", "Sub Message");
TestException te = new TestException("Main Message", "Sub Message");
RuntimeException converted = manager.apply(te);
assertThat(converted, isA(RuntimeException.class));
assertThat(converted.getMessage(), equalTo(EXPECTED_HANDLING_MARKER));
assertThat(converted.getCause(), equalTo(te));
}
@Test
public void testParentException() {
AWSExceptionManager manager = new AWSExceptionManager();
final String EXPECTED_HANDLING_MARKER = "Handled-IllegalStateException";
manager.add(IllegalArgumentException.class, i -> new RuntimeException("IllegalArgument", i));
manager.add(Exception.class, i -> new RuntimeException("RawException", i));
manager.add(IllegalStateException.class, i -> new RuntimeException(EXPECTED_HANDLING_MARKER, i));
@ -66,8 +63,7 @@ public class AWSExceptionManagerTest {
@Test
public void testDefaultHandler() {
final String EXPECTED_HANDLING_MARKER = "Handled-Default";
AWSExceptionManager manager = new AWSExceptionManager().defaultFunction(i -> new RuntimeException(EXPECTED_HANDLING_MARKER, i));
manager.defaultFunction(i -> new RuntimeException(EXPECTED_HANDLING_MARKER, i));
manager.add(IllegalArgumentException.class, i -> new RuntimeException("IllegalArgument", i));
manager.add(Exception.class, i -> new RuntimeException("RawException", i));
@ -83,8 +79,6 @@ public class AWSExceptionManagerTest {
@Test
public void testIdHandler() {
AWSExceptionManager manager = new AWSExceptionManager();
manager.add(IllegalArgumentException.class, i -> new RuntimeException("IllegalArgument", i));
manager.add(Exception.class, i -> new RuntimeException("RawException", i));
manager.add(IllegalStateException.class, i -> i);

View file

@ -24,7 +24,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
@RunWith(MockitoJUnitRunner.class)
public class ThrottlingReporterTest {
@ -40,7 +39,6 @@ public class ThrottlingReporterTest {
reporter.throttled();
verify(throttleLog).warn(anyString());
verify(throttleLog, never()).error(anyString());
}
@Test
@ -63,7 +61,6 @@ public class ThrottlingReporterTest {
reporter.throttled();
verify(throttleLog, times(2)).warn(anyString());
verify(throttleLog, times(3)).error(anyString());
}
private class LogTestingThrottingReporter extends ThrottlingReporter {

View file

@ -19,7 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -28,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

View file

@ -8,7 +8,6 @@ import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.SafeSubscriber;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@ -54,7 +53,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -77,7 +75,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
@ -172,11 +169,10 @@ public class FanOutRecordsPublisherTest {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
}
@Test
public void InvalidEventTest() throws Exception {
public void testInvalidEvent() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
@ -239,7 +235,6 @@ public class FanOutRecordsPublisherTest {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
}
@Test
@ -317,11 +312,10 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo("3000"));
}
@Test
public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() throws Exception {
public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
@ -395,7 +389,6 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo("1000"));
}
@Test
@ -443,7 +436,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if (receivedInput.size() == totalServicePublisherEvents) {
@ -488,12 +482,10 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
}
@Test
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
CountDownLatch onS2SCallLatch = new CountDownLatch(2);
doAnswer(new Answer() {
@ -549,7 +541,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if (receivedInput.size() == triggerCompleteAtNthEvent) {
@ -599,7 +592,6 @@ public class FanOutRecordsPublisherTest {
// Let's wait for sometime to allow the publisher to re-subscribe
onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS);
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
}
@Test
@ -728,7 +720,6 @@ public class FanOutRecordsPublisherTest {
// With shard end event, onComplete must be propagated to the subscriber.
onCompleteLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);
}
@Test
@ -783,7 +774,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if (receivedInput.size() == triggerErrorAtNthEvent) {
@ -831,7 +823,6 @@ public class FanOutRecordsPublisherTest {
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnError should have been thrown", isOnErrorThrown[0]);
}
@Test
@ -879,7 +870,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if (receivedInput.size() == totalServicePublisherEvents) {
@ -924,7 +916,6 @@ public class FanOutRecordsPublisherTest {
});
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
}
@Test
@ -973,7 +964,8 @@ public class FanOutRecordsPublisherTest {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
assertEquals("" + ++lastSeenSeqNum,
((FanOutRecordsPublisher.FanoutRecordsRetrieved) input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
}
@ -1126,7 +1118,6 @@ public class FanOutRecordsPublisherTest {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
}
@Test
@ -1242,7 +1233,6 @@ public class FanOutRecordsPublisherTest {
verifyRecords(nonFailingSubscriber.received.get(0).records(), matchers);
verifyRecords(nonFailingSubscriber.received.get(1).records(), nextMatchers);
}
@Test
@ -1457,11 +1447,11 @@ public class FanOutRecordsPublisherTest {
flowCaptor.getValue().exceptionOccurred(exception);
Optional<OnErrorEvent> onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent).map(e -> (OnErrorEvent)e).findFirst();
Optional<OnErrorEvent> onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent)
.map(e -> (OnErrorEvent) e).findFirst();
assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
assertThat(acquireTimeoutLogged.get(), equalTo(true));
}
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {

View file

@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -277,7 +276,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
@Override
public DataFetcherResult getRecords() {
GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records)).nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
GetRecordsResponse getRecordsResult = GetRecordsResponse.builder().records(new ArrayList<>(records))
.nextShardIterator(nextShardIterator).millisBehindLatest(1000L).build();
return new AdvancingResult(getRecordsResult);
}

View file

@ -422,8 +422,10 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testRetryableRetrievalExceptionContinues() {
GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L)
.records(Collections.emptyList()).nextShardIterator(NEXT_SHARD_ITERATOR).build();
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
@ -725,7 +727,7 @@ public class PrefetchRecordsPublisherTest {
private static final int LOSS_EVERY_NTH_RECORD = 50;
private static int recordCounter = 0;
private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1);
private static final ScheduledExecutorService CONSUMER_HEALTH_CHECKER = Executors.newScheduledThreadPool(1);
public LossyNotificationSubscriber(Subscriber<RecordsRetrieved> delegate, RecordsPublisher recordsPublisher) {
super(delegate, recordsPublisher);
@ -738,7 +740,7 @@ public class PrefetchRecordsPublisherTest {
getDelegateSubscriber().onNext(recordsRetrieved);
} else {
log.info("Record Loss Triggered");
consumerHealthChecker.schedule(() -> {
CONSUMER_HEALTH_CHECKER.schedule(() -> {
getRecordsPublisher().restartFrom(recordsRetrieved);
Flowable.fromPublisher(getRecordsPublisher()).subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation(), true, 8).subscribe(this);

View file

@ -2,14 +2,8 @@ package software.amazon.kinesis.utils;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.kinesis.common.FutureUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -28,7 +22,7 @@ public abstract class AWSResourceManager {
/**
* Get a list of all the names of resources of a specified type
* @return
*
* @throws Exception
*/
public abstract List<String> getAllResourceNames() throws Exception;

View file

@ -12,8 +12,6 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.kinesis.common.FutureUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

View file

@ -0,0 +1,8 @@
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<!-- Disable all checks for protobuf-generated files. -->
<suppress files=".*/kpl/Messages.java" checks="[a-zA-Z0-9]*"/>
</suppressions>

50
checkstyle/checkstyle.xml Normal file
View file

@ -0,0 +1,50 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="LineLength">
<property name="fileExtensions" value="java"/>
<property name="max" value="170"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="SuppressWithPlainTextCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<!-- $1 refers to the first match group in the regex defined in commentFormat -->
<property name="checkFormat" value="$1"/>
</module>
<module name="TreeWalker">
<module name="AvoidStarImport"/>
<module name="ArrayTrailingComma"/>
<module name="ConstantName"/>
<module name="CovariantEquals"/>
<module name="EmptyStatement"/>
<module name="EqualsHashCode"/>
<module name="InvalidJavadocPosition"/>
<module name="LocalFinalVariableName"/>
<module name="LocalVariableName"/>
<module name="MemberName"/>
<module name="MethodName">
<!-- Method names must start with a lowercase letter. -->
<property name="format" value="^[a-z]\w*$"/>
</module>
<module name="NeedBraces"/>
<module name="OneStatementPerLine"/>
<module name="OneTopLevelClass"/>
<module name="OuterTypeFilename"/>
<module name="ParameterName"/>
<module name="RedundantImport"/>
<module name="UnusedImports"/>
<module name="UpperEll"/>
<module name="WhitespaceAfter"/>
</module>
</module>

24
pom.xml
View file

@ -22,7 +22,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name>
<version>2.5.1-SNAPSHOT</version>
<version>2.5.1</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>
@ -72,6 +72,28 @@
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<configLocation>checkstyle/checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<suppressionsLocation>checkstyle/checkstyle-suppressions.xml</suppressionsLocation>
</configuration>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>