Reusing code while determining leaskekey

This commit is contained in:
Ashwin Giridharan 2020-05-06 11:06:23 -07:00
parent a6922d9d7e
commit 9e97edd273
5 changed files with 10 additions and 39 deletions

View file

@ -851,8 +851,7 @@ public class Scheduler implements Runnable {
if (!firstItem) { if (!firstItem) {
builder.append(", "); builder.append(", ");
} }
builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) builder.append(ShardInfo.getLeaseKey(shardInfo));
.orElse(shardInfo.shardId()));
firstItem = false; firstItem = false;
} }
slog.info("Current stream shard assignments: " + builder.toString()); slog.info("Current stream shard assignments: " + builder.toString());
@ -948,8 +947,7 @@ public class Scheduler implements Runnable {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shard); ShardConsumer consumer = shardInfoShardConsumerMap.get(shard);
if (consumer.leaseLost()) { if (consumer.leaseLost()) {
shardInfoShardConsumerMap.remove(shard); shardInfoShardConsumerMap.remove(shard);
log.debug("Removed consumer for {} as lease has been lost", log.debug("Removed consumer for {} as lease has been lost", ShardInfo.getLeaseKey(shard));
shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId()));
} else { } else {
consumer.executeLifecycle(); consumer.executeLifecycle();
} }

View file

@ -54,8 +54,7 @@ public class BlockOnParentShardTask implements ConsumerTask {
@Override @Override
public TaskResult call() { public TaskResult call() {
Exception exception = null; Exception exception = null;
final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) final String shardInfoId = ShardInfo.getLeaseKey(shardInfo);
.orElse(shardInfo.shardId());
try { try {
boolean blockedOnParentShard = false; boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) { for (String shardId : shardInfo.parentShardIds()) {

View file

@ -76,8 +76,7 @@ public class ProcessTask implements ConsumerTask {
@NonNull AggregatorUtil aggregatorUtil, @NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) { @NonNull MetricsFactory metricsFactory) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) this.shardInfoId = ShardInfo.getLeaseKey(shardInfo);
.orElse(shardInfo.shardId());
this.shardRecordProcessor = shardRecordProcessor; this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;

View file

@ -24,6 +24,7 @@ import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@ -70,8 +71,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer; this.shardConsumer = shardConsumer;
this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning; this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt() this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo());
.map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId());
} }

View file

@ -15,15 +15,10 @@
package software.amazon.kinesis.lifecycle; package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.function.Function;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
@ -47,8 +42,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -89,8 +86,8 @@ public class ShutdownTask implements ConsumerTask {
private final List<ChildShard> childShards; private final List<ChildShard> childShards;
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> shardInfo private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId());
/* /*
* Invokes ShardRecordProcessor shutdown() API. * Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc) * (non-Javadoc)
@ -218,26 +215,4 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private boolean isShardInContextParentOfAny(List<Shard> shards) {
for(Shard shard : shards) {
if (isChildShardOfShardInContext(shard)) {
return true;
}
}
return false;
}
private boolean isChildShardOfShardInContext(Shard shard) {
return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
|| StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
}
private void dropLease() {
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
leaseCoordinator.dropLease(currentLease);
if(currentLease != null) {
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
}
}
} }