diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index cad9988a..fd4483ad 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -2,12 +2,18 @@ version: 2
updates:
- package-ecosystem: "maven"
directory: "/"
+ labels:
+ - "dependencies"
+ - "v2.x"
schedule:
interval: "weekly"
# branch - v1.x
- package-ecosystem: "maven"
directory: "/"
+ labels:
+ - "dependencies"
+ - "v1.x"
target-branch: "v1.x"
schedule:
interval: "weekly"
diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index 8494c3c8..673b3eaa 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -21,7 +21,7 @@
amazon-kinesis-client-pom
software.amazon.kinesis
- 2.5.1
+ 2.5.2-SNAPSHOT
4.0.0
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java
index d0afdf75..50880a83 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java
@@ -32,7 +32,8 @@ public class V2CredentialWrapper implements AwsCredentialsProvider {
public AwsCredentials resolveCredentials() {
AWSCredentials current = oldCredentialsProvider.getCredentials();
if (current instanceof AWSSessionCredentials) {
- return AwsSessionCredentials.create(current.getAWSAccessKeyId(), current.getAWSSecretKey(), ((AWSSessionCredentials) current).getSessionToken());
+ return AwsSessionCredentials.create(current.getAWSAccessKeyId(), current.getAWSSecretKey(),
+ ((AWSSessionCredentials) current).getSessionToken());
}
return new AwsCredentials() {
@Override
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java
index 36f496d3..ced63f24 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java
@@ -36,8 +36,8 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
private static final String TEST_ACCESS_KEY_ID = "123";
private static final String TEST_SECRET_KEY = "456";
- private String credentialName1 = "software.amazon.kinesis.multilang.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider";
- private String credentialName2 = "software.amazon.kinesis.multilang.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider";
+ private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName();
+ private final String credentialName2 = ConstructorCredentialsProvider.class.getName();
private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder();
@ToString
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java
index 1f05240a..2b02ea43 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java
@@ -36,35 +36,25 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
-import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.metrics.MetricsLevel;
-import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@RunWith(MockitoJUnitRunner.class)
public class KinesisClientLibConfiguratorTest {
- private String credentialName1 = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider";
- private String credentialName2 = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider";
- private String credentialNameKinesis = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis";
- private String credentialNameDynamoDB = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB";
- private String credentialNameCloudWatch = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch";
- private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
- @Mock
- private ShardRecordProcessorFactory shardRecordProcessorFactory;
+ private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName();
+ private final String credentialName2 = AlwaysFailCredentialsProvider.class.getName();
+ private final String credentialNameKinesis = AlwaysSucceedCredentialsProviderKinesis.class.getName();
+ private final String credentialNameDynamoDB = AlwaysSucceedCredentialsProviderDynamoDB.class.getName();
+ private final String credentialNameCloudWatch = AlwaysSucceedCredentialsProviderCloudWatch.class.getName();
+ private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
@Test
public void testWithBasicSetup() {
@@ -241,52 +231,32 @@ public class KinesisClientLibConfiguratorTest {
"AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
"initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 0",
"retryGetRecordsInSeconds = 0" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
-
- try {
- configurator.getConfiguration(input);
- } catch (Exception e) {
- fail("Don't expect to fail on invalid variable value");
-
- }
+ getConfiguration(test);
}
@Test
public void testWithInvalidIntValue() {
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100nf" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
-
- try {
- configurator.getConfiguration(input);
- } catch (Exception e) {
- fail("Don't expect to fail on invalid variable value");
- }
+ getConfiguration(test);
}
@Test
public void testWithNegativeIntValue() {
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = -12" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
- try {
- configurator.getConfiguration(input);
- } catch (Exception e) {
- fail("Don't expect to fail on invalid variable value");
- }
+ getConfiguration(test);
}
@Test(expected = IllegalArgumentException.class)
public void testWithMissingCredentialsProvider() {
-
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "workerId = 123",
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
- configurator.getConfiguration(input);
+ getConfiguration(test);
}
@Test
@@ -295,8 +265,7 @@ public class KinesisClientLibConfiguratorTest {
new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1,
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" },
'\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
- MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
+ MultiLangDaemonConfiguration config = getConfiguration(test);
// if workerId is not provided, configurator should assign one for it automatically
assertNotNull(config.getWorkerIdentifier());
@@ -311,14 +280,11 @@ public class KinesisClientLibConfiguratorTest {
"workerId = 123",
"failoverTimeMillis = 100" },
'\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
-
- configurator.getConfiguration(input);
+ getConfiguration(test);
}
@Test(expected = IllegalArgumentException.class)
public void testWithEmptyStreamNameAndMissingStreamArn() {
-
String test = StringUtils.join(new String[] {
"applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
@@ -327,18 +293,14 @@ public class KinesisClientLibConfiguratorTest {
"streamName = ",
"streamArn = "},
'\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
-
- configurator.getConfiguration(input);
+ getConfiguration(test);
}
@Test(expected = NullPointerException.class)
public void testWithMissingApplicationName() {
-
String test = StringUtils.join(new String[] { "streamName = a", "AWSCredentialsProvider = " + credentialName1,
"workerId = 123", "failoverTimeMillis = 100" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
- configurator.getConfiguration(input);
+ getConfiguration(test);
}
@Test
@@ -347,11 +309,10 @@ public class KinesisClientLibConfiguratorTest {
new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName2,
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" },
'\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
+ MultiLangDaemonConfiguration config = getConfiguration(test);
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
try {
- MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
fail("expect failure with wrong credentials provider");
} catch (Exception e) {
@@ -367,25 +328,12 @@ public class KinesisClientLibConfiguratorTest {
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, "failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
- MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
- try {
- config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
- } catch (Exception e) {
- fail("Kinesis credential providers should not fail.");
- }
- try {
- config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
- } catch (Exception e) {
- fail("DynamoDB credential providers should not fail.");
- }
- try {
- config.getCloudWatchCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
- } catch (Exception e) {
- fail("CloudWatch credential providers should not fail.");
- }
+ final MultiLangDaemonConfiguration config = getConfiguration(test);
+ config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
+ config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
+ config.getCloudWatchCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
}
// TODO: fix this test
@@ -396,17 +344,10 @@ public class KinesisClientLibConfiguratorTest {
"AWSCredentialsProviderDynamoDB = " + credentialName2,
"AWSCredentialsProviderCloudWatch = " + credentialName2, "failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500" }, '\n');
- InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
-
- // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
- MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
- try {
- config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
- } catch (Exception e) {
- fail("Kinesis credential providers should not fail.");
- }
+ final MultiLangDaemonConfiguration config = getConfiguration(test);
+ config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
try {
config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
fail("DynamoDB credential providers should fail.");
@@ -503,7 +444,6 @@ public class KinesisClientLibConfiguratorTest {
private MultiLangDaemonConfiguration getConfiguration(String configString) {
InputStream input = new ByteArrayInputStream(configString.getBytes());
- MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
- return config;
+ return configurator.getConfiguration(input);
}
}
\ No newline at end of file
diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index b7a4689c..653d581f 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -22,7 +22,7 @@
software.amazon.kinesis
amazon-kinesis-client-pom
- 2.5.1
+ 2.5.2-SNAPSHOT
amazon-kinesis-client
@@ -89,7 +89,7 @@
com.google.guava
guava
- 32.0.0-jre
+ 32.1.1-jre
com.google.protobuf
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index d67fff96..11983ded 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -97,6 +97,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType;
+import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
/**
*
@@ -489,7 +490,7 @@ public class Scheduler implements Runnable {
}
};
- if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
+ if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
// It is assumed that all the workers will always have the latest and consistent snapshot of streams
// from the multiStreamTracker.
@@ -521,7 +522,8 @@ public class Scheduler implements Runnable {
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
log.info(
- "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier);
+ "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.",
+ streamIdentifier);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.submitShardSyncTask();
@@ -541,10 +543,13 @@ public class Scheduler implements Runnable {
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
- final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
- .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
- final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
- Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
+ final Map> staleStreamIdDeletionDecisionMap =
+ staleStreamDeletionMap.keySet().stream().collect(
+ Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
+ final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false)
+ .stream().filter(streamIdentifier ->
+ Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now())
+ .toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
.collect(Collectors.toSet());
// These are the streams which are deleted in Kinesis and we encounter resource not found during
// shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
index 534b5fd3..11014505 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
@@ -92,7 +92,8 @@ public class HierarchicalShardSyncer {
this(isMultiStreamMode, streamIdentifier, null);
}
- public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) {
+ public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier,
+ final DeletedStreamListProvider deletedStreamListProvider) {
this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
this.deletedStreamListProvider = deletedStreamListProvider;
@@ -191,7 +192,9 @@ public class HierarchicalShardSyncer {
if (!CollectionUtils.isNullOrEmpty(inconsistentShardIds)) {
final String ids = StringUtils.join(inconsistentShardIds, ' ');
throw new KinesisClientLibIOException(String.format(
+ // CHECKSTYLE.OFF: LineLength
"%d open child shards (%s) are inconsistent. This can happen due to a race condition between describeStream and a reshard operation.",
+ // CHECKSTYLE.ON: LineLength
inconsistentShardIds.size(), ids));
}
}
@@ -564,7 +567,8 @@ public class HierarchicalShardSyncer {
return parentShardIds;
}
- public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
+ public synchronized Lease createLeaseForChildShard(final ChildShard childShard,
+ final StreamIdentifier streamIdentifier) throws InvalidStateException {
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier);
return multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLeaseForChildShard(childShard, streamIdentifier)
@@ -583,7 +587,8 @@ public class HierarchicalShardSyncer {
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
newLease.parentShardIds(childShard.parentShards());
} else {
- throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
+ throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId()
+ + " because parent shards cannot be found.");
}
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L);
@@ -591,13 +596,15 @@ public class HierarchicalShardSyncer {
return newLease;
}
- private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
+ private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard,
+ final StreamIdentifier streamIdentifier) throws InvalidStateException {
MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), childShard.shardId()));
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
newLease.parentShardIds(childShard.parentShards());
} else {
- throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
+ throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId()
+ + " because parent shards cannot be found.");
}
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L);
@@ -612,7 +619,6 @@ public class HierarchicalShardSyncer {
* Note: Package level access only for testing purposes
*
* @param shard
- * @return
*/
private static Lease newKCLLease(final Shard shard) {
Lease newLease = new Lease();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
index 8a442bd3..861626b6 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
@@ -208,7 +208,8 @@ public class LeaseCleanupManager {
log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e);
}
} else {
- log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName());
+ log.info("Lease not present in lease table while cleaning the shard {} of {}",
+ shardInfo.shardId(), streamIdentifier.streamName());
cleanedUpCompletedLease = true;
}
}
@@ -232,14 +233,17 @@ public class LeaseCleanupManager {
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
// stream (known explicitly from ResourceNotFound being thrown when processing this shard),
- private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e)
+ throws DependencyException, ProvisionedThroughputException, InvalidStateException {
log.warn("Deleting lease {} as it is not present in the stream.", lease, e);
leaseCoordinator.leaseRefresher().deleteLease(lease);
return true;
}
/**
- * Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true
+ * Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a
+ * lease, return false. Otherwise return true
+ *
* @param lease
* @param shardInfo
* @return
@@ -247,7 +251,8 @@ public class LeaseCleanupManager {
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
- private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo)
+ throws DependencyException, ProvisionedThroughputException, InvalidStateException {
for (String parentShard : lease.parentShardIds()) {
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
index 4bef8442..c87f3eb8 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
@@ -152,7 +152,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode) {
- this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, DefaultSdkAutoConstructList.getInstance());
+ this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout,
+ billingMode, DefaultSdkAutoConstructList.getInstance());
}
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
index 9ebed654..7e12b9a9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
@@ -224,11 +224,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
public Map getDynamoTakeLeaseUpdate(final Lease lease, String owner) {
Map result = new HashMap<>();
- result.put(LEASE_OWNER_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(owner)).action(AttributeAction.PUT).build());
+ result.put(LEASE_OWNER_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(owner))
+ .action(AttributeAction.PUT).build());
String oldOwner = lease.leaseOwner();
if (oldOwner != null && !oldOwner.equals(owner)) {
- result.put(OWNER_SWITCHES_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(1L)).action(AttributeAction.ADD).build());
+ result.put(OWNER_SWITCHES_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(1L))
+ .action(AttributeAction.ADD).build());
}
return result;
@@ -257,7 +259,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber())));
- result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber())));
+ result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(
+ lease.pendingCheckpoint().subSequenceNumber())));
} else {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
index 4a96d87d..16398963 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
@@ -180,7 +180,8 @@ public class ShutdownTask implements ConsumerTask {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
- // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
+ // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a
+ // shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (currentShardLease == null) {
throw new InvalidStateException(leaseKey
@@ -286,7 +287,8 @@ public class ShutdownTask implements ConsumerTask {
for (ChildShard childShard : childShards) {
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
if (leaseRefresher.getLease(leaseKey) == null) {
- log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
+ log.debug("{} - Shard {} - Attempting to create lease for child shard {}",
+ shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
final long startTime = System.currentTimeMillis();
boolean success = false;
@@ -296,7 +298,8 @@ public class ShutdownTask implements ConsumerTask {
} finally {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
if (leaseToCreate.checkpoint() != null) {
- final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
+ final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ?
+ leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
index 3cb4a0d9..ba9f791f 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
@@ -49,7 +49,7 @@ public class RetrievalConfig {
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
- public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.5.1";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.5.2-SNAPSHOT";
/**
* Client used to make calls to Kinesis for records retrieval
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
index 7177211f..a17e5e82 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
@@ -230,8 +230,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
-
+ // CHECKSTYLE.OFF: LineLength
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
+ // CHECKSTYLE.ON: LineLength
streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
throw e;
} catch (Throwable t) {
@@ -382,7 +383,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of "
- + "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails);
+ + "previous subscription - {}. Last successful request details -- {}",
+ streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails);
recordsDeliveryQueue.clear();
}
}
@@ -402,7 +404,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// The ack received for this onNext event will be ignored by the publisher as the global flow object should
// be either null or renewed when the ack's flow identifier is evaluated.
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
- ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).childShards(Collections.emptyList()).build(), null,
+ ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true)
+ .childShards(Collections.emptyList()).build(), null,
triggeringFlow != null ? triggeringFlow.getSubscribeToShardId() : shardId + "-no-flow-found");
subscriber.onNext(response);
subscriber.onComplete();
@@ -515,7 +518,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
if (availableQueueSpace <= 0) {
log.debug(
+ // CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
+ // CHECKSTYLE.ON: LineLength
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
} else {
availableQueueSpace--;
@@ -544,7 +549,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!isActiveFlow(triggeringFlow)) {
log.debug(
+ // CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.",
+ // CHECKSTYLE.ON: LineLength
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
return;
}
@@ -801,7 +808,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
throwable.getMessage());
if (this.isDisposed) {
log.debug(
+ // CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}",
+ // CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
this.isErrorDispatched = true;
@@ -891,7 +900,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
subscription.cancel();
} catch (Throwable t) {
log.error(
+ // CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}",
+ // CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t);
}
}
@@ -953,12 +964,16 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow.shouldSubscriptionCancel()) {
if (flow.isCancelled) {
log.debug(
+ // CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe",
+ // CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
if (flow.isDisposed) {
log.debug(
+ // CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe",
+ // CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
}
log.debug(
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
index 3b5bfec9..fd1380f2 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
@@ -38,6 +38,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.atMost;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -176,7 +177,8 @@ public class SchedulerTest {
shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
- coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);
+ coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L)
+ .workerStateChangeListener(workerStateChangeListener);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, false));
lifecycleConfig = new LifecycleConfig();
@@ -188,7 +190,8 @@ public class SchedulerTest {
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer());
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
- when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
+ when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class),
+ any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
@@ -645,7 +648,8 @@ public class SchedulerTest {
testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null);
}
- private final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams, Set currentStreamConfigMapOverride)
+ private void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams,
+ Set currentStreamConfigMapOverride)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
@@ -1294,21 +1298,20 @@ public class SchedulerTest {
}
}
- // TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods without implementing TestMultiStreamTracker class
+ // TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods
+ // without implementing TestMultiStreamTracker class
@NoArgsConstructor
private class TestMultiStreamTracker implements MultiStreamTracker {
@Override
- public List streamConfigList(){
- return new ArrayList() {{
- add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
- InitialPositionInStream.LATEST)));
- add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
- InitialPositionInStream.LATEST)));
- add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
- InitialPositionInStream.LATEST)));
- add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
- InitialPositionInStream.LATEST)));
- }};
+ public List streamConfigList() {
+ final InitialPositionInStreamExtended latest =
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
+
+ return Arrays.asList(
+ new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), latest),
+ new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), latest),
+ new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), latest),
+ new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), latest));
}
@Override
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java
index 5e612ade..7f93216d 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java
@@ -141,7 +141,8 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
}
@Override
- public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ public List listLeasesForStream(StreamIdentifier streamIdentifier)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM);
return leaseRefresher.listLeasesForStream(streamIdentifier);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
index 1a1abc0e..6df34633 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
@@ -2257,10 +2257,12 @@ public class HierarchicalShardSyncerTest {
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter);
}
- public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception {
+ public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition,
+ ShardFilter shardFilter) throws Exception {
final String shardId0 = "shardId-0";
final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null,
- ShardObjectHelper.newSequenceNumberRange("1", null), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY)));
+ ShardObjectHelper.newSequenceNumberRange("1", null),
+ ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY)));
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
when(shardDetector.listShardsWithFilter(shardFilter)).thenReturn(shards);
@@ -2278,8 +2280,10 @@ public class HierarchicalShardSyncerTest {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
- final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2")));
- final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4")));
+ final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null,
+ ShardObjectHelper.newSequenceNumberRange("1", "2")));
+ final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null,
+ ShardObjectHelper.newSequenceNumberRange("3", "4")));
final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
@@ -2301,8 +2305,10 @@ public class HierarchicalShardSyncerTest {
@Test(expected = KinesisClientLibIOException.class)
public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception {
final List shardsWithIncompleteHashRange = Arrays.asList(
- ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")),
- ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3"))
+ ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange("0", "1")),
+ ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
+ ShardObjectHelper.newHashKeyRange("2", "3"))
);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
index cc03a203..4e2bae48 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java
@@ -67,7 +67,8 @@ public class ShardObjectHelper {
String parentShardId,
String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange) {
- return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build());
+ return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange,
+ HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build());
}
/** Helper method to create a new shard object.
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java
index 1b2fa78a..51e12d2d 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java
@@ -17,6 +17,7 @@ package software.amazon.kinesis.leases.dynamodb;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
@@ -34,7 +35,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
@@ -299,17 +299,12 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
@Test
public void testWaitUntilLeaseTableExists() throws LeasingException {
- DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", ddbClient,
- new DynamoDBLeaseSerializer(), true, tableCreatorCallback) {
- @Override
- long sleep(long timeToSleepMillis) {
- fail("Should not sleep");
- return 0L;
- }
+ final UUID uniqueId = UUID.randomUUID();
+ DynamoDBLeaseRefresher refresher = new DynamoDBLeaseRefresher("tableEventuallyExists_" + uniqueId, ddbClient,
+ new DynamoDBLeaseSerializer(), true, tableCreatorCallback);
- };
-
- assertTrue(refresher.waitUntilLeaseTableExists(1, 1));
+ refresher.createLeaseTableIfNotExists();
+ assertTrue(refresher.waitUntilLeaseTableExists(1, 20));
}
@Test
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
index 772aa542..d6c2d0b3 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
@@ -141,7 +141,8 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
.withLease("5", "foo")
.build();
- // In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is built during takeLeases() operation
+ // In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is
+ // built during takeLeases() operation
assertThat(taker.allLeases().size(), equalTo(0));
taker.takeLeases();
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
index f94d82fd..c085f196 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
@@ -40,7 +40,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
@@ -88,12 +87,8 @@ public class ConsumerStatesTest {
@Mock
private ShutdownNotification shutdownNotification;
@Mock
- private InitialPositionInStreamExtended initialPositionInStream;
- @Mock
private RecordsPublisher recordsPublisher;
@Mock
- private KinesisAsyncClient kinesisClient;
- @Mock
private ShardDetector shardDetector;
@Mock
private HierarchicalShardSyncer hierarchicalShardSyncer;
@@ -121,7 +116,8 @@ public class ConsumerStatesTest {
@Before
public void setup() {
- argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher,
+ argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME),
+ leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
@@ -134,7 +130,7 @@ public class ConsumerStatesTest {
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
}
- private static final Class LEASE_REFRESHER_CLASS = (Class) (Class>) LeaseRefresher.class;
+ private static final Class LEASE_REFRESHER_CLASS = LeaseRefresher.class;
@Test
public void blockOnParentStateTest() {
@@ -431,7 +427,6 @@ public class ConsumerStatesTest {
}
}
this.matchingField = matching;
-
}
@Override
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
index 12476837..1e89d8cc 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
@@ -101,7 +101,6 @@ public class ProcessTaskTest {
@Mock
private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
-
private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 };
private final String shardId = "shard-test";
@@ -116,7 +115,6 @@ public class ProcessTaskTest {
private ProcessTask processTask;
-
@Before
public void setUpProcessTask() {
when(checkpointer.checkpointer()).thenReturn(mock(Checkpointer.class));
@@ -130,7 +128,8 @@ public class ProcessTaskTest {
}
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, GlueSchemaRegistryDeserializer deserializer) {
- return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, new SchemaRegistryDecoder(deserializer));
+ return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist,
+ new SchemaRegistryDecoder(deserializer));
}
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil,
@@ -149,11 +148,8 @@ public class ProcessTaskTest {
);
}
-
-
@Test
public void testProcessTaskWithShardEndReached() {
-
processTask = makeProcessTask(processRecordsInput);
when(processRecordsInput.isAtShardEnd()).thenReturn(true);
@@ -429,7 +425,8 @@ public class ProcessTaskTest {
when(processRecordsInput.records()).thenReturn(rawRecords);
ProcessTask processTask = makeProcessTask(processRecordsInput, aggregatorUtil, false);
- ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(sequenceNumber.subtract(BigInteger.valueOf(100)).toString(), 0L),
+ ShardRecordProcessorOutcome outcome = testWithRecords(processTask,
+ new ExtendedSequenceNumber(sequenceNumber.subtract(BigInteger.valueOf(100)).toString(), 0L),
new ExtendedSequenceNumber(sequenceNumber.toString(), 0L));
assertThat(outcome.processRecordsCall.records(), equalTo(expectedRecords));
@@ -645,12 +642,12 @@ public class ProcessTaskTest {
}
private ShardRecordProcessorOutcome testWithRecords(List records,
- ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) {
+ ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) {
return testWithRecords(records, lastCheckpointValue, largestPermittedCheckpointValue, new AggregatorUtil());
}
private ShardRecordProcessorOutcome testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue,
- ExtendedSequenceNumber largestPermittedCheckpointValue, AggregatorUtil aggregatorUtil) {
+ ExtendedSequenceNumber largestPermittedCheckpointValue, AggregatorUtil aggregatorUtil) {
when(processRecordsInput.records()).thenReturn(records);
return testWithRecords(
makeProcessTask(processRecordsInput, aggregatorUtil, skipShardSyncAtWorkerInitializationIfLeasesExist),
@@ -658,7 +655,7 @@ public class ProcessTaskTest {
}
private ShardRecordProcessorOutcome testWithRecords(ProcessTask processTask, ExtendedSequenceNumber lastCheckpointValue,
- ExtendedSequenceNumber largestPermittedCheckpointValue) {
+ ExtendedSequenceNumber largestPermittedCheckpointValue) {
when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue);
when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call();
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
index 9615794b..fc242fed 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
@@ -103,7 +103,7 @@ public class FanOutRecordsPublisherTest {
private SubscribeToShardEvent batchEvent;
@Test
- public void simpleTest() throws Exception {
+ public void testSimple() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor
@@ -218,8 +218,10 @@ public class FanOutRecordsPublisherTest {
List matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
- batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).continuationSequenceNumber(CONTINUATION_SEQUENCE_NUMBER).build();
- SubscribeToShardEvent invalidEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).childShards(Collections.emptyList()).build();
+ batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records)
+ .continuationSequenceNumber(CONTINUATION_SEQUENCE_NUMBER).build();
+ SubscribeToShardEvent invalidEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L)
+ .records(records).childShards(Collections.emptyList()).build();
captor.getValue().onNext(batchEvent);
captor.getValue().onNext(invalidEvent);
@@ -238,7 +240,7 @@ public class FanOutRecordsPublisherTest {
}
@Test
- public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Exception {
+ public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor
@@ -414,7 +416,8 @@ public class FanOutRecordsPublisherTest {
int totalServicePublisherEvents = 1000;
int initialDemand = 0;
BackpressureAdheringServicePublisher servicePublisher =
- new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
+ new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents,
+ servicePublisherTaskCompletionLatch, initialDemand);
doNothing().when(publisher).subscribe(captor.capture());
@@ -848,7 +851,8 @@ public class FanOutRecordsPublisherTest {
int totalServicePublisherEvents = 1000;
int initialDemand = 9;
BackpressureAdheringServicePublisher servicePublisher =
- new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
+ new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents,
+ servicePublisherTaskCompletionLatch, initialDemand);
doNothing().when(publisher).subscribe(captor.capture());
@@ -941,7 +945,8 @@ public class FanOutRecordsPublisherTest {
int totalServicePublisherEvents = 1000;
int initialDemand = 11;
BackpressureAdheringServicePublisher servicePublisher =
- new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
+ new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents,
+ servicePublisherTaskCompletionLatch, initialDemand);
doNothing().when(publisher).subscribe(captor.capture());
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java
index 2c7d8fd1..fc4b4fe1 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java
@@ -126,7 +126,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
}
@Test(expected = IllegalStateException.class)
- public void testStrategyIsShutdown() throws Exception {
+ public void testStrategyIsShutdown() {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
@@ -141,7 +141,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
- when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
+ when(completionService.submit(any())).thenReturn(blockedFuture)
+ .thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
@@ -156,7 +157,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
-
assertThat(actualResult, equalTo(expectedResponses));
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
index d9955da4..780ac4ad 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java
@@ -162,7 +162,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
@Test
public void testDifferentShardCaches() {
final ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1));
- final KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcher(kinesisClient, streamName, shardId, MAX_RECORDS_PER_CALL, NULL_METRICS_FACTORY));
+ final KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcher(kinesisClient, streamName, shardId,
+ MAX_RECORDS_PER_CALL, NULL_METRICS_FACTORY));
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 =
spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, shardId));
final PrefetchRecordsPublisher recordsPublisher2 = new PrefetchRecordsPublisher(
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
index af02469a..8d88151b 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java
@@ -133,7 +133,8 @@ public class PrefetchRecordsPublisherTest {
getRecordsCache = createPrefetchRecordsPublisher(0L);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>());
- getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build();
+ getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR)
+ .childShards(Collections.emptyList()).build();
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse);
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java
index 4a6bcfaf..0d4ba656 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java
@@ -65,7 +65,10 @@ public abstract class AWSResourceManager {
public void deleteAllResource() throws Exception {
final List resourceNames = getAllResourceNames();
for (String resourceName : resourceNames) {
- deleteResource(resourceName);
+ // Delete all resources that have prefix "KCLRelease"
+ if (resourceName.startsWith("KCLRelease")) {
+ deleteResource(resourceName);
+ }
}
}
}
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 76c4b330..363527bd 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -10,7 +10,7 @@
-
+
@@ -22,8 +22,8 @@
-
+
diff --git a/pom.xml b/pom.xml
index dcebcb9c..abd1308e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
amazon-kinesis-client-pom
pom
Amazon Kinesis Client Library
- 2.5.1
+ 2.5.2-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.