Change log level from debug to warn when update stale leases
Change log level from debug to warn when update stale leases
This commit is contained in:
parent
a323272a97
commit
72aeac3819
2 changed files with 23 additions and 24 deletions
|
|
@ -47,7 +47,8 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||||
import software.amazon.kinesis.metrics.MetricsScope;
|
import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
import static software.amazon.kinesis.common.CommonCalculations.*;
|
|
||||||
|
import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
|
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,8 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||||
import software.amazon.kinesis.metrics.MetricsScope;
|
import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
import static software.amazon.kinesis.common.CommonCalculations.*;
|
|
||||||
|
import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
||||||
|
|
@ -51,7 +52,7 @@ import static software.amazon.kinesis.common.CommonCalculations.*;
|
||||||
public class DynamoDBLeaseTaker implements LeaseTaker {
|
public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
private static final int TAKE_RETRIES = 3;
|
private static final int TAKE_RETRIES = 3;
|
||||||
private static final int SCAN_RETRIES = 1;
|
private static final int SCAN_RETRIES = 1;
|
||||||
private long veryOldLeaseDurationNanosMultiplier = 3;
|
private static final double RENEWAL_SLACK_PERCENTAGE = 0.5;
|
||||||
|
|
||||||
// See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable.
|
// See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable.
|
||||||
private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
|
private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
|
||||||
|
|
@ -63,16 +64,16 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
private final long leaseDurationNanos;
|
private final long leaseDurationNanos;
|
||||||
private final long leaseRenewalIntervalMillis;
|
private final long leaseRenewalIntervalMillis;
|
||||||
private final MetricsFactory metricsFactory;
|
private final MetricsFactory metricsFactory;
|
||||||
|
|
||||||
private final double RENEWAL_SLACK_PERCENTAGE = 0.5;
|
final Map<String, Lease> allLeases = new HashMap<>();
|
||||||
|
|
||||||
private final Map<String, Lease> allLeases = new HashMap<>();
|
|
||||||
// TODO: Remove these defaults and use the defaults in the config
|
// TODO: Remove these defaults and use the defaults in the config
|
||||||
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
||||||
private int maxLeasesToStealAtOneTime = 1;
|
private int maxLeasesToStealAtOneTime = 1;
|
||||||
|
|
||||||
|
private long veryOldLeaseDurationNanosMultiplier = 3;
|
||||||
private long lastScanTimeNanos = 0L;
|
private long lastScanTimeNanos = 0L;
|
||||||
|
|
||||||
|
|
||||||
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
||||||
final MetricsFactory metricsFactory) {
|
final MetricsFactory metricsFactory) {
|
||||||
this.leaseRefresher = leaseRefresher;
|
this.leaseRefresher = leaseRefresher;
|
||||||
|
|
@ -143,12 +144,12 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
* Internal implementation of TAKE_LEASES_DIMENSION. Takes a callable that can provide the time to enable test cases
|
* Internal implementation of TAKE_LEASES_DIMENSION. Takes a callable that can provide the time to enable test cases
|
||||||
* without Thread.sleep. Takes a callable instead of a raw time value because the time needs to be computed as-of
|
* without Thread.sleep. Takes a callable instead of a raw time value because the time needs to be computed as-of
|
||||||
* immediately after the scan.
|
* immediately after the scan.
|
||||||
*
|
*
|
||||||
* @param timeProvider
|
* @param timeProvider
|
||||||
* Callable that will supply the time
|
* Callable that will supply the time
|
||||||
*
|
*
|
||||||
* @return map of lease key to taken lease
|
* @return map of lease key to taken lease
|
||||||
*
|
*
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
* @throws InvalidStateException
|
* @throws InvalidStateException
|
||||||
*/
|
*/
|
||||||
|
|
@ -259,7 +260,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
try {
|
try {
|
||||||
return leaseRefresher.getLease(lease.leaseKey());
|
return leaseRefresher.getLease(lease.leaseKey());
|
||||||
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||||
log.debug("Unable to retrieve the current lease, defaulting to existing lease", e);
|
log.warn("Unable to retrieve the current lease while refreshing the stale lease, "
|
||||||
|
+ "defaulting to existing lease", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return lease;
|
return lease;
|
||||||
|
|
@ -284,17 +286,17 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
builder.append(string);
|
builder.append(string);
|
||||||
needDelimiter = true;
|
needDelimiter = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scan all leases and update lastRenewalTime. Add new leases and delete old leases.
|
* Scan all leases and update lastRenewalTime. Add new leases and delete old leases.
|
||||||
*
|
*
|
||||||
* @param timeProvider callable that supplies the current time
|
* @param timeProvider callable that supplies the current time
|
||||||
*
|
*
|
||||||
* @return list of expired leases, possibly empty, never null.
|
* @return list of expired leases, possibly empty, never null.
|
||||||
*
|
*
|
||||||
* @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput
|
* @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput
|
||||||
* @throws InvalidStateException if the lease table does not exist
|
* @throws InvalidStateException if the lease table does not exist
|
||||||
* @throws DependencyException if listLeases fails in an unexpected way
|
* @throws DependencyException if listLeases fails in an unexpected way
|
||||||
|
|
@ -371,7 +373,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute the number of leases I should try to take based on the state of the system.
|
* Compute the number of leases I should try to take based on the state of the system.
|
||||||
*
|
*
|
||||||
* @param expiredLeases list of leases we determined to be expired
|
* @param expiredLeases list of leases we determined to be expired
|
||||||
* @return set of leases to take.
|
* @return set of leases to take.
|
||||||
*/
|
*/
|
||||||
|
|
@ -496,11 +498,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
/**
|
/**
|
||||||
* Choose leases to steal by randomly selecting one or more (up to max) from the most loaded worker.
|
* Choose leases to steal by randomly selecting one or more (up to max) from the most loaded worker.
|
||||||
* Stealing rules:
|
* Stealing rules:
|
||||||
*
|
*
|
||||||
* Steal up to maxLeasesToStealAtOneTime leases from the most loaded worker if
|
* Steal up to maxLeasesToStealAtOneTime leases from the most loaded worker if
|
||||||
* a) he has > target leases and I need >= 1 leases : steal min(leases needed, maxLeasesToStealAtOneTime)
|
* a) he has > target leases and I need >= 1 leases : steal min(leases needed, maxLeasesToStealAtOneTime)
|
||||||
* b) he has == target leases and I need > 1 leases : steal 1
|
* b) he has == target leases and I need > 1 leases : steal 1
|
||||||
*
|
*
|
||||||
* @param leaseCounts map of workerIdentifier to lease count
|
* @param leaseCounts map of workerIdentifier to lease count
|
||||||
* @param needed # of leases needed to reach the target leases for the worker
|
* @param needed # of leases needed to reach the target leases for the worker
|
||||||
* @param target target # of leases per worker
|
* @param target target # of leases per worker
|
||||||
|
|
@ -574,7 +576,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
/**
|
/**
|
||||||
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
|
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
|
||||||
* leases.
|
* leases.
|
||||||
*
|
*
|
||||||
* @param expiredLeases list of leases that are currently expired
|
* @param expiredLeases list of leases that are currently expired
|
||||||
* @return map of workerIdentifier to lease count
|
* @return map of workerIdentifier to lease count
|
||||||
*/
|
*/
|
||||||
|
|
@ -599,11 +601,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
|
// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
|
||||||
Integer myCount = leaseCounts.get(workerIdentifier);
|
leaseCounts.putIfAbsent(workerIdentifier, 0);
|
||||||
if (myCount == null) {
|
|
||||||
myCount = 0;
|
|
||||||
leaseCounts.put(workerIdentifier, myCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
return leaseCounts;
|
return leaseCounts;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue