Code refactoring - 1

This commit is contained in:
Ashwin Giridharan 2020-06-15 21:20:33 -07:00
parent 37281e9493
commit b60dd60f35
2 changed files with 9 additions and 13 deletions

View file

@ -298,14 +298,12 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
} }
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (StringUtils.isNotEmpty(singleStreamShardId)) { if (lease instanceof MultiStreamLease) {
if(lease instanceof MultiStreamLease) { MetricsUtil.addStreamId(scope,
MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()));
StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId());
MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); } else if (StringUtils.isNotEmpty(singleStreamShardId)) {
} else { MetricsUtil.addShardId(scope, singleStreamShardId);
MetricsUtil.addShardId(scope, singleStreamShardId);
}
} }
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();

View file

@ -132,9 +132,6 @@ public class ShutdownTask implements ConsumerTask {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
@ -167,8 +164,6 @@ public class ShutdownTask implements ConsumerTask {
} catch (Exception e) { } catch (Exception e) {
log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(),
streamIdentifier, e); streamIdentifier, e);
} finally {
} }
} }
} }
@ -203,6 +198,9 @@ public class ShutdownTask implements ConsumerTask {
} }
private void applicationCheckpointAndVerification() { private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null if (lastCheckpointValue == null