diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 65c89956..07e9068d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -47,7 +47,8 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static software.amazon.kinesis.common.CommonCalculations.*; + +import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis; /** * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index d0e11dd8..32ebc632 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -41,7 +41,8 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static software.amazon.kinesis.common.CommonCalculations.*; + +import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis; /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. @@ -51,7 +52,7 @@ import static software.amazon.kinesis.common.CommonCalculations.*; public class DynamoDBLeaseTaker implements LeaseTaker { private static final int TAKE_RETRIES = 3; private static final int SCAN_RETRIES = 1; - private long veryOldLeaseDurationNanosMultiplier = 3; + private static final double RENEWAL_SLACK_PERCENTAGE = 0.5; // See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable. private static final Callable SYSTEM_CLOCK_CALLABLE = System::nanoTime; @@ -63,16 +64,16 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private final long leaseDurationNanos; private final long leaseRenewalIntervalMillis; private final MetricsFactory metricsFactory; - - private final double RENEWAL_SLACK_PERCENTAGE = 0.5; - - private final Map allLeases = new HashMap<>(); + + final Map allLeases = new HashMap<>(); // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; + private long veryOldLeaseDurationNanosMultiplier = 3; private long lastScanTimeNanos = 0L; + public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; @@ -143,12 +144,12 @@ public class DynamoDBLeaseTaker implements LeaseTaker { * Internal implementation of TAKE_LEASES_DIMENSION. Takes a callable that can provide the time to enable test cases * without Thread.sleep. Takes a callable instead of a raw time value because the time needs to be computed as-of * immediately after the scan. - * + * * @param timeProvider * Callable that will supply the time - * + * * @return map of lease key to taken lease - * + * * @throws DependencyException * @throws InvalidStateException */ @@ -259,7 +260,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker { try { return leaseRefresher.getLease(lease.leaseKey()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { - log.debug("Unable to retrieve the current lease, defaulting to existing lease", e); + log.warn("Unable to retrieve the current lease while refreshing the stale lease, " + + "defaulting to existing lease", e); } } return lease; @@ -284,17 +286,17 @@ public class DynamoDBLeaseTaker implements LeaseTaker { builder.append(string); needDelimiter = true; } - + return builder.toString(); } /** * Scan all leases and update lastRenewalTime. Add new leases and delete old leases. - * + * * @param timeProvider callable that supplies the current time - * + * * @return list of expired leases, possibly empty, never null. - * + * * @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput * @throws InvalidStateException if the lease table does not exist * @throws DependencyException if listLeases fails in an unexpected way @@ -371,7 +373,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { /** * Compute the number of leases I should try to take based on the state of the system. - * + * * @param expiredLeases list of leases we determined to be expired * @return set of leases to take. */ @@ -496,11 +498,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker { /** * Choose leases to steal by randomly selecting one or more (up to max) from the most loaded worker. * Stealing rules: - * + * * Steal up to maxLeasesToStealAtOneTime leases from the most loaded worker if * a) he has > target leases and I need >= 1 leases : steal min(leases needed, maxLeasesToStealAtOneTime) * b) he has == target leases and I need > 1 leases : steal 1 - * + * * @param leaseCounts map of workerIdentifier to lease count * @param needed # of leases needed to reach the target leases for the worker * @param target target # of leases per worker @@ -574,7 +576,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { /** * Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding * leases. - * + * * @param expiredLeases list of leases that are currently expired * @return map of workerIdentifier to lease count */ @@ -599,11 +601,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { } // If I have no leases, I wasn't represented in leaseCounts. Let's fix that. - Integer myCount = leaseCounts.get(workerIdentifier); - if (myCount == null) { - myCount = 0; - leaseCounts.put(workerIdentifier, myCount); - } + leaseCounts.putIfAbsent(workerIdentifier, 0); return leaseCounts; }