diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java index d0afdf75..50880a83 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/credentials/V2CredentialWrapper.java @@ -32,7 +32,8 @@ public class V2CredentialWrapper implements AwsCredentialsProvider { public AwsCredentials resolveCredentials() { AWSCredentials current = oldCredentialsProvider.getCredentials(); if (current instanceof AWSSessionCredentials) { - return AwsSessionCredentials.create(current.getAWSAccessKeyId(), current.getAWSSecretKey(), ((AWSSessionCredentials) current).getSessionToken()); + return AwsSessionCredentials.create(current.getAWSAccessKeyId(), current.getAWSSecretKey(), + ((AWSSessionCredentials) current).getSessionToken()); } return new AwsCredentials() { @Override diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java index 36f496d3..ced63f24 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java @@ -36,8 +36,8 @@ public class AWSCredentialsProviderPropertyValueDecoderTest { private static final String TEST_ACCESS_KEY_ID = "123"; private static final String TEST_SECRET_KEY = "456"; - private String credentialName1 = "software.amazon.kinesis.multilang.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider"; - private String credentialName2 = "software.amazon.kinesis.multilang.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider"; + private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName(); + private final String credentialName2 = ConstructorCredentialsProvider.class.getName(); private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder(); @ToString diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 1f05240a..2b02ea43 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -36,35 +36,25 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.junit.Rule; import org.junit.Test; import com.google.common.collect.ImmutableSet; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.metrics.MetricsLevel; -import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @RunWith(MockitoJUnitRunner.class) public class KinesisClientLibConfiguratorTest { - private String credentialName1 = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider"; - private String credentialName2 = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider"; - private String credentialNameKinesis = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis"; - private String credentialNameDynamoDB = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB"; - private String credentialNameCloudWatch = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch"; - private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); - - @Rule - public final ExpectedException thrown = ExpectedException.none(); - - @Mock - private ShardRecordProcessorFactory shardRecordProcessorFactory; + private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName(); + private final String credentialName2 = AlwaysFailCredentialsProvider.class.getName(); + private final String credentialNameKinesis = AlwaysSucceedCredentialsProviderKinesis.class.getName(); + private final String credentialNameDynamoDB = AlwaysSucceedCredentialsProviderDynamoDB.class.getName(); + private final String credentialNameCloudWatch = AlwaysSucceedCredentialsProviderCloudWatch.class.getName(); + private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); @Test public void testWithBasicSetup() { @@ -241,52 +231,32 @@ public class KinesisClientLibConfiguratorTest { "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123", "initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 0", "retryGetRecordsInSeconds = 0" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); - - try { - configurator.getConfiguration(input); - } catch (Exception e) { - fail("Don't expect to fail on invalid variable value"); - - } + getConfiguration(test); } @Test public void testWithInvalidIntValue() { String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100nf" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); - - try { - configurator.getConfiguration(input); - } catch (Exception e) { - fail("Don't expect to fail on invalid variable value"); - } + getConfiguration(test); } @Test public void testWithNegativeIntValue() { String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = -12" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement - try { - configurator.getConfiguration(input); - } catch (Exception e) { - fail("Don't expect to fail on invalid variable value"); - } + getConfiguration(test); } @Test(expected = IllegalArgumentException.class) public void testWithMissingCredentialsProvider() { - String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "workerId = 123", "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement - configurator.getConfiguration(input); + getConfiguration(test); } @Test @@ -295,8 +265,7 @@ public class KinesisClientLibConfiguratorTest { new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1, "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); - MultiLangDaemonConfiguration config = configurator.getConfiguration(input); + MultiLangDaemonConfiguration config = getConfiguration(test); // if workerId is not provided, configurator should assign one for it automatically assertNotNull(config.getWorkerIdentifier()); @@ -311,14 +280,11 @@ public class KinesisClientLibConfiguratorTest { "workerId = 123", "failoverTimeMillis = 100" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); - - configurator.getConfiguration(input); + getConfiguration(test); } @Test(expected = IllegalArgumentException.class) public void testWithEmptyStreamNameAndMissingStreamArn() { - String test = StringUtils.join(new String[] { "applicationName = b", "AWSCredentialsProvider = " + credentialName1, @@ -327,18 +293,14 @@ public class KinesisClientLibConfiguratorTest { "streamName = ", "streamArn = "}, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); - - configurator.getConfiguration(input); + getConfiguration(test); } @Test(expected = NullPointerException.class) public void testWithMissingApplicationName() { - String test = StringUtils.join(new String[] { "streamName = a", "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); - configurator.getConfiguration(input); + getConfiguration(test); } @Test @@ -347,11 +309,10 @@ public class KinesisClientLibConfiguratorTest { new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName2, "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); + MultiLangDaemonConfiguration config = getConfiguration(test); // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement try { - MultiLangDaemonConfiguration config = configurator.getConfiguration(input); config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); fail("expect failure with wrong credentials provider"); } catch (Exception e) { @@ -367,25 +328,12 @@ public class KinesisClientLibConfiguratorTest { "AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB, "AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement - MultiLangDaemonConfiguration config = configurator.getConfiguration(input); - try { - config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); - } catch (Exception e) { - fail("Kinesis credential providers should not fail."); - } - try { - config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); - } catch (Exception e) { - fail("DynamoDB credential providers should not fail."); - } - try { - config.getCloudWatchCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); - } catch (Exception e) { - fail("CloudWatch credential providers should not fail."); - } + final MultiLangDaemonConfiguration config = getConfiguration(test); + config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); + config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); + config.getCloudWatchCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); } // TODO: fix this test @@ -396,17 +344,10 @@ public class KinesisClientLibConfiguratorTest { "AWSCredentialsProviderDynamoDB = " + credentialName2, "AWSCredentialsProviderCloudWatch = " + credentialName2, "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'); - InputStream input = new ByteArrayInputStream(test.getBytes()); // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement - - // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement - MultiLangDaemonConfiguration config = configurator.getConfiguration(input); - try { - config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); - } catch (Exception e) { - fail("Kinesis credential providers should not fail."); - } + final MultiLangDaemonConfiguration config = getConfiguration(test); + config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); try { config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials(); fail("DynamoDB credential providers should fail."); @@ -503,7 +444,6 @@ public class KinesisClientLibConfiguratorTest { private MultiLangDaemonConfiguration getConfiguration(String configString) { InputStream input = new ByteArrayInputStream(configString.getBytes()); - MultiLangDaemonConfiguration config = configurator.getConfiguration(input); - return config; + return configurator.getConfiguration(input); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index d67fff96..11983ded 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -97,6 +97,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType; +import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION; /** * @@ -489,7 +490,7 @@ public class Scheduler implements Runnable { } }; - if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { + if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. // It is assumed that all the workers will always have the latest and consistent snapshot of streams // from the multiStreamTracker. @@ -521,7 +522,8 @@ public class Scheduler implements Runnable { if (!newStreamConfigMap.containsKey(streamIdentifier)) { if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { log.info( - "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier); + "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", + streamIdentifier); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( currentStreamConfigMap.get(streamIdentifier)); shardSyncTaskManager.submitShardSyncTask(); @@ -541,10 +543,13 @@ public class Scheduler implements Runnable { // Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them. // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and // the streamIdentifiersForLeaseCleanup are not present in the latest snapshot. - final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors - .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet())); - final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> - Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()) + final Map> staleStreamIdDeletionDecisionMap = + staleStreamDeletionMap.keySet().stream().collect( + Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet())); + final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false) + .stream().filter(streamIdentifier -> + Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()) + .toMillis() >= waitPeriodToDeleteOldStreams.toMillis()) .collect(Collectors.toSet()); // These are the streams which are deleted in Kinesis and we encounter resource not found during // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 534b5fd3..11014505 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -92,7 +92,8 @@ public class HierarchicalShardSyncer { this(isMultiStreamMode, streamIdentifier, null); } - public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) { + public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, + final DeletedStreamListProvider deletedStreamListProvider) { this.isMultiStreamMode = isMultiStreamMode; this.streamIdentifier = streamIdentifier; this.deletedStreamListProvider = deletedStreamListProvider; @@ -191,7 +192,9 @@ public class HierarchicalShardSyncer { if (!CollectionUtils.isNullOrEmpty(inconsistentShardIds)) { final String ids = StringUtils.join(inconsistentShardIds, ' '); throw new KinesisClientLibIOException(String.format( + // CHECKSTYLE.OFF: LineLength "%d open child shards (%s) are inconsistent. This can happen due to a race condition between describeStream and a reshard operation.", + // CHECKSTYLE.ON: LineLength inconsistentShardIds.size(), ids)); } } @@ -564,7 +567,8 @@ public class HierarchicalShardSyncer { return parentShardIds; } - public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException { + public synchronized Lease createLeaseForChildShard(final ChildShard childShard, + final StreamIdentifier streamIdentifier) throws InvalidStateException { final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier); return multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLeaseForChildShard(childShard, streamIdentifier) @@ -583,7 +587,8 @@ public class HierarchicalShardSyncer { if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) { newLease.parentShardIds(childShard.parentShards()); } else { - throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found."); + throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + + " because parent shards cannot be found."); } newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.ownerSwitchesSinceCheckpoint(0L); @@ -591,13 +596,15 @@ public class HierarchicalShardSyncer { return newLease; } - private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException { + private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard, + final StreamIdentifier streamIdentifier) throws InvalidStateException { MultiStreamLease newLease = new MultiStreamLease(); newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), childShard.shardId())); if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) { newLease.parentShardIds(childShard.parentShards()); } else { - throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found."); + throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + + " because parent shards cannot be found."); } newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.ownerSwitchesSinceCheckpoint(0L); @@ -612,7 +619,6 @@ public class HierarchicalShardSyncer { * Note: Package level access only for testing purposes * * @param shard - * @return */ private static Lease newKCLLease(final Shard shard) { Lease newLease = new Lease(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 8a442bd3..861626b6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -208,7 +208,8 @@ public class LeaseCleanupManager { log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e); } } else { - log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName()); + log.info("Lease not present in lease table while cleaning the shard {} of {}", + shardInfo.shardId(), streamIdentifier.streamName()); cleanedUpCompletedLease = true; } } @@ -232,14 +233,17 @@ public class LeaseCleanupManager { // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // stream (known explicitly from ResourceNotFound being thrown when processing this shard), - private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { log.warn("Deleting lease {} as it is not present in the stream.", lease, e); leaseCoordinator.leaseRefresher().deleteLease(lease); return true; } /** - * Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true + * Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a + * lease, return false. Otherwise return true + * * @param lease * @param shardInfo * @return @@ -247,7 +251,8 @@ public class LeaseCleanupManager { * @throws ProvisionedThroughputException * @throws InvalidStateException */ - private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { for (String parentShard : lease.parentShardIds()) { final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard)); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 4bef8442..c87f3eb8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -152,7 +152,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final LeaseSerializer serializer, final boolean consistentReads, @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, final BillingMode billingMode) { - this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, DefaultSdkAutoConstructList.getInstance()); + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, + billingMode, DefaultSdkAutoConstructList.getInstance()); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 9ebed654..7e12b9a9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -224,11 +224,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { public Map getDynamoTakeLeaseUpdate(final Lease lease, String owner) { Map result = new HashMap<>(); - result.put(LEASE_OWNER_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(owner)).action(AttributeAction.PUT).build()); + result.put(LEASE_OWNER_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(owner)) + .action(AttributeAction.PUT).build()); String oldOwner = lease.leaseOwner(); if (oldOwner != null && !oldOwner.equals(owner)) { - result.put(OWNER_SWITCHES_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(1L)).action(AttributeAction.ADD).build()); + result.put(OWNER_SWITCHES_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(1L)) + .action(AttributeAction.ADD).build()); } return result; @@ -257,7 +259,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) { result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber()))); - result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber()))); + result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue( + lease.pendingCheckpoint().subSequenceNumber()))); } else { result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 4a96d87d..16398963 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -180,7 +180,8 @@ public class ShutdownTask implements ConsumerTask { // Create new lease for the child shards if they don't exist. // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. - // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a + // shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running. if (currentShardLease == null) { throw new InvalidStateException(leaseKey @@ -286,7 +287,8 @@ public class ShutdownTask implements ConsumerTask { for (ChildShard childShard : childShards) { final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); if (leaseRefresher.getLease(leaseKey) == null) { - log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); + log.debug("{} - Shard {} - Attempting to create lease for child shard {}", + shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); final long startTime = System.currentTimeMillis(); boolean success = false; @@ -296,7 +298,8 @@ public class ShutdownTask implements ConsumerTask { } finally { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); if (leaseToCreate.checkpoint() != null) { - final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; + final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? + leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 7177211f..a17e5e82 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -230,8 +230,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { subscriber.onNext(recordsRetrieved); } } catch (IllegalStateException e) { - + // CHECKSTYLE.OFF: LineLength log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}", + // CHECKSTYLE.ON: LineLength streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); throw e; } catch (Throwable t) { @@ -382,7 +383,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of " - + "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails); + + "previous subscription - {}. Last successful request details -- {}", + streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails); recordsDeliveryQueue.clear(); } } @@ -402,7 +404,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // The ack received for this onNext event will be ignored by the publisher as the global flow object should // be either null or renewed when the ack's flow identifier is evaluated. FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( - ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).childShards(Collections.emptyList()).build(), null, + ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true) + .childShards(Collections.emptyList()).build(), null, triggeringFlow != null ? triggeringFlow.getSubscribeToShardId() : shardId + "-no-flow-found"); subscriber.onNext(response); subscriber.onComplete(); @@ -515,7 +518,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) { if (availableQueueSpace <= 0) { log.debug( + // CHECKSTYLE.OFF: LineLength "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", + // CHECKSTYLE.ON: LineLength streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); } else { availableQueueSpace--; @@ -544,7 +549,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!isActiveFlow(triggeringFlow)) { log.debug( + // CHECKSTYLE.OFF: LineLength "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.", + // CHECKSTYLE.ON: LineLength streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); return; } @@ -801,7 +808,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { throwable.getMessage()); if (this.isDisposed) { log.debug( + // CHECKSTYLE.OFF: LineLength "{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}", + // CHECKSTYLE.ON: LineLength parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); this.isErrorDispatched = true; @@ -891,7 +900,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { subscription.cancel(); } catch (Throwable t) { log.error( + // CHECKSTYLE.OFF: LineLength "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}", + // CHECKSTYLE.ON: LineLength parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t); } } @@ -953,12 +964,16 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow.shouldSubscriptionCancel()) { if (flow.isCancelled) { log.debug( + // CHECKSTYLE.OFF: LineLength "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe", + // CHECKSTYLE.ON: LineLength parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } if (flow.isDisposed) { log.debug( + // CHECKSTYLE.OFF: LineLength "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe", + // CHECKSTYLE.ON: LineLength parent.streamAndShardId, connectionStartedAt, subscribeToShardId); } log.debug( diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 3b5bfec9..fd1380f2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -38,6 +38,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.atMost; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -176,7 +177,8 @@ public class SchedulerTest { shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); - coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener); + coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L) + .workerStateChangeListener(workerStateChangeListener); leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, false)); lifecycleConfig = new LifecycleConfig(); @@ -188,7 +190,8 @@ public class SchedulerTest { when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer()); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); - when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); + when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), + any(MetricsFactory.class))).thenReturn(recordsPublisher); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, @@ -645,7 +648,8 @@ public class SchedulerTest { testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null); } - private final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams, Set currentStreamConfigMapOverride) + private void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams, + Set currentStreamConfigMapOverride) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -1294,21 +1298,20 @@ public class SchedulerTest { } } - // TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods without implementing TestMultiStreamTracker class + // TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods + // without implementing TestMultiStreamTracker class @NoArgsConstructor private class TestMultiStreamTracker implements MultiStreamTracker { @Override - public List streamConfigList(){ - return new ArrayList() {{ - add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - }}; + public List streamConfigList() { + final InitialPositionInStreamExtended latest = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + + return Arrays.asList( + new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), latest), + new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), latest), + new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), latest), + new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), latest)); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 5e612ade..7f93216d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -141,7 +141,8 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { } @Override - public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + public List listLeasesForStream(StreamIdentifier streamIdentifier) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM); return leaseRefresher.listLeasesForStream(streamIdentifier); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 1a1abc0e..6df34633 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -2257,10 +2257,12 @@ public class HierarchicalShardSyncerTest { testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter); } - public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception { + public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, + ShardFilter shardFilter) throws Exception { final String shardId0 = "shardId-0"; final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, - ShardObjectHelper.newSequenceNumberRange("1", null), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY))); + ShardObjectHelper.newSequenceNumberRange("1", null), + ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY))); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); when(shardDetector.listShardsWithFilter(shardFilter)).thenReturn(shards); @@ -2278,8 +2280,10 @@ public class HierarchicalShardSyncerTest { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; - final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"))); - final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4"))); + final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, + ShardObjectHelper.newSequenceNumberRange("1", "2"))); + final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, + ShardObjectHelper.newSequenceNumberRange("3", "4"))); final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); @@ -2301,8 +2305,10 @@ public class HierarchicalShardSyncerTest { @Test(expected = KinesisClientLibIOException.class) public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception { final List shardsWithIncompleteHashRange = Arrays.asList( - ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")), - ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3")) + ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), + ShardObjectHelper.newHashKeyRange("0", "1")), + ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), + ShardObjectHelper.newHashKeyRange("2", "3")) ); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java index cc03a203..4e2bae48 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java @@ -67,7 +67,8 @@ public class ShardObjectHelper { String parentShardId, String adjacentParentShardId, SequenceNumberRange sequenceNumberRange) { - return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build()); + return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, + HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build()); } /** Helper method to create a new shard object. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java index 772aa542..d6c2d0b3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java @@ -141,7 +141,8 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { .withLease("5", "foo") .build(); - // In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is built during takeLeases() operation + // In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is + // built during takeLeases() operation assertThat(taker.allLeases().size(), equalTo(0)); taker.takeLeases(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index f94d82fd..c085f196 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -40,7 +40,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; @@ -88,12 +87,8 @@ public class ConsumerStatesTest { @Mock private ShutdownNotification shutdownNotification; @Mock - private InitialPositionInStreamExtended initialPositionInStream; - @Mock private RecordsPublisher recordsPublisher; @Mock - private KinesisAsyncClient kinesisClient; - @Mock private ShardDetector shardDetector; @Mock private HierarchicalShardSyncer hierarchicalShardSyncer; @@ -121,7 +116,8 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), + leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, @@ -134,7 +130,7 @@ public class ConsumerStatesTest { when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); } - private static final Class LEASE_REFRESHER_CLASS = (Class) (Class) LeaseRefresher.class; + private static final Class LEASE_REFRESHER_CLASS = LeaseRefresher.class; @Test public void blockOnParentStateTest() { @@ -431,7 +427,6 @@ public class ConsumerStatesTest { } } this.matchingField = matching; - } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 12476837..1e89d8cc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -101,7 +101,6 @@ public class ProcessTaskTest { @Mock private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer; - private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 }; private final String shardId = "shard-test"; @@ -116,7 +115,6 @@ public class ProcessTaskTest { private ProcessTask processTask; - @Before public void setUpProcessTask() { when(checkpointer.checkpointer()).thenReturn(mock(Checkpointer.class)); @@ -130,7 +128,8 @@ public class ProcessTaskTest { } private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, GlueSchemaRegistryDeserializer deserializer) { - return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, new SchemaRegistryDecoder(deserializer)); + return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, + new SchemaRegistryDecoder(deserializer)); } private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, @@ -149,11 +148,8 @@ public class ProcessTaskTest { ); } - - @Test public void testProcessTaskWithShardEndReached() { - processTask = makeProcessTask(processRecordsInput); when(processRecordsInput.isAtShardEnd()).thenReturn(true); @@ -429,7 +425,8 @@ public class ProcessTaskTest { when(processRecordsInput.records()).thenReturn(rawRecords); ProcessTask processTask = makeProcessTask(processRecordsInput, aggregatorUtil, false); - ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(sequenceNumber.subtract(BigInteger.valueOf(100)).toString(), 0L), + ShardRecordProcessorOutcome outcome = testWithRecords(processTask, + new ExtendedSequenceNumber(sequenceNumber.subtract(BigInteger.valueOf(100)).toString(), 0L), new ExtendedSequenceNumber(sequenceNumber.toString(), 0L)); assertThat(outcome.processRecordsCall.records(), equalTo(expectedRecords)); @@ -645,12 +642,12 @@ public class ProcessTaskTest { } private ShardRecordProcessorOutcome testWithRecords(List records, - ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { + ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { return testWithRecords(records, lastCheckpointValue, largestPermittedCheckpointValue, new AggregatorUtil()); } private ShardRecordProcessorOutcome testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, - ExtendedSequenceNumber largestPermittedCheckpointValue, AggregatorUtil aggregatorUtil) { + ExtendedSequenceNumber largestPermittedCheckpointValue, AggregatorUtil aggregatorUtil) { when(processRecordsInput.records()).thenReturn(records); return testWithRecords( makeProcessTask(processRecordsInput, aggregatorUtil, skipShardSyncAtWorkerInitializationIfLeasesExist), @@ -658,7 +655,7 @@ public class ProcessTaskTest { } private ShardRecordProcessorOutcome testWithRecords(ProcessTask processTask, ExtendedSequenceNumber lastCheckpointValue, - ExtendedSequenceNumber largestPermittedCheckpointValue) { + ExtendedSequenceNumber largestPermittedCheckpointValue) { when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue); when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 9615794b..fc242fed 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -103,7 +103,7 @@ public class FanOutRecordsPublisherTest { private SubscribeToShardEvent batchEvent; @Test - public void simpleTest() throws Exception { + public void testSimple() { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor captor = ArgumentCaptor @@ -218,8 +218,10 @@ public class FanOutRecordsPublisherTest { List matchers = records.stream().map(KinesisClientRecordMatcher::new) .collect(Collectors.toList()); - batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).continuationSequenceNumber(CONTINUATION_SEQUENCE_NUMBER).build(); - SubscribeToShardEvent invalidEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).childShards(Collections.emptyList()).build(); + batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records) + .continuationSequenceNumber(CONTINUATION_SEQUENCE_NUMBER).build(); + SubscribeToShardEvent invalidEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L) + .records(records).childShards(Collections.emptyList()).build(); captor.getValue().onNext(batchEvent); captor.getValue().onNext(invalidEvent); @@ -238,7 +240,7 @@ public class FanOutRecordsPublisherTest { } @Test - public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Exception { + public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor captor = ArgumentCaptor @@ -414,7 +416,8 @@ public class FanOutRecordsPublisherTest { int totalServicePublisherEvents = 1000; int initialDemand = 0; BackpressureAdheringServicePublisher servicePublisher = - new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, + servicePublisherTaskCompletionLatch, initialDemand); doNothing().when(publisher).subscribe(captor.capture()); @@ -848,7 +851,8 @@ public class FanOutRecordsPublisherTest { int totalServicePublisherEvents = 1000; int initialDemand = 9; BackpressureAdheringServicePublisher servicePublisher = - new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, + servicePublisherTaskCompletionLatch, initialDemand); doNothing().when(publisher).subscribe(captor.capture()); @@ -941,7 +945,8 @@ public class FanOutRecordsPublisherTest { int totalServicePublisherEvents = 1000; int initialDemand = 11; BackpressureAdheringServicePublisher servicePublisher = - new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, + servicePublisherTaskCompletionLatch, initialDemand); doNothing().when(publisher).subscribe(captor.capture()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java index 2c7d8fd1..fc4b4fe1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java @@ -126,7 +126,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { } @Test(expected = IllegalStateException.class) - public void testStrategyIsShutdown() throws Exception { + public void testStrategyIsShutdown() { AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); @@ -141,7 +141,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); when(executorService.isShutdown()).thenReturn(false); - when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture); + when(completionService.submit(any())).thenReturn(blockedFuture) + .thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture); when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture); when(successfulFuture.get()).thenReturn(dataFetcherResult); when(successfulFuture.cancel(anyBoolean())).thenReturn(false); @@ -156,7 +157,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { verify(successfulFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true)); - assertThat(actualResult, equalTo(expectedResponses)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index d9955da4..780ac4ad 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -162,7 +162,8 @@ public class PrefetchRecordsPublisherIntegrationTest { @Test public void testDifferentShardCaches() { final ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1)); - final KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcher(kinesisClient, streamName, shardId, MAX_RECORDS_PER_CALL, NULL_METRICS_FACTORY)); + final KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcher(kinesisClient, streamName, shardId, + MAX_RECORDS_PER_CALL, NULL_METRICS_FACTORY)); final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 = spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, shardId)); final PrefetchRecordsPublisher recordsPublisher2 = new PrefetchRecordsPublisher( diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index af02469a..8d88151b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -133,7 +133,8 @@ public class PrefetchRecordsPublisherTest { getRecordsCache = createPrefetchRecordsPublisher(0L); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); - getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build(); + getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR) + .childShards(Collections.emptyList()).build(); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse); } diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 76c4b330..363527bd 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -10,7 +10,7 @@ - + @@ -22,8 +22,8 @@ - +