From b60dd60f35b956b7d97af499050a68e28100a593 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 21:20:33 -0700 Subject: [PATCH] Code refactoring - 1 --- .../leases/dynamodb/DynamoDBLeaseRenewer.java | 14 ++++++-------- .../amazon/kinesis/lifecycle/ShutdownTask.java | 8 +++----- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index ecb0fc26..e457b5ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -298,14 +298,12 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { } final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); - if (StringUtils.isNotEmpty(singleStreamShardId)) { - if(lease instanceof MultiStreamLease) { - MetricsUtil.addStreamId(scope, - StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); - MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); - } else { - MetricsUtil.addShardId(scope, singleStreamShardId); - } + if (lease instanceof MultiStreamLease) { + MetricsUtil.addStreamId(scope, + StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); + MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); + } else if (StringUtils.isNotEmpty(singleStreamShardId)) { + MetricsUtil.addShardId(scope, singleStreamShardId); } long startTime = System.currentTimeMillis(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 73745168..64e394b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -132,9 +132,6 @@ public class ShutdownTask implements ConsumerTask { final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - recordProcessorCheckpointer - .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); - recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); @@ -167,8 +164,6 @@ public class ShutdownTask implements ConsumerTask { } catch (Exception e) { log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), streamIdentifier, e); - } finally { - } } } @@ -203,6 +198,9 @@ public class ShutdownTask implements ConsumerTask { } private void applicationCheckpointAndVerification() { + recordProcessorCheckpointer + .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); + recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); if (lastCheckpointValue == null