diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java new file mode 100644 index 00000000..edb6de2e --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + + +public class CommonCalculations { + + + /** + * Convenience method for calculating renewer intervals in milliseconds. + * + * @param leaseDurationMillis Duration of a lease + * @param epsilonMillis Allow for some variance when calculating lease expirations + */ + public static long getRenewerTakerIntervalMillis(long leaseDurationMillis, long epsilonMillis) { + return leaseDurationMillis / 3 - epsilonMillis; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 359b7a44..4074db22 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -14,20 +14,20 @@ */ package software.amazon.kinesis.leases; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.NonNull; -import lombok.ToString; -import lombok.experimental.Accessors; -import software.amazon.kinesis.common.HashKeyRangeForLease; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; - import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a @@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit; @NoArgsConstructor @Getter @Accessors(fluent = true) -@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"}) +@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState", "isMarkedForLeaseSteal"}) @ToString public class Lease { /* @@ -91,6 +91,16 @@ public class Lease { */ private byte[] pendingCheckpointState; + + /** + * Denotes whether the lease is marked for stealing. Deliberately excluded from hashCode and equals and + * not persisted in DynamoDB. + * + * @return flag for denoting lease is marked for stealing. + */ + @Setter + private boolean isMarkedForLeaseSteal; + /** * @return count of distinct lease holders between checkpoints. */ @@ -141,6 +151,7 @@ public class Lease { } this.hashKeyRangeForLease = hashKeyRangeForLease; this.pendingCheckpointState = pendingCheckpointState; + this.isMarkedForLeaseSteal = false; } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 76f91800..65c89956 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -14,6 +14,8 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.util.Collection; import java.util.Collections; import java.util.List; @@ -28,9 +30,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; @@ -48,6 +47,7 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; +import static software.amazon.kinesis.common.CommonCalculations.*; /** * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns @@ -156,7 +156,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); this.leaseRenewer = new DynamoDBLeaseRenewer( leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); - this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; + this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; if (initialLeaseTableReadCapacity <= 0) { throw new IllegalArgumentException("readCapacity should be >= 1"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index e7087e62..d0e11dd8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -41,6 +41,7 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; +import static software.amazon.kinesis.common.CommonCalculations.*; /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. @@ -60,20 +61,23 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private final LeaseRefresher leaseRefresher; private final String workerIdentifier; private final long leaseDurationNanos; + private final long leaseRenewalIntervalMillis; private final MetricsFactory metricsFactory; + private final double RENEWAL_SLACK_PERCENTAGE = 0.5; + + private final Map allLeases = new HashMap<>(); // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; private long lastScanTimeNanos = 0L; - final Map allLeases = new HashMap<>(); - public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; this.workerIdentifier = workerIdentifier; + this.leaseRenewalIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, 0); this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); this.metricsFactory = metricsFactory; } @@ -156,6 +160,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); long startTime = System.currentTimeMillis(); + long updateAllLeasesTotalTimeMillis; boolean success = false; ProvisionedThroughputException lastException = null; @@ -173,19 +178,23 @@ public class DynamoDBLeaseTaker implements LeaseTaker { } } } finally { + updateAllLeasesTotalTimeMillis = System.currentTimeMillis() - startTime; MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED); } + if (lastException != null) { log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by" - + " last retry:", workerIdentifier, lastException); + + " last retry:", workerIdentifier, lastException); return takenLeases; } List expiredLeases = getExpiredLeases(); Set leasesToTake = computeLeasesToTake(expiredLeases); + leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake); + Set untakenLeaseKeys = new HashSet<>(); for (Lease lease : leasesToTake) { @@ -233,6 +242,32 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return takenLeases; } + /** + * If update all leases takes longer than the lease renewal time, + * we fetch the latest lease info for the given leases that are marked for lease steal. + * If nothing is found (or any transient network error occurs), + * we default to the last known state of the lease + * + * @param updateAllLeasesEndTime How long it takes for update all leases to complete + * @return set of leases to take. + */ + private Set updateStaleLeasesWithLatestState(long updateAllLeasesEndTime, + Set leasesToTake) { + if (updateAllLeasesEndTime > leaseRenewalIntervalMillis * RENEWAL_SLACK_PERCENTAGE) { + leasesToTake = leasesToTake.stream().map(lease -> { + if (lease.isMarkedForLeaseSteal()) { + try { + return leaseRefresher.getLease(lease.leaseKey()); + } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { + log.debug("Unable to retrieve the current lease, defaulting to existing lease", e); + } + } + return lease; + }).collect(Collectors.toSet()); + } + return leasesToTake; + } + /** Package access for testing purposes. * * @param strings @@ -530,8 +565,9 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // Return random ones Collections.shuffle(candidates); int toIndex = Math.min(candidates.size(), numLeasesToSteal); - leasesToSteal.addAll(candidates.subList(0, toIndex)); - + leasesToSteal.addAll(candidates.subList(0, toIndex).stream() + .map(lease -> lease.isMarkedForLeaseSteal(true)) + .collect(Collectors.toList())); return leasesToSteal; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java index 972d3951..475f1940 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java @@ -15,7 +15,9 @@ package software.amazon.kinesis.leases.dynamodb; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -26,6 +28,7 @@ import software.amazon.kinesis.leases.LeaseIntegrationTest; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.metrics.NullMetricsFactory; import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @RunWith(MockitoJUnitRunner.class) @@ -150,6 +153,33 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { assertThat(addedLeases.values().containsAll(allLeases), equalTo(true)); } + + /** + * Sets the leaseDurationMillis to 0, ensuring a get request to update the existing lease after computing + * leases to take + */ + @Test + public void testSlowGetAllLeases() throws LeasingException { + long leaseDurationMillis = 0; + taker = new DynamoDBLeaseTaker(leaseRefresher, + "foo", + leaseDurationMillis, + new NullMetricsFactory()); + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + Map addedLeases = builder.withLease("1", "bar") + .withLease("2", "bar") + .withLease("5", "foo") + .build(); + + assertThat(taker.allLeases().size(), equalTo(0)); + taker.takeLeases(); + + Collection allLeases = taker.allLeases(); + assertThat(allLeases.size(), equalTo(addedLeases.size())); + assertEquals(addedLeases.values().size(), allLeases.size()); + } + /** * Verify that LeaseTaker does not steal when it's only short 1 lease and the other worker is at target. Set up a * scenario where there are 4 leases held by two servers, and a third server with one lease. The third server should @@ -189,7 +219,7 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { builder.build(); // Assert that one lease was stolen from baz. - Map takenLeases = builder.takeMutateAssert(taker, 1); + Map takenLeases = builder.stealMutateAssert(taker, 1); // Assert that it was one of baz's leases (shardId != 1) String shardIdStolen = takenLeases.keySet().iterator().next(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java index 677303d6..00db6a51 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java @@ -111,6 +111,23 @@ public class TestHarnessBuilder { return result; } + public Map stealMutateAssert(DynamoDBLeaseTaker taker, int numToTake) + throws LeasingException { + Map result = taker.takeLeases(timeProvider); + assertEquals(numToTake, result.size()); + + for (Lease actual : result.values()) { + Lease original = leases.get(actual.leaseKey()); + assertNotNull(original); + + original.isMarkedForLeaseSteal(true) + .lastCounterIncrementNanos(actual.lastCounterIncrementNanos()); + mutateAssert(taker.getWorkerIdentifier(), original, actual); + } + + return result; + } + public Map takeMutateAssert(DynamoDBLeaseTaker taker, String... takenShardIds) throws LeasingException { Map result = taker.takeLeases(timeProvider);