diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java index fd375264..5fbac1d7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.java @@ -144,7 +144,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi * {@inheritDoc} */ @Override - public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState); } @@ -152,7 +153,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi * {@inheritDoc} */ @Override - public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { // // TODO: UserRecord Deprecation // diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index d9646351..1aa258bb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -103,7 +103,8 @@ public class DynamoDBCheckpointer implements Checkpointer { } @Override - public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException { + public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, + byte[] pendingCheckpointState) throws KinesisClientLibException { try { boolean wasSuccessful = prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java index d2540073..30f8963a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -23,10 +23,11 @@ import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import java.math.BigInteger; -@Value @Accessors(fluent = true) /** * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards. */ +@Accessors(fluent = true) +@Value public class HashKeyRangeForLease { private final BigInteger startingHashKey; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java index 3e0432cb..ec21e4f6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java @@ -16,15 +16,15 @@ package software.amazon.kinesis.coordinator; public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener { - /** - * Empty constructor for NoOp Worker State Change Listener - */ - public NoOpWorkerStateChangeListener() { + /** + * Empty constructor for NoOp Worker State Change Listener + */ + public NoOpWorkerStateChangeListener() { - } + } - @Override - public void onWorkerStateChange(WorkerState newState) { + @Override + public void onWorkerStateChange(WorkerState newState) { - } + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index dac77351..aecb1331 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -348,7 +348,7 @@ class PeriodicShardSyncManager { ((MultiStreamLease) lease).shardId() : lease.leaseKey(); final Shard shard = kinesisShards.get(shardId); - if(shard == null) { + if (shard == null) { return lease; } lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); @@ -372,7 +372,7 @@ class PeriodicShardSyncManager { List leasesWithHashKeyRanges) { // Sort the hash ranges by starting hash key. List sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges); - if(sortedLeasesWithHashKeyRanges.isEmpty()) { + if (sortedLeasesWithHashKeyRanges.isEmpty()) { log.error("No leases with valid hashranges found for stream {}", streamIdentifier); return Optional.of(new HashRangeHole()); } @@ -417,8 +417,9 @@ class PeriodicShardSyncManager { @VisibleForTesting static List sortLeasesByHashRange(List leasesWithHashKeyRanges) { - if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) + if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) { return leasesWithHashKeyRanges; + } Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); return leasesWithHashKeyRanges; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 49d271b2..d67fff96 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -544,7 +544,8 @@ public class Scheduler implements Runnable { final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet())); final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> - Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); + Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()) + .collect(Collectors.toSet()); // These are the streams which are deleted in Kinesis and we encounter resource not found during // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will // not have any data. @@ -611,7 +612,7 @@ public class Scheduler implements Runnable { } private void removeStreamsFromStaleStreamsList(Set streamIdentifiers) { - for(StreamIdentifier streamIdentifier : streamIdentifiers) { + for (StreamIdentifier streamIdentifier : streamIdentifiers) { staleStreamDeletionMap.remove(streamIdentifier); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index ddce2a10..dd7162b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -19,16 +19,16 @@ package software.amazon.kinesis.coordinator; */ @FunctionalInterface public interface WorkerStateChangeListener { - enum WorkerState { - CREATED, - INITIALIZING, - STARTED, - SHUT_DOWN_STARTED, - SHUT_DOWN - } + enum WorkerState { + CREATED, + INITIALIZING, + STARTED, + SHUT_DOWN_STARTED, + SHUT_DOWN + } - void onWorkerStateChange(WorkerState newState); + void onWorkerStateChange(WorkerState newState); - default void onAllInitializationAttemptsFailed(Throwable e) { - } + default void onAllInitializationAttemptsFailed(Throwable e) { + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index b71796d3..534b5fd3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -80,7 +80,7 @@ public class HierarchicalShardSyncer { private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); - private static final int retriesForCompleteHashRange = 3; + private static final int RETRIES_FOR_COMPLETE_HASH_RANGE = 3; private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; @@ -98,7 +98,7 @@ public class HierarchicalShardSyncer { this.deletedStreamListProvider = deletedStreamListProvider; } - private static final BiFunction shardIdFromLeaseDeducer = + private static final BiFunction SHARD_ID_FROM_LEASE_DEDUCER = (lease, multiStreamArgs) -> multiStreamArgs.isMultiStreamMode() ? ((MultiStreamLease) lease).shardId() : @@ -129,7 +129,9 @@ public class HierarchicalShardSyncer { isLeaseTableEmpty); } - //Provide a pre-collcted list of shards to avoid calling ListShards API + /** + * Provide a pre-collected list of shards to avoid calling ListShards API + */ public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) @@ -163,7 +165,7 @@ public class HierarchicalShardSyncer { final long startTime = System.currentTimeMillis(); boolean success = false; try { - if(leaseRefresher.createLeaseIfNotExists(lease)) { + if (leaseRefresher.createLeaseIfNotExists(lease)) { createdLeases.add(lease); } success = true; @@ -268,7 +270,7 @@ public class HierarchicalShardSyncer { List shards; - for (int i = 0; i < retriesForCompleteHashRange; i++) { + for (int i = 0; i < RETRIES_FOR_COMPLETE_HASH_RANGE; i++) { shards = shardDetector.listShardsWithFilter(shardFilter); if (shards == null) { @@ -284,7 +286,7 @@ public class HierarchicalShardSyncer { } throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after " - + retriesForCompleteHashRange + " retries."); + + RETRIES_FOR_COMPLETE_HASH_RANGE + " retries."); } private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { @@ -365,7 +367,8 @@ public class HierarchicalShardSyncer { * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ static List determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List shards, - final List currentLeases, final InitialPositionInStreamExtended initialPosition,final Set inconsistentShardIds) { + final List currentLeases, final InitialPositionInStreamExtended initialPosition, + final Set inconsistentShardIds) { return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds, new MultiStreamArgs(false, null)); } @@ -499,11 +502,13 @@ public class HierarchicalShardSyncer { if (descendantParentShardIds.contains(parentShardId) && !initialPosition.getInitialPositionInStream() .equals(InitialPositionInStream.AT_TIMESTAMP)) { - log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint()); + log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", + lease.leaseKey(), lease.checkpoint()); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); } else { final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition); - log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint()); + log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", + lease.leaseKey(), newCheckpoint, lease.checkpoint()); lease.checkpoint(newCheckpoint); } } @@ -728,8 +733,8 @@ public class HierarchicalShardSyncer { @Override public int compare(final Lease lease1, final Lease lease2) { int result = 0; - final String shardId1 = shardIdFromLeaseDeducer.apply(lease1, multiStreamArgs); - final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs); + final String shardId1 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease1, multiStreamArgs); + final String shardId2 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease2, multiStreamArgs); final Shard shard1 = shardIdToShardMap.get(shardId1); final Shard shard2 = shardIdToShardMap.get(shardId2); @@ -802,7 +807,7 @@ public class HierarchicalShardSyncer { final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) - .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) + .map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier); @@ -908,7 +913,7 @@ public class HierarchicalShardSyncer { .map(streamId -> streamId.serialize()).orElse(""); final Set shardIdsOfCurrentLeases = currentLeases.stream() .peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease)) - .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) + .map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); final List openShards = getOpenShards(shards, streamIdentifier); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index d62cd476..8a442bd3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -179,7 +179,7 @@ public class LeaseCleanupManager { try { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()); - if(leaseFromDDB != null) { + if (leaseFromDDB != null) { Set childShardKeys = leaseFromDDB.childShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index d80799fa..9b95f562 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -310,7 +310,7 @@ public class LeaseManagementConfig { private LeaseManagementFactory leaseManagementFactory; public HierarchicalShardSyncer hierarchicalShardSyncer() { - if(hierarchicalShardSyncer == null) { + if (hierarchicalShardSyncer == null) { hierarchicalShardSyncer = new HierarchicalShardSyncer(); } return hierarchicalShardSyncer; @@ -356,7 +356,7 @@ public class LeaseManagementConfig { * @return LeaseManagementFactory */ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) { - if(leaseManagementFactory == null) { + if (leaseManagementFactory == null) { leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), dynamoDBClient(), tableName(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index dd576114..1986fa49 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -37,7 +37,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; @Slf4j @KinesisClientInternalApi public class ShardSyncTask implements ConsumerTask { - private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask"; + private static final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask"; @NonNull private final ShardDetector shardDetector; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index a6887f40..4bef8442 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -187,7 +187,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) throws ProvisionedThroughputException, DependencyException { final CreateTableRequest.Builder builder = createTableRequestBuilder(); - if(BillingMode.PROVISIONED.equals(billingMode)) { + if (BillingMode.PROVISIONED.equals(billingMode)) { ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) .writeCapacityUnits(writeCapacity).build(); builder.provisionedThroughput(throughput); @@ -467,7 +467,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } - log.info("Created lease: {}",lease); + log.info("Created lease: {}", lease); return true; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 64a7840c..9ebed654 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -89,7 +89,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); } - if(lease.hashKeyRangeForLease() != null) { + if (lease.hashKeyRangeForLease() != null) { result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())); result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())); } @@ -274,7 +274,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); } - if(lease.hashKeyRangeForLease() != null) { + if (lease.hashKeyRangeForLease() != null) { result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()))); result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()))); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java index ba97ab08..8f2e8149 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java @@ -19,9 +19,9 @@ package software.amazon.kinesis.leases.exceptions; */ public class CustomerApplicationException extends Exception { - public CustomerApplicationException(Throwable e) { super(e);} + public CustomerApplicationException(Throwable e) { super(e); } - public CustomerApplicationException(String message, Throwable e) { super(message, e);} + public CustomerApplicationException(String message, Throwable e) { super(message, e); } - public CustomerApplicationException(String message) { super(message);} + public CustomerApplicationException(String message) { super(message); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index c3f9523d..fb398cda 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -212,8 +212,10 @@ public class ProcessTask implements ConsumerTask { log.debug("Calling application processRecords() with {} records from {}", records.size(), shardInfoId); - final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()) - .isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); + final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records) + .cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()) + .isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer) + .millisBehindLatest(input.millisBehindLatest()).build(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index e8406d92..7a034000 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -61,7 +61,7 @@ class ShardConsumerSubscriber implements Subscriber { @Deprecated ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize, ShardConsumer shardConsumer) { - this(recordsPublisher,executorService,bufferSize,shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE); + this(recordsPublisher, executorService, bufferSize, shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE); } ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize, @@ -74,7 +74,6 @@ class ShardConsumerSubscriber implements Subscriber { this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo()); } - void startSubscriptions() { synchronized (lockObject) { // Setting the lastRequestTime to allow for health checks to restart subscriptions if they failed to diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 31bc8f88..4a96d87d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -283,7 +283,7 @@ public class ShutdownTask implements ConsumerTask { } } - for(ChildShard childShard : childShards) { + for (ChildShard childShard : childShards) { final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); if (leaseRefresher.getLease(leaseKey) == null) { log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricKey.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricKey.java index 4b04cad7..e52893cd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricKey.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/CloudWatchMetricKey.java @@ -20,9 +20,7 @@ import java.util.Objects; import software.amazon.awssdk.services.cloudwatch.model.Dimension; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; - - -/* +/** * A representation of a key of a MetricDatum. This class is useful when wanting to compare * whether 2 keys have the same MetricDatum. This feature will be used in MetricAccumulatingQueue * where we aggregate metrics across multiple MetricScopes. @@ -48,12 +46,15 @@ public class CloudWatchMetricKey { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } CloudWatchMetricKey other = (CloudWatchMetricKey) obj; return Objects.equals(other.dimensions, dimensions) && Objects.equals(other.metricName, metricName); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricDatumWithKey.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricDatumWithKey.java index 5234ffe4..da83675f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricDatumWithKey.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricDatumWithKey.java @@ -36,7 +36,6 @@ import java.util.Objects; * * MetricDatumWithKey sampleDatumWithKey = new MetricDatumWithKey(new * SampleMetricKey(System.currentTimeMillis()), datum) - * */ @AllArgsConstructor @Setter @@ -59,12 +58,15 @@ public class MetricDatumWithKey { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } MetricDatumWithKey other = (MetricDatumWithKey) obj; return Objects.equals(other.key, key) && Objects.equals(other.datum, datum); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 8404925d..3a8af8c2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -192,7 +192,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Take action based on the time spent by the event in queue. takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log); // Update current sequence number for the successfully delivered event. - currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber(); + currentSequenceNumber = ((FanoutRecordsRetrieved) recordsRetrieved).continuationSequenceNumber(); // Update the triggering flow for post scheduling upstream request. flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue or execute the subscription shutdown action. @@ -206,7 +206,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier() .equals(flow.getSubscribeToShardId())) { log.error( - "{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + "{}: Received unexpected ack for the active subscription {}. Throwing.", + streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); throw new IllegalStateException("Unexpected ack for the active subscription"); } // Otherwise publisher received a stale ack. @@ -315,7 +316,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (!hasValidSubscriber()) { - if(hasValidFlow()) { + if (hasValidFlow()) { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + " Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt, @@ -335,7 +336,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null) { String logMessage = String.format( "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + - " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); + " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, + flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -778,7 +780,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { executeExceptionOccurred(throwable); } else { final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( - () -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable); + () -> { + parent.recordsDeliveryQueue.poll(); + executeExceptionOccurred(throwable); + }, + "onError", throwable); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } } @@ -786,7 +792,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void executeExceptionOccurred(Throwable throwable) { synchronized (parent.lockObject) { - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}", parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()); @@ -817,7 +822,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { executeComplete(); } else { final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( - () -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete"); + () -> { + parent.recordsDeliveryQueue.poll(); + executeComplete(); + }, + "onComplete"); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index 35301624..bcfb1081 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -54,7 +54,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { final StreamConfig streamConfig, final MetricsFactory metricsFactory) { final Optional streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); - if(streamIdentifierStr.isPresent()) { + if (streamIdentifierStr.isPresent()) { final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index 4dd64016..0cc7058d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -137,7 +137,7 @@ public class PollingConfig implements RetrievalSpecificConfig { @Override public RetrievalFactory retrievalFactory() { // Prioritize the PollingConfig specified value if its updated. - if(usePollingConfigIdleTimeValue) { + if (usePollingConfigIdleTimeValue) { recordsFetcherFactory.idleMillisBetweenCalls(idleTimeBetweenReadsInMillis); } return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 07f4aaac..eb5937f7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -327,7 +327,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } resetLock.writeLock().lock(); try { - publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved); + publisherSession.reset((PrefetchRecordsRetrieved) recordsRetrieved); wasReset = true; } finally { resetLock.writeLock().unlock(); @@ -555,7 +555,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { return; } // Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm - if(lastSuccessfulCall == null) { + if (lastSuccessfulCall == null) { Thread.sleep(idleMillisBetweenCalls); return; }