Add new metric to be emitted on lease creation (#1060)
* Add new metric to be emitted on lease creation * Rebase changes from master --------- Co-authored-by: Noah Thomas <noahbt@amazon.com>
This commit is contained in:
parent
5bbb9768b5
commit
04a121a811
2 changed files with 20 additions and 6 deletions
|
|
@ -160,16 +160,19 @@ public class HierarchicalShardSyncer {
|
||||||
final Set<Lease> createdLeases = new HashSet<>();
|
final Set<Lease> createdLeases = new HashSet<>();
|
||||||
|
|
||||||
for (Lease lease : newLeasesToCreate) {
|
for (Lease lease : newLeasesToCreate) {
|
||||||
long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
if(leaseRefresher.createLeaseIfNotExists(lease)) {
|
if(leaseRefresher.createLeaseIfNotExists(lease)) {
|
||||||
createdLeases.add(lease);
|
createdLeases.add(lease);
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
}
|
} finally {
|
||||||
finally {
|
|
||||||
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
||||||
|
if (lease.checkpoint() != null) {
|
||||||
|
final String metricName = lease.checkpoint().isSentinelCheckpoint() ? lease.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
|
||||||
|
MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
|
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
|
||||||
|
|
|
||||||
|
|
@ -181,7 +181,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
|
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
|
||||||
}
|
}
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
createLeasesForChildShardsIfNotExist();
|
createLeasesForChildShardsIfNotExist(scope);
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
}
|
}
|
||||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
|
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
|
||||||
|
|
@ -239,7 +239,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createLeasesForChildShardsIfNotExist()
|
private void createLeasesForChildShardsIfNotExist(MetricsScope scope)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
||||||
// not present in the lease table before creating the lease entry.
|
// not present in the lease table before creating the lease entry.
|
||||||
|
|
@ -272,7 +272,18 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
||||||
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
|
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
|
||||||
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
||||||
|
final long startTime = System.currentTimeMillis();
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
|
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
||||||
|
if (leaseToCreate.checkpoint() != null) {
|
||||||
|
final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
|
||||||
|
MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate);
|
log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue