From c7cd2f1e75ae329131fd9c3279bebcaff74d7b13 Mon Sep 17 00:00:00 2001 From: Micah Jaffe Date: Thu, 30 Apr 2020 11:07:29 -0700 Subject: [PATCH] Migrate KinesisProxy to ListShardsWithFilter for empty lease table case --- .../lib/worker/KinesisShardSyncer.java | 55 +++++++++++++++++-- .../clientlibrary/proxies/IKinesisProxy.java | 12 ++++ .../clientlibrary/proxies/KinesisProxy.java | 40 +++++++++----- ...etricsCollectingKinesisProxyDecorator.java | 17 ++++++ .../proxies/KinesisLocalFileProxy.java | 11 ++++ 5 files changed, 118 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index c23fd678..ac57f9cb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.services.kinesis.model.ShardFilter; +import com.amazonaws.services.kinesis.model.ShardFilterType; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -130,7 +132,14 @@ class KinesisShardSyncer implements ShardSyncer { boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - List latestShards = getShardList(kinesisProxy); + + // In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible + // based on the given initial position. + // TODO: Implement shard list filtering on non-empty lease table case + final List latestShards = leaseManager.isLeaseTableEmpty() + ? 
getShardListAtInitialPosition(kinesisProxy, initialPosition) + : getCompleteShardList(kinesisProxy); + syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards); } @@ -156,7 +165,7 @@ class KinesisShardSyncer implements ShardSyncer { KinesisClientLibIOException { List shards; if(CollectionUtils.isNullOrEmpty(latestShards)) { - shards = getShardList(kinesisProxy); + shards = getCompleteShardList(kinesisProxy); } else { shards = latestShards; } @@ -345,7 +354,7 @@ class KinesisShardSyncer implements ShardSyncer { return shardIdToChildShardIdsMap; } - private List getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException { + private List getCompleteShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException { List shards = kinesisProxy.getShardList(); if (shards == null) { throw new KinesisClientLibIOException( @@ -354,6 +363,44 @@ class KinesisShardSyncer implements ShardSyncer { return shards; } + private List getShardListAtInitialPosition(IKinesisProxy kinesisProxy, + InitialPositionInStreamExtended initialPosition) + throws KinesisClientLibIOException { + + final ShardFilter shardFilter = getShardFilterAtInitialPosition(initialPosition); + final List shards = kinesisProxy.getShardListWithFilter(shardFilter); + + if (shards == null) { + throw new KinesisClientLibIOException( + "Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + } + + return shards; + } + + private static ShardFilter getShardFilterAtInitialPosition(InitialPositionInStreamExtended initialPosition) { + ShardFilter shardFilter = new ShardFilter(); + + switch (initialPosition.getInitialPositionInStream()) { + case LATEST: + shardFilter = shardFilter.withType(ShardFilterType.AT_LATEST); + break; + case TRIM_HORIZON: + shardFilter = shardFilter.withType(ShardFilterType.AT_TRIM_HORIZON); + break; + case AT_TIMESTAMP: + shardFilter = 
shardFilter.withType(ShardFilterType.AT_TIMESTAMP) + .withTimestamp(initialPosition.getTimestamp()); + break; + default: + throw new IllegalArgumentException(initialPosition.getInitialPositionInStream() + + " is not a supported initial position in a Kinesis stream. Supported initial positions are" + + " AT_LATEST, AT_TRIM_HORIZON, and AT_TIMESTAMP."); + } + + return shardFilter; + } + /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. @@ -630,7 +677,7 @@ class KinesisShardSyncer implements ShardSyncer { if (!garbageLeases.isEmpty()) { LOG.info("Found " + garbageLeases.size() + " candidate leases for cleanup. Refreshing list of" + " Kinesis shards to pick up recent/latest shards"); - List currentShardList = getShardList(kinesisProxy); + List currentShardList = getCompleteShardList(kinesisProxy); Set currentKinesisShardIds = new HashSet<>(); for (Shard shard : currentShardList) { currentKinesisShardIds.add(shard.getShardId()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 6e148969..7921a321 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.InvalidArgumentException; import com.amazonaws.services.kinesis.model.PutRecordResult; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardFilter; /** * Kinesis proxy interface. Operates on a single stream (set up at initialization). 
@@ -78,6 +79,17 @@ public interface IKinesisProxy { */ List getShardList() throws ResourceNotFoundException; + /** + * Fetch a subset of the shards defined for the stream using a filter on the ListShards API. This can be used to + * discover new shards and consume data from them, while limiting the total number of shards returned for + * performance or efficiency reasons. + * + * @param shardFilter currently supported filter types are AT_LATEST, AT_TRIM_HORIZON, AT_TIMESTAMP. + * @return List of the shards in the Kinesis stream that match the given filter. + * @throws ResourceNotFoundException The Kinesis stream was not found. + */ + List getShardListWithFilter(ShardFilter shardFilter) throws ResourceNotFoundException; + /** * Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed. * @param shardId Id of the shard that needs to be verified. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index 6717208b..0936bddb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -309,7 +310,7 @@ public class KinesisProxy implements IKinesisProxyExtended { } } - private ListShardsResult listShards(final String nextToken) { + private ListShardsResult listShards(final ShardFilter shardFilter, final String nextToken) { final ListShardsRequest request = new ListShardsRequest(); request.setRequestCredentials(credentialsProvider.getCredentials()); if (StringUtils.isEmpty(nextToken)) 
{ @@ -317,6 +318,11 @@ public class KinesisProxy implements IKinesisProxyExtended { } else { request.setNextToken(nextToken); } + + if (shardFilter != null) { + request.setShardFilter(shardFilter); + } + ListShardsResult result = null; LimitExceededException lastException = null; int remainingRetries = this.maxListShardsRetryAttempts; @@ -429,29 +435,37 @@ public class KinesisProxy implements IKinesisProxyExtended { */ @Override public synchronized List getShardList() { + return getShardListWithFilter(null); + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized List getShardListWithFilter(ShardFilter shardFilter) { if (shardIterationState == null) { shardIterationState = new ShardIterationState(); } - + if (isKinesisClient) { ListShardsResult result; String nextToken = null; - + do { - result = listShards(nextToken); - + result = listShards(shardFilter, nextToken); + if (result == null) { /* - * If listShards ever returns null, we should bail and return null. This indicates the stream is not - * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. - */ + * If listShards ever returns null, we should bail and return null. This indicates the stream is not + * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. + */ return null; } else { shardIterationState.update(result.getShards()); nextToken = result.getNextToken(); } } while (StringUtils.isNotEmpty(result.getNextToken())); - + } else { DescribeStreamResult response; @@ -459,10 +473,10 @@ public class KinesisProxy implements IKinesisProxyExtended { response = getStreamInfo(shardIterationState.getLastShardId()); if (response == null) { - /* - * If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not - * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. 
- */ + /* + * If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not + * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. + */ return null; } else { shardIterationState.update(response.getStreamDescription().getShards()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 230ee710..fe6eb51e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.amazonaws.services.kinesis.model.ShardFilter; /** * IKinesisProxy implementation that wraps another implementation and collects metrics. 
@@ -179,6 +180,22 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy { } } + /** + * {@inheritDoc} + */ + @Override + public List getShardListWithFilter(ShardFilter shardFilter) throws ResourceNotFoundException { + long startTime = System.currentTimeMillis(); + boolean success = false; + try { + List response = other.getShardListWithFilter(shardFilter); + success = true; + return response; + } finally { + MetricsHelper.addSuccessAndLatency(getShardListMetric, startTime, success, MetricsLevel.DETAILED); + } + } + /** * {@inheritDoc} */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index d78f5ca0..ed57eedf 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -425,6 +426,16 @@ public class KinesisLocalFileProxy implements IKinesisProxy { return shards; } + /** + * {@inheritDoc} + */ + @Override + public List getShardListWithFilter(ShardFilter shardFilter) throws ResourceNotFoundException { + List shards = new LinkedList(); + shards.addAll(shardList); + return shards; + } + /** * {@inheritDoc} */