Code cleanup to facilitate Checkstyle enforcement. (#1148)

No functional change.
Author: stair
Date: 2023-06-23 14:58:10 -04:00 (committed by GitHub)
Parent: 53dbb4ea79
Commit: 3d6800874c
23 changed files with 101 additions and 77 deletions

File: ShardRecordProcessorCheckpointer.java

@@ -144,7 +144,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpointer
      * {@inheritDoc}
      */
     @Override
-    public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
+    public PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
+            throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
         return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
     }
@@ -152,7 +153,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpointer
      * {@inheritDoc}
      */
     @Override
-    public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
+    public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
+            throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
         //
         // TODO: UserRecord Deprecation
         //
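Both rewraps above break the method signature before its throws clause. A minimal sketch of the convention, assuming the motivation is a Checkstyle LineLength-style limit; the exception types below are illustrative stand-ins, not the KCL ones:

public class WrappingExample {
    static class DependencyException extends Exception { }
    static class InvalidStateException extends Exception { }

    // When a signature plus its throws clause would overflow the line limit,
    // the throws clause moves to a continuation line with a double indent.
    public String prepare(byte[] applicationState)
            throws DependencyException, InvalidStateException {
        if (applicationState == null) {
            throw new InvalidStateException();
        }
        return "prepared " + applicationState.length + " bytes";
    }
}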

File: DynamoDBCheckpointer.java

@@ -103,7 +103,8 @@ public class DynamoDBCheckpointer implements Checkpointer {
     }
 
     @Override
-    public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
+    public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken,
+            byte[] pendingCheckpointState) throws KinesisClientLibException {
         try {
             boolean wasSuccessful =
                     prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState);

File: HashKeyRangeForLease.java

@@ -23,10 +23,11 @@ import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
 import java.math.BigInteger;
 
-@Value @Accessors(fluent = true)
 /**
  * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards.
  */
+@Accessors(fluent = true)
+@Value
 public class HashKeyRangeForLease {
     private final BigInteger startingHashKey;
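The HashKeyRangeForLease change reorders rather than rewrites: the class-level Javadoc moves above the Lombok annotations so the comment sits directly on the declaration it documents. A sketch of the resulting shape, assuming Lombok on the classpath (as in the real class) and a check along the lines of Checkstyle's InvalidJavadocPosition:

import lombok.Value;
import lombok.experimental.Accessors;

/**
 * Javadoc first, then annotations, then the declaration; the old ordering
 * (annotations above the Javadoc) detaches the comment from the type.
 */
@Accessors(fluent = true)
@Value
public class OrderingExample {
    // @Value makes this private final and generates a fluent startingValue() getter.
    int startingValue;
}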

File: PeriodicShardSyncManager.java

@@ -348,7 +348,7 @@ class PeriodicShardSyncManager {
                 ((MultiStreamLease) lease).shardId() :
                 lease.leaseKey();
         final Shard shard = kinesisShards.get(shardId);
-        if(shard == null) {
+        if (shard == null) {
             return lease;
         }
         lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
@@ -372,7 +372,7 @@ class PeriodicShardSyncManager {
             List<Lease> leasesWithHashKeyRanges) {
         // Sort the hash ranges by starting hash key.
         List<Lease> sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
-        if(sortedLeasesWithHashKeyRanges.isEmpty()) {
+        if (sortedLeasesWithHashKeyRanges.isEmpty()) {
             log.error("No leases with valid hashranges found for stream {}", streamIdentifier);
             return Optional.of(new HashRangeHole());
         }
@@ -417,8 +417,9 @@ class PeriodicShardSyncManager {
     @VisibleForTesting
     static List<Lease> sortLeasesByHashRange(List<Lease> leasesWithHashKeyRanges) {
-        if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1)
+        if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) {
             return leasesWithHashKeyRanges;
+        }
 
         Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
         return leasesWithHashKeyRanges;
     }
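The sortLeasesByHashRange hunk combines the commit's two most common fixes: a space after control-flow keywords and braces around single-statement bodies. A self-contained sketch, assuming rules along the lines of Checkstyle's WhitespaceAfter and NeedBraces checks:

import java.util.Collections;
import java.util.List;

public class BracesExample {
    static <T extends Comparable<T>> List<T> sorted(List<T> items) {
        // "if (" rather than "if(", and a braced body even for one statement,
        // so a later edit cannot silently fall outside the conditional.
        if (items.size() <= 1) {
            return items;
        }
        Collections.sort(items);
        return items;
    }
}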

File: Scheduler.java

@@ -544,7 +544,8 @@ public class Scheduler implements Runnable {
         final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
                 .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
         final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
-                Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
+                Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
+                .collect(Collectors.toSet());
         // These are the streams which are deleted in Kinesis and we encounter resource not found during
         // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
         // not have any data.
@@ -611,7 +612,7 @@ public class Scheduler implements Runnable {
     }
 
     private void removeStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdentifiers) {
-        for(StreamIdentifier streamIdentifier : streamIdentifiers) {
+        for (StreamIdentifier streamIdentifier : streamIdentifiers) {
             staleStreamDeletionMap.remove(streamIdentifier);
         }
     }

File: HierarchicalShardSyncer.java

@@ -80,7 +80,7 @@ public class HierarchicalShardSyncer {
     private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
     private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
-    private static final int retriesForCompleteHashRange = 3;
+    private static final int RETRIES_FOR_COMPLETE_HASH_RANGE = 3;
     private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
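The rename brings the field in line with the usual constant-naming rule; if the repo adopts Checkstyle's ConstantName check (default pattern ^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$, an assumption about this project's eventual config), the old camelCase name would be flagged:

public final class ConstantNameExample {
    // Would be flagged by a ConstantName-style check:
    // private static final int retriesForCompleteHashRange = 3;
    private static final int RETRIES_FOR_COMPLETE_HASH_RANGE = 3;

    public static void main(String[] args) {
        System.out.println(RETRIES_FOR_COMPLETE_HASH_RANGE);
    }
}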
@@ -98,7 +98,7 @@ public class HierarchicalShardSyncer {
         this.deletedStreamListProvider = deletedStreamListProvider;
     }
 
-    private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
+    private static final BiFunction<Lease, MultiStreamArgs, String> SHARD_ID_FROM_LEASE_DEDUCER =
             (lease, multiStreamArgs) ->
                     multiStreamArgs.isMultiStreamMode() ?
                             ((MultiStreamLease) lease).shardId() :
@@ -129,7 +129,9 @@ public class HierarchicalShardSyncer {
                 isLeaseTableEmpty);
     }
 
-    //Provide a pre-collcted list of shards to avoid calling ListShards API
+    /**
+     * Provide a pre-collected list of shards to avoid calling ListShards API
+     */
     public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
             final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
             List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
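Here a // line comment on a public method is promoted to Javadoc (also fixing the "pre-collcted" typo), so the note reaches generated docs and IDE tooltips. A sketch of the pattern:

import java.util.List;

public class JavadocExample {
    /**
     * Provide a pre-collected list of items to avoid a remote lookup.
     */
    public boolean process(List<String> preCollected) {
        return !preCollected.isEmpty();
    }
}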
@@ -163,7 +165,7 @@ public class HierarchicalShardSyncer {
         final long startTime = System.currentTimeMillis();
         boolean success = false;
         try {
-            if(leaseRefresher.createLeaseIfNotExists(lease)) {
+            if (leaseRefresher.createLeaseIfNotExists(lease)) {
                 createdLeases.add(lease);
             }
             success = true;
@@ -268,7 +270,7 @@ public class HierarchicalShardSyncer {
         List<Shard> shards;
 
-        for (int i = 0; i < retriesForCompleteHashRange; i++) {
+        for (int i = 0; i < RETRIES_FOR_COMPLETE_HASH_RANGE; i++) {
             shards = shardDetector.listShardsWithFilter(shardFilter);
 
             if (shards == null) {
@@ -284,7 +286,7 @@ public class HierarchicalShardSyncer {
         }
         throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after "
-                + retriesForCompleteHashRange + " retries.");
+                + RETRIES_FOR_COMPLETE_HASH_RANGE + " retries.");
     }
 
     private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
@@ -365,7 +367,8 @@ public class HierarchicalShardSyncer {
      * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
      */
     static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
-            final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,final Set<String> inconsistentShardIds) {
+            final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
+            final Set<String> inconsistentShardIds) {
         return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds,
                 new MultiStreamArgs(false, null));
     }
@@ -499,11 +502,13 @@ public class HierarchicalShardSyncer {
                 if (descendantParentShardIds.contains(parentShardId)
                         && !initialPosition.getInitialPositionInStream()
                                 .equals(InitialPositionInStream.AT_TIMESTAMP)) {
-                    log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint());
+                    log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}",
+                            lease.leaseKey(), lease.checkpoint());
                     lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
                 } else {
                     final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition);
-                    log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint());
+                    log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}",
+                            lease.leaseKey(), newCheckpoint, lease.checkpoint());
                     lease.checkpoint(newCheckpoint);
                 }
             }
@@ -728,8 +733,8 @@ public class HierarchicalShardSyncer {
         @Override
         public int compare(final Lease lease1, final Lease lease2) {
             int result = 0;
-            final String shardId1 = shardIdFromLeaseDeducer.apply(lease1, multiStreamArgs);
-            final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
+            final String shardId1 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease1, multiStreamArgs);
+            final String shardId2 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease2, multiStreamArgs);
             final Shard shard1 = shardIdToShardMap.get(shardId1);
             final Shard shard2 = shardIdToShardMap.get(shardId2);
@@ -802,7 +807,7 @@ public class HierarchicalShardSyncer {
         final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
 
         currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
-                .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
+                .map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs))
                 .collect(Collectors.toSet());
 
         final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier);
@@ -908,7 +913,7 @@ public class HierarchicalShardSyncer {
                 .map(streamId -> streamId.serialize()).orElse("");
         final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
                 .peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
-                .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
+                .map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs))
                 .collect(Collectors.toSet());
 
         final List<Shard> openShards = getOpenShards(shards, streamIdentifier);

File: LeaseCleanupManager.java

@@ -179,7 +179,7 @@ public class LeaseCleanupManager {
         try {
             if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
                 final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey());
-                if(leaseFromDDB != null) {
+                if (leaseFromDDB != null) {
                     Set<String> childShardKeys = leaseFromDDB.childShardIds();
                     if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
                         try {

File: LeaseManagementConfig.java

@@ -310,7 +310,7 @@ public class LeaseManagementConfig {
     private LeaseManagementFactory leaseManagementFactory;
 
     public HierarchicalShardSyncer hierarchicalShardSyncer() {
-        if(hierarchicalShardSyncer == null) {
+        if (hierarchicalShardSyncer == null) {
             hierarchicalShardSyncer = new HierarchicalShardSyncer();
         }
         return hierarchicalShardSyncer;
@@ -356,7 +356,7 @@ public class LeaseManagementConfig {
     * @return LeaseManagementFactory
     */
    public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
-        if(leaseManagementFactory == null) {
+        if (leaseManagementFactory == null) {
            leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
                    dynamoDBClient(),
                    tableName(),

File: ShardSyncTask.java

@@ -37,7 +37,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
 @Slf4j
 @KinesisClientInternalApi
 public class ShardSyncTask implements ConsumerTask {
-    private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
+    private static final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
 
     @NonNull
     private final ShardDetector shardDetector;
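ShardSyncTask held its fixed operation name in a final instance field; making it static final stores one copy per class instead of one per task, and lets the UPPER_SNAKE_CASE name satisfy constant-naming checks, which only apply to static final fields. A sketch:

public class OperationNameExample {
    // Before: private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
    // (one copy per instance, and a constant-style name on a non-static field).
    private static final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";

    public String operation() {
        return SHARD_SYNC_TASK_OPERATION;
    }
}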

File: DynamoDBLeaseRefresher.java

@@ -187,7 +187,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
     public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
             throws ProvisionedThroughputException, DependencyException {
         final CreateTableRequest.Builder builder = createTableRequestBuilder();
-        if(BillingMode.PROVISIONED.equals(billingMode)) {
+        if (BillingMode.PROVISIONED.equals(billingMode)) {
             ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
                     .writeCapacityUnits(writeCapacity).build();
             builder.provisionedThroughput(throughput);
@@ -467,7 +467,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
         } catch (DynamoDbException | TimeoutException e) {
             throw convertAndRethrowExceptions("create", lease.leaseKey(), e);
         }
-        log.info("Created lease: {}",lease);
+        log.info("Created lease: {}", lease);
         return true;
     }

File: DynamoDBLeaseSerializer.java

@@ -89,7 +89,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
             result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
         }
 
-        if(lease.hashKeyRangeForLease() != null) {
+        if (lease.hashKeyRangeForLease() != null) {
             result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()));
             result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()));
         }
@@ -274,7 +274,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
             result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
         }
 
-        if(lease.hashKeyRangeForLease() != null) {
+        if (lease.hashKeyRangeForLease() != null) {
             result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
             result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
         }

File: CustomerApplicationException.java

@@ -19,9 +19,9 @@ package software.amazon.kinesis.leases.exceptions;
  */
 public class CustomerApplicationException extends Exception {
-    public CustomerApplicationException(Throwable e) { super(e);}
-    public CustomerApplicationException(String message, Throwable e) { super(message, e);}
-    public CustomerApplicationException(String message) { super(message);}
+    public CustomerApplicationException(Throwable e) { super(e); }
+    public CustomerApplicationException(String message, Throwable e) { super(message, e); }
+    public CustomerApplicationException(String message) { super(message); }
 }
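The CustomerApplicationException hunk only adds a space before each closing brace, giving compact one-line bodies symmetric padding inside the braces. A sketch (the class name is illustrative):

public class OneLinerException extends Exception {
    // "{ super(message); }" rather than "{ super(message);}".
    public OneLinerException(String message) { super(message); }
}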

File: ProcessTask.java

@@ -212,8 +212,10 @@ public class ProcessTask implements ConsumerTask {
             log.debug("Calling application processRecords() with {} records from {}", records.size(),
                     shardInfoId);
-            final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
-                    .isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
+            final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records)
+                    .cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
+                    .isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer)
+                    .millisBehindLatest(input.millisBehindLatest()).build();
             final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
 
             shardInfo.streamIdentifierSerOpt()
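The ProcessRecordsInput construction is rewrapped so the fluent builder chain breaks at call boundaries instead of overflowing a single line. A stand-in sketch (StringBuilder substitutes for the KCL builder):

public class ChainWrapExample {
    static String describe(int recordCount, boolean atShardEnd) {
        // A few calls per continuation line keeps every setter visible
        // and each line under the length limit.
        return new StringBuilder().append("records=").append(recordCount)
                .append(", atShardEnd=").append(atShardEnd)
                .toString();
    }
}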

File: ShardConsumerSubscriber.java

@@ -61,7 +61,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
     @Deprecated
     ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
             ShardConsumer shardConsumer) {
-        this(recordsPublisher,executorService,bufferSize,shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
+        this(recordsPublisher, executorService, bufferSize, shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
     }
 
     ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
@@ -74,7 +74,6 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
         this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo());
     }
 
-
     void startSubscriptions() {
         synchronized (lockObject) {
             // Setting the lastRequestTime to allow for health checks to restart subscriptions if they failed to

File: ShutdownTask.java

@@ -283,7 +283,7 @@ public class ShutdownTask implements ConsumerTask {
             }
         }
 
-        for(ChildShard childShard : childShards) {
+        for (ChildShard childShard : childShards) {
             final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
             if (leaseRefresher.getLease(leaseKey) == null) {
                 log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);

File: CloudWatchMetricKey.java

@@ -20,9 +20,7 @@ import java.util.Objects;
 import software.amazon.awssdk.services.cloudwatch.model.Dimension;
 import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
 
-
-/*
+/**
  * A representation of a key of a MetricDatum. This class is useful when wanting to compare
  * whether 2 keys have the same MetricDatum. This feature will be used in MetricAccumulatingQueue
  * where we aggregate metrics across multiple MetricScopes.
@@ -48,12 +46,15 @@ public class CloudWatchMetricKey {
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         CloudWatchMetricKey other = (CloudWatchMetricKey) obj;
         return Objects.equals(other.dimensions, dimensions) && Objects.equals(other.metricName, metricName);
     }

File: MetricDatumWithKey.java

@@ -36,7 +36,6 @@ import java.util.Objects;
  *
  * MetricDatumWithKey<SampleMetricKey> sampleDatumWithKey = new MetricDatumWithKey<SampleMetricKey>(new
  * SampleMetricKey(System.currentTimeMillis()), datum)
- *
  */
 @AllArgsConstructor
 @Setter
@@ -59,12 +58,15 @@ public class MetricDatumWithKey<KeyType> {
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         MetricDatumWithKey<?> other = (MetricDatumWithKey<?>) obj;
         return Objects.equals(other.key, key) && Objects.equals(other.datum, datum);
     }

File: FanOutRecordsPublisher.java

@@ -192,7 +192,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
                 // Take action based on the time spent by the event in queue.
                 takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
                 // Update current sequence number for the successfully delivered event.
-                currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
+                currentSequenceNumber = ((FanoutRecordsRetrieved) recordsRetrieved).continuationSequenceNumber();
                 // Update the triggering flow for post scheduling upstream request.
                 flowToBeReturned = recordsRetrievedContext.getRecordFlow();
                 // Try scheduling the next event in the queue or execute the subscription shutdown action.
@@ -206,7 +206,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
             if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
                     .equals(flow.getSubscribeToShardId())) {
                 log.error(
-                        "{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
+                        "{}: Received unexpected ack for the active subscription {}. Throwing.",
+                        streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
                 throw new IllegalStateException("Unexpected ack for the active subscription");
             }
             // Otherwise publisher received a stale ack.
@@ -315,7 +316,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
         synchronized (lockObject) {
             if (!hasValidSubscriber()) {
-                if(hasValidFlow()) {
+                if (hasValidFlow()) {
                     log.warn(
                             "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
                                     " Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt,
@@ -335,7 +336,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
             if (flow != null) {
                 String logMessage = String.format(
                         "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
-                                " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
+                                " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt,
+                                flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
                 switch (category.throwableType) {
                     case READ_TIMEOUT:
                         log.debug(logMessage, propagationThrowable);
@@ -778,7 +780,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
                 executeExceptionOccurred(throwable);
             } else {
                 final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
-                        () -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable);
+                        () -> {
+                            parent.recordsDeliveryQueue.poll();
+                            executeExceptionOccurred(throwable);
+                        },
+                        "onError", throwable);
                 tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
             }
         }
@@ -786,7 +792,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
         private void executeExceptionOccurred(Throwable throwable) {
             synchronized (parent.lockObject) {
                 log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
-
                         parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
                         throwable.getMessage());
@@ -817,7 +822,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
                 executeComplete();
             } else {
                 final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
-                        () -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete");
+                        () -> {
+                            parent.recordsDeliveryQueue.poll();
+                            executeComplete();
+                        },
+                        "onComplete");
                 tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
             }
         }
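Both shutdown-event call sites expand a packed one-line lambda into a block with one statement per line. A self-contained sketch of the before/after shape:

import java.util.ArrayDeque;
import java.util.Queue;

public class LambdaBlockExample {
    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("event");

        // Before: Runnable cleanup = () -> {queue.poll(); System.out.println("done");};
        Runnable cleanup = () -> {
            queue.poll();
            System.out.println("done");
        };
        cleanup.run();
    }
}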

File: FanOutRetrievalFactory.java

@@ -54,7 +54,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
             final StreamConfig streamConfig,
             final MetricsFactory metricsFactory) {
         final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
-        if(streamIdentifierStr.isPresent()) {
+        if (streamIdentifierStr.isPresent()) {
             final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
             return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
                     getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),

File: PollingConfig.java

@@ -137,7 +137,7 @@ public class PollingConfig implements RetrievalSpecificConfig {
     @Override
     public RetrievalFactory retrievalFactory() {
         // Prioritize the PollingConfig specified value if its updated.
-        if(usePollingConfigIdleTimeValue) {
+        if (usePollingConfigIdleTimeValue) {
             recordsFetcherFactory.idleMillisBetweenCalls(idleTimeBetweenReadsInMillis);
         }
         return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,

File: PrefetchRecordsPublisher.java

@@ -327,7 +327,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
         }
         resetLock.writeLock().lock();
         try {
-            publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved);
+            publisherSession.reset((PrefetchRecordsRetrieved) recordsRetrieved);
             wasReset = true;
         } finally {
             resetLock.writeLock().unlock();
@@ -555,7 +555,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
                 return;
             }
             // Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm
-            if(lastSuccessfulCall == null) {
+            if (lastSuccessfulCall == null) {
                 Thread.sleep(idleMillisBetweenCalls);
                 return;
            }
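The cast fixes here and in FanOutRecordsPublisher are the typecast counterpart of the keyword-spacing changes: one space after the closing parenthesis of a cast. Checkstyle's WhitespaceAfter check can cover the TYPECAST token, though whether this repo enables it is an assumption. A sketch:

public class CastSpacingExample {
    public static void main(String[] args) {
        Object value = "hello";
        // "(String) value", not "(String)value".
        String text = (String) value;
        System.out.println(text.length());
    }
}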