Minor optimizations (e.g., calculate-once, put instead of get+put) (#1041)

and code cleanup (e.g., removed unused imports, updated Javadoc).

No functional change.
This commit is contained in:
stair 2023-02-13 13:58:02 -05:00 committed by GitHub
parent 5bfd1ab289
commit 1c8bd8e71e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 30 additions and 112 deletions

View file

@ -76,7 +76,7 @@ class PeriodicShardSyncManager {
@VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager";
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
private final Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
private final String workerId;
private final LeaderDecider leaderDecider;
@ -142,15 +142,12 @@ class PeriodicShardSyncManager {
/**
* Runs shardSync once
* Does not schedule periodic shardSync
* @return the result of the task
*/
public synchronized void syncShardsOnce() throws Exception {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
for(Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue();
for (StreamConfig streamConfig : currentStreamConfigMap.values()) {
log.info("Syncing Kinesis shard info for {}", streamConfig);
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfig);
final TaskResult taskResult = shardSyncTaskManager.callShardSyncTask();
if (taskResult.getException() != null) {
@ -283,7 +280,6 @@ class PeriodicShardSyncManager {
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
+ " times. Shard sync will be initiated when threshold reaches "
+ leasesRecoveryAuditorInconsistencyConfidenceThreshold);
} else {
// If hole is not present, clear any previous tracking for this stream and return false;
hashRangeHoleTrackerMap.remove(streamIdentifier);

View file

@ -62,7 +62,7 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing it's child shards.
* and begun processing its child shards.
*/
@Slf4j
@KinesisClientInternalApi
@ -432,7 +432,7 @@ public class HierarchicalShardSyncer {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
/**
/*
* If the lease for the parent shard does not already exist, there are two cases in which we
* would want to create it:
* - If we have already marked the parentShardId for lease creation in a prior recursive
@ -454,7 +454,7 @@ public class HierarchicalShardSyncer {
}
}
/**
/*
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will
* add a lease just like we do for TRIM_HORIZON. However we will only return back records

View file

@ -42,7 +42,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState", "isMarkedForLeaseSteal"})
@ToString
public class Lease {
/*
/**
* See javadoc for System.nanoTime - summary:
*
* Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two
@ -51,62 +51,57 @@ public class Lease {
private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365);
/**
* @return leaseKey - identifies the unit of work associated with this lease.
* Identifies the unit of work associated with this lease.
*/
private String leaseKey;
/**
* @return current owner of the lease, may be null.
* Current owner of the lease, may be null.
*/
private String leaseOwner;
/**
* @return leaseCounter is incremented periodically by the holder of the lease. Used for optimistic locking.
* LeaseCounter is incremented periodically by the holder of the lease. Used for optimistic locking.
*/
private Long leaseCounter = 0L;
/*
/**
* This field is used to prevent updates to leases that we have lost and re-acquired. It is deliberately not
* persisted in DynamoDB and excluded from hashCode and equals.
*/
private UUID concurrencyToken;
/*
/**
* This field is used by LeaseRenewer and LeaseTaker to track the last time a lease counter was incremented. It is
* deliberately not persisted in DynamoDB and excluded from hashCode and equals.
*/
private Long lastCounterIncrementNanos;
/**
* @return most recently application-supplied checkpoint value. During fail over, the new worker will pick up after
* Most recently application-supplied checkpoint value. During fail over, the new worker will pick up after
* the old worker's last checkpoint.
*/
private ExtendedSequenceNumber checkpoint;
/**
* @return pending checkpoint, possibly null.
* Pending checkpoint, possibly null.
*/
private ExtendedSequenceNumber pendingCheckpoint;
/**
* Last pending application state. Deliberately excluded from hashCode and equals.
*
* @return pending checkpoint state, possibly null.
* Last pending checkpoint state, possibly null. Deliberately excluded from hashCode and equals.
*/
private byte[] pendingCheckpointState;
/**
* Denotes whether the lease is marked for stealing. Deliberately excluded from hashCode and equals and
* not persisted in DynamoDB.
*
* @return flag for denoting lease is marked for stealing.
*/
@Setter
private boolean isMarkedForLeaseSteal;
/**
* @return count of distinct lease holders between checkpoints.
* Count of distinct lease holders between checkpoints.
*/
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>();
private final Set<String> parentShardIds = new HashSet<>();
private final Set<String> childShardIds = new HashSet<>();
private HashKeyRangeForLease hashKeyRangeForLease;
/**
@ -319,5 +314,4 @@ public class Lease {
return new Lease(this);
}
}

View file

@ -64,22 +64,16 @@ public class LeaseManagementConfig {
/**
* Name of the table to use in DynamoDB
*
* @return String
*/
@NonNull
private final String tableName;
/**
* Client to be used to access DynamoDB service.
*
* @return {@link DynamoDbAsyncClient}
*/
@NonNull
private final DynamoDbAsyncClient dynamoDBClient;
/**
* Client to be used to access Kinesis Data Streams service.
*
* @return {@link KinesisAsyncClient}
*/
@NonNull
private final KinesisAsyncClient kinesisClient;
@ -90,8 +84,6 @@ public class LeaseManagementConfig {
private String streamName;
/**
* Used to distinguish different workers/processes of a KCL application.
*
* @return String
*/
@NonNull
private final String workerIdentifier;

View file

@ -210,8 +210,6 @@ public interface LeaseRefresher {
* Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
* library such as leaseCounter, leaseOwner, or leaseKey.
*
* @return true if update succeeded, false otherwise
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way

View file

@ -73,7 +73,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
private long veryOldLeaseDurationNanosMultiplier = 3;
private long lastScanTimeNanos = 0L;
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
final MetricsFactory metricsFactory) {
this.leaseRefresher = leaseRefresher;
@ -184,7 +183,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED);
}
if (lastException != null) {
log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by"
+ " last retry:", workerIdentifier, lastException);
@ -319,8 +317,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
for (Lease lease : freshList) {
String leaseKey = lease.leaseKey();
Lease oldLease = allLeases.get(leaseKey);
allLeases.put(leaseKey, lease);
final Lease oldLease = allLeases.put(leaseKey, lease);
notUpdated.remove(leaseKey);
if (oldLease != null) {
@ -384,7 +381,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();
final int numAvailableLeases = expiredLeases.size();
int numLeases = 0;
@ -402,7 +398,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return leasesToTake;
}
int target;
if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
@ -435,9 +430,9 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
final long nanoThreshold = System.nanoTime() - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
final List<Lease> veryOldLeases = allLeases.values().stream()
.filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
.collect(Collectors.toList());
if (!veryOldLeases.isEmpty()) {
@ -481,7 +476,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
workerIdentifier, numLeases, numAvailableLeases, numWorkers, target, myCount,
leasesToTake.size());
}
} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);

View file

@ -21,7 +21,6 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.metrics.MetricsFactory;

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -33,7 +32,6 @@ import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.ShardInfo;
@ -75,7 +73,7 @@ public class ShardConsumer {
private volatile Instant taskDispatchedAt;
private volatile boolean taskIsRunning = false;
/*
/**
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
* much coordination/synchronization to handle concurrent reads/updates.
*/

View file

@ -21,15 +21,11 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Optional;
/**
* Provides a record publisher that will retrieve records from Kinesis for processing
*/
public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
/**
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
* begin from that sequence number, otherwise it will use the initial position.
@ -47,7 +43,6 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
*/
void restartFrom(RecordsRetrieved recordsRetrieved);
/**
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records.
*/

View file

@ -108,7 +108,7 @@ public class RetrievalConfig {
*
* @deprecated Initial stream position is now handled by {@link StreamTracker}.
* @see StreamTracker#orphanedStreamInitialPositionInStream()
* @see StreamTracker#createConfig(StreamIdentifier)
* @see StreamTracker#createStreamConfig(StreamIdentifier)
*/
@Deprecated
private InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended
@ -138,7 +138,7 @@ public class RetrievalConfig {
*
* @deprecated Initial stream position is now handled by {@link StreamTracker}.
* @see StreamTracker#orphanedStreamInitialPositionInStream()
* @see StreamTracker#createConfig(StreamIdentifier)
* @see StreamTracker#createStreamConfig(StreamIdentifier)
*/
@Deprecated
public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {

View file

@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
@ -47,8 +46,6 @@ public class PollingConfig implements RetrievalSpecificConfig {
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
/**
* Name of the Kinesis stream.
*
* @return String
*/
private String streamName;
@ -63,8 +60,6 @@ public class PollingConfig implements RetrievalSpecificConfig {
/**
* Client used to access to Kinesis service.
*
* @return {@link KinesisAsyncClient}
*/
@NonNull
private final KinesisAsyncClient kinesisClient;

View file

@ -70,7 +70,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
* i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should
* be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from
* the record processor is blocked till records are retrieved from Kinesis.
*
* <br/><br/>
* There are three threads namely publisher, demand-notifier and ack-notifier which will contend to drain the events
* to the Subscriber (ShardConsumer in KCL).
*/
@ -81,9 +81,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
// Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds
private static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L;
private int maxPendingProcessRecordsInput;
private int maxByteSize;
private int maxRecordsCount;
private final int maxPendingProcessRecordsInput;
private final int maxByteSize;
private final int maxRecordsCount;
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final ExecutorService executorService;
@ -447,7 +447,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
}
private class DefaultGetRecordsCacheDaemon implements Runnable {
volatile boolean isShutdown = false;
@ -483,7 +482,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();

View file

@ -14,7 +14,6 @@
*/
package software.amazon.kinesis.retrieval.polling;
import java.util.Optional;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

View file

@ -22,7 +22,6 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

View file

@ -15,10 +15,8 @@
package software.amazon.kinesis.leases.dynamodb;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -28,19 +26,14 @@ import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.stream.Collectors;
import junit.framework.Assert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsScope;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -68,34 +61,6 @@ public class DynamoDBLeaseTakerTest {
this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory);
}
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
}
/**
* Test method for {@link DynamoDBLeaseTaker#stringJoin(java.util.Collection, java.lang.String)}.
*/

View file

@ -17,10 +17,8 @@ package software.amazon.kinesis.lifecycle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -48,7 +46,6 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
@ -265,7 +262,6 @@ public class ShutdownTaskTest {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1"))
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);