Lease I/F changes and drop lease fix

This commit is contained in:
Ashwin Giridharan 2020-04-30 17:20:01 -07:00
parent f69e9cf3ba
commit a73701ff87
8 changed files with 37 additions and 32 deletions

View file

@ -92,6 +92,9 @@ public interface LeaseCoordinator {
*
* @param lease lease object containing updated values
* @param concurrencyToken obtained by calling Lease.concurrencyToken for a currently held lease
* @param operation that performs updateLease
* @param singleStreamShardId for metrics emission in single stream mode. MultiStream mode will get the
* shardId from the lease object
*
* @return true if update succeeded, false otherwise
*
@ -99,7 +102,7 @@ public interface LeaseCoordinator {
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId)
boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String singleStreamShardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**

View file

@ -99,7 +99,7 @@ public interface LeaseRefresher {
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* @param shardId Get the lease for this shardId
* @param leaseKey Get the lease for this leasekey
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity
@ -107,7 +107,7 @@ public interface LeaseRefresher {
*
* @return lease for the specified shardId, or null if one doesn't exist
*/
Lease getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter
@ -206,13 +206,13 @@ public interface LeaseRefresher {
* Gets the current checkpoint of the shard. This is useful in the resharding use case
* where we will wait for the parent shard to complete before starting on the records from a child shard.
*
* @param shardId Checkpoint of this shard will be returned
* @param leaseKey Checkpoint of this shard will be returned
* @return Checkpoint of this shard, or null if the shard record doesn't exist.
*
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws InvalidStateException if lease table does not exist
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
ExtendedSequenceNumber getCheckpoint(String shardId)
ExtendedSequenceNumber getCheckpoint(String leaseKey)
throws ProvisionedThroughputException, InvalidStateException, DependencyException;
}

View file

@ -86,6 +86,9 @@ public interface LeaseRenewer {
*
* @param lease lease object containing updated data
* @param concurrencyToken obtained by calling Lease.concurrencyToken for a currently held lease
* @param operation that performs updateLease
* @param singleStreamShardId shardId for metrics emission in single stream mode. MultiStream mode will get the
* shardId from the lease object
*
* @return true if update succeeds, false otherwise
*
@ -93,7 +96,7 @@ public interface LeaseRenewer {
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId)
boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String singleStreamShardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
}

View file

@ -349,8 +349,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
@Override
public boolean updateLease(final Lease lease, final UUID concurrencyToken, final String operation,
final String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return leaseRenewer.updateLease(lease, concurrencyToken, operation, shardId);
final String singleStreamShardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return leaseRenewer.updateLease(lease, concurrencyToken, operation, singleStreamShardId);
}
/**

View file

@ -663,10 +663,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* {@inheritDoc}
*/
@Override
public ExtendedSequenceNumber getCheckpoint(String shardId)
public ExtendedSequenceNumber getCheckpoint(String leaseKey)
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
ExtendedSequenceNumber checkpoint = null;
Lease lease = getLease(shardId);
Lease lease = getLease(leaseKey);
if (lease != null) {
checkpoint = lease.checkpoint();
}

View file

@ -271,7 +271,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
* {@inheritDoc}
*/
@Override
public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String shardId)
public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String singleStreamShardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
verifyNotNull(lease.leaseKey(), "leaseKey cannot be null");
@ -298,12 +298,14 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
}
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (StringUtils.isNotEmpty(shardId)) {
if (StringUtils.isNotEmpty(singleStreamShardId)) {
if(lease instanceof MultiStreamLease) {
MetricsUtil.addStreamId(scope,
StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()));
MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId());
} else {
MetricsUtil.addShardId(scope, singleStreamShardId);
}
MetricsUtil.addShardId(scope, shardId);
}
long startTime = System.currentTimeMillis();

View file

@ -47,11 +47,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -92,7 +89,7 @@ public class ShutdownTask implements ConsumerTask {
private final List<ChildShard> childShards;
private static final Function<ShardInfo, String> shardInfoIdProvider = shardInfo -> shardInfo
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> shardInfo
.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId());
/*
* Invokes ShardRecordProcessor shutdown() API.
@ -110,7 +107,7 @@ public class ShutdownTask implements ConsumerTask {
try {
try {
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason);
leaseKeyProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason);
final long startTime = System.currentTimeMillis();
if (reason == ShutdownReason.SHARD_END) {
@ -123,7 +120,7 @@ public class ShutdownTask implements ConsumerTask {
createLeasesForChildShardsIfNotExist();
updateLeasesForChildShards();
} else {
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo));
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo));
}
recordProcessorCheckpointer
@ -136,23 +133,23 @@ public class ShutdownTask implements ConsumerTask {
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
}
log.debug("Shutting down retrieval strategy for shard {}.", shardInfoIdProvider.apply(shardInfo));
log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo));
recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo));
log.debug("Record processor completed shutdown() for shard {}", leaseKeyProvider.apply(shardInfo));
return new TaskResult(null);
} catch (Exception e) {
if (e instanceof CustomerApplicationException) {
log.error("Shard {}: Application exception. ", shardInfoIdProvider.apply(shardInfo), e);
log.error("Shard {}: Application exception. ", leaseKeyProvider.apply(shardInfo), e);
} else {
log.error("Shard {}: Caught exception: ", shardInfoIdProvider.apply(shardInfo), e);
log.error("Shard {}: Caught exception: ", leaseKeyProvider.apply(shardInfo), e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("Shard {}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie);
log.debug("Shard {}: Interrupted sleep", leaseKeyProvider.apply(shardInfo), ie);
}
}
} finally {
@ -168,7 +165,7 @@ public class ShutdownTask implements ConsumerTask {
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
}
@ -177,7 +174,7 @@ public class ShutdownTask implements ConsumerTask {
try {
action.run();
} catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfoIdProvider.apply(shardInfo) +": ", e);
throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKeyProvider.apply(shardInfo) +": ", e);
} finally {
MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
}
@ -195,12 +192,12 @@ public class ShutdownTask implements ConsumerTask {
private void updateLeasesForChildShards()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo));
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds);
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo));
leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
}
/*
@ -233,7 +230,7 @@ public class ShutdownTask implements ConsumerTask {
}
private void dropLease() {
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId());
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
leaseCoordinator.dropLease(currentLease);
if(currentLease != null) {
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());

View file

@ -195,11 +195,11 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
}
@Override
public Lease getLease(String shardId)
public Lease getLease(String leaseKey)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throwExceptions("getLease", ExceptionThrowingLeaseRefresherMethods.GETLEASE);
return leaseRefresher.getLease(shardId);
return leaseRefresher.getLease(leaseKey);
}
@Override
@ -216,7 +216,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
}
@Override
public ExtendedSequenceNumber getCheckpoint(final String shardId)
public ExtendedSequenceNumber getCheckpoint(final String leaseKey)
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
return null;
}