diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java index 880fab4c..6437f339 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java @@ -92,6 +92,9 @@ public interface LeaseCoordinator { * * @param lease lease object containing updated values * @param concurrencyToken obtained by calling Lease.concurrencyToken for a currently held lease + * @param operation that performs updateLease + * @param singleStreamShardId for metrics emission in single stream mode. MultiStream mode will get the + * shardId from the lease object * * @return true if update succeeded, false otherwise * @@ -99,7 +102,7 @@ public interface LeaseCoordinator { * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws DependencyException if DynamoDB update fails in an unexpected way */ - boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId) + boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String singleStreamShardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index f45c4cc2..fc3aba8b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -99,7 +99,7 @@ public interface LeaseRefresher { throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** - * @param shardId Get the lease for this shardId + * @param leaseKey Get the lease for this leasekey * * @throws InvalidStateException if lease table does not exist * @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity @@ -107,7 +107,7 @@ public interface LeaseRefresher { * * @return lease for the specified shardId, or null if one doesn't exist */ - Lease getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** * Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter @@ -206,13 +206,13 @@ public interface LeaseRefresher { * Gets the current checkpoint of the shard. This is useful in the resharding use case * where we will wait for the parent shard to complete before starting on the records from a child shard. * - * @param shardId Checkpoint of this shard will be returned + * @param leaseKey Checkpoint of this shard will be returned * @return Checkpoint of this shard, or null if the shard record doesn't exist. * * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws InvalidStateException if lease table does not exist * @throws DependencyException if DynamoDB update fails in an unexpected way */ - ExtendedSequenceNumber getCheckpoint(String shardId) + ExtendedSequenceNumber getCheckpoint(String leaseKey) throws ProvisionedThroughputException, InvalidStateException, DependencyException; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java index 9ed5616f..25ec5b45 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java @@ -86,6 +86,9 @@ public interface LeaseRenewer { * * @param lease lease object containing updated data * @param concurrencyToken obtained by calling Lease.concurrencyToken for a currently held lease + * @param operation that performs updateLease + * @param singleStreamShardId shardId for metrics emission in single stream mode. MultiStream mode will get the + * shardId from the lease object * * @return true if update succeeds, false otherwise * @@ -93,7 +96,7 @@ public interface LeaseRenewer { * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws DependencyException if DynamoDB update fails in an unexpected way */ - boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId) + boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String singleStreamShardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 3b7057de..78673f66 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -349,8 +349,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { @Override public boolean updateLease(final Lease lease, final UUID concurrencyToken, final String operation, - final String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return leaseRenewer.updateLease(lease, concurrencyToken, operation, shardId); + final String singleStreamShardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return leaseRenewer.updateLease(lease, concurrencyToken, operation, singleStreamShardId); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index c5bb1f66..67e5abbe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -663,10 +663,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * {@inheritDoc} */ @Override - public ExtendedSequenceNumber getCheckpoint(String shardId) + public ExtendedSequenceNumber getCheckpoint(String leaseKey) throws ProvisionedThroughputException, InvalidStateException, DependencyException { ExtendedSequenceNumber checkpoint = null; - Lease lease = getLease(shardId); + Lease lease = getLease(leaseKey); if (lease != null) { checkpoint = lease.checkpoint(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index a1e0afcc..ecb0fc26 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -271,7 +271,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { * {@inheritDoc} */ @Override - public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String shardId) + public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String singleStreamShardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException { verifyNotNull(lease, "lease cannot be null"); verifyNotNull(lease.leaseKey(), "leaseKey cannot be null"); @@ -298,12 +298,14 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { } final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); - if (StringUtils.isNotEmpty(shardId)) { + if (StringUtils.isNotEmpty(singleStreamShardId)) { if(lease instanceof MultiStreamLease) { MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); + MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); + } else { + MetricsUtil.addShardId(scope, singleStreamShardId); } - MetricsUtil.addShardId(scope, shardId); } long startTime = System.currentTimeMillis(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index bf07f6e2..9111b946 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -47,11 +47,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -92,7 +89,7 @@ public class ShutdownTask implements ConsumerTask { private final List childShards; - private static final Function shardInfoIdProvider = shardInfo -> shardInfo + private static final Function leaseKeyProvider = shardInfo -> shardInfo .streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId()); /* * Invokes ShardRecordProcessor shutdown() API. @@ -110,7 +107,7 @@ public class ShutdownTask implements ConsumerTask { try { try { log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason); + leaseKeyProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason); final long startTime = System.currentTimeMillis(); if (reason == ShutdownReason.SHARD_END) { @@ -123,7 +120,7 @@ public class ShutdownTask implements ConsumerTask { createLeasesForChildShardsIfNotExist(); updateLeasesForChildShards(); } else { - log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", shardInfoIdProvider.apply(shardInfo)); + log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo)); } recordProcessorCheckpointer @@ -136,23 +133,23 @@ public class ShutdownTask implements ConsumerTask { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); } - log.debug("Shutting down retrieval strategy for shard {}.", shardInfoIdProvider.apply(shardInfo)); + log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo)); recordsPublisher.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo)); + log.debug("Record processor completed shutdown() for shard {}", leaseKeyProvider.apply(shardInfo)); return new TaskResult(null); } catch (Exception e) { if (e instanceof CustomerApplicationException) { - log.error("Shard {}: Application exception. ", shardInfoIdProvider.apply(shardInfo), e); + log.error("Shard {}: Application exception. ", leaseKeyProvider.apply(shardInfo), e); } else { - log.error("Shard {}: Caught exception: ", shardInfoIdProvider.apply(shardInfo), e); + log.error("Shard {}: Caught exception: ", leaseKeyProvider.apply(shardInfo), e); } exception = e; // backoff if we encounter an exception. try { Thread.sleep(this.backoffTimeMillis); } catch (InterruptedException ie) { - log.debug("Shard {}: Interrupted sleep", shardInfoIdProvider.apply(shardInfo), ie); + log.debug("Shard {}: Interrupted sleep", leaseKeyProvider.apply(shardInfo), ie); } } } finally { @@ -168,7 +165,7 @@ public class ShutdownTask implements ConsumerTask { if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + + + leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } @@ -177,7 +174,7 @@ public class ShutdownTask implements ConsumerTask { try { action.run(); } catch (Exception e) { - throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfoIdProvider.apply(shardInfo) +": ", e); + throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKeyProvider.apply(shardInfo) +": ", e); } finally { MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } @@ -195,12 +192,12 @@ public class ShutdownTask implements ConsumerTask { private void updateLeasesForChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo)); + final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); final Lease updatedLease = currentLease.copy(); updatedLease.childShardIds(childShardIds); - leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo)); + leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); } /* @@ -233,7 +230,7 @@ public class ShutdownTask implements ConsumerTask { } private void dropLease() { - Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId()); + Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); leaseCoordinator.dropLease(currentLease); if(currentLease != null) { log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index d9d7d01e..81a49839 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -195,11 +195,11 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { } @Override - public Lease getLease(String shardId) + public Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException { throwExceptions("getLease", ExceptionThrowingLeaseRefresherMethods.GETLEASE); - return leaseRefresher.getLease(shardId); + return leaseRefresher.getLease(leaseKey); } @Override @@ -216,7 +216,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { } @Override - public ExtendedSequenceNumber getCheckpoint(final String shardId) + public ExtendedSequenceNumber getCheckpoint(final String leaseKey) throws ProvisionedThroughputException, InvalidStateException, DependencyException { return null; }