Migrate KinesisProxy to ListShardsWithFilter for empty lease table case

This commit is contained in:
Micah Jaffe 2020-04-30 11:07:29 -07:00
parent e1111d487e
commit c7cd2f1e75
5 changed files with 118 additions and 17 deletions

View file

@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -130,7 +132,14 @@ class KinesisShardSyncer implements ShardSyncer {
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
List<Shard> latestShards = getShardList(kinesisProxy);
// In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible
// based on the given initial position.
// TODO: Implement shard list filtering on non-empty lease table case
final List<Shard> latestShards = leaseManager.isLeaseTableEmpty()
? getShardListAtInitialPosition(kinesisProxy, initialPosition)
: getCompleteShardList(kinesisProxy);
syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
}
@ -156,7 +165,7 @@ class KinesisShardSyncer implements ShardSyncer {
KinesisClientLibIOException {
List<Shard> shards;
if(CollectionUtils.isNullOrEmpty(latestShards)) {
shards = getShardList(kinesisProxy);
shards = getCompleteShardList(kinesisProxy);
} else {
shards = latestShards;
}
@ -345,7 +354,7 @@ class KinesisShardSyncer implements ShardSyncer {
return shardIdToChildShardIdsMap;
}
private List<Shard> getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
private List<Shard> getCompleteShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
List<Shard> shards = kinesisProxy.getShardList();
if (shards == null) {
throw new KinesisClientLibIOException(
@ -354,6 +363,44 @@ class KinesisShardSyncer implements ShardSyncer {
return shards;
}
private List<Shard> getShardListAtInitialPosition(IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPosition)
throws KinesisClientLibIOException {
final ShardFilter shardFilter = getShardFilterAtInitialPosition(initialPosition);
final List<Shard> shards = kinesisProxy.getShardListWithFilter(shardFilter);
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
}
return shards;
}
private static ShardFilter getShardFilterAtInitialPosition(InitialPositionInStreamExtended initialPosition) {
ShardFilter shardFilter = new ShardFilter();
switch (initialPosition.getInitialPositionInStream()) {
case LATEST:
shardFilter = shardFilter.withType(ShardFilterType.AT_LATEST);
break;
case TRIM_HORIZON:
shardFilter = shardFilter.withType(ShardFilterType.AT_TRIM_HORIZON);
break;
case AT_TIMESTAMP:
shardFilter = shardFilter.withType(ShardFilterType.AT_TIMESTAMP)
.withTimestamp(initialPosition.getTimestamp());
break;
default:
throw new IllegalArgumentException(initialPosition.getInitialPositionInStream()
+ " is not a supported initial position in a Kinesis stream. Supported initial positions are"
+ " AT_LATEST, AT_TRIM_HORIZON, and AT_TIMESTAMP.");
}
return shardFilter;
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
@ -630,7 +677,7 @@ class KinesisShardSyncer implements ShardSyncer {
if (!garbageLeases.isEmpty()) {
LOG.info("Found " + garbageLeases.size() + " candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards");
List<Shard> currentShardList = getShardList(kinesisProxy);
List<Shard> currentShardList = getCompleteShardList(kinesisProxy);
Set<String> currentKinesisShardIds = new HashSet<>();
for (Shard shard : currentShardList) {
currentKinesisShardIds.add(shard.getShardId());

View file

@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardFilter;
/**
* Kinesis proxy interface. Operates on a single stream (set up at initialization).
@ -78,6 +79,17 @@ public interface IKinesisProxy {
*/
List<Shard> getShardList() throws ResourceNotFoundException;
/**
* Fetch a subset shards defined for the stream using a filter on the ListShards API. This can be used to
* discover new shards and consume data from them, while limiting the total number of shards returned for
* performance or efficiency reasons.
*
* @param shardFilter currently supported filter types are AT_LATEST, AT_TRIM_HORIZON, AT_TIMESTAMP.
* @return List of all shards in the Kinesis stream.
* @throws ResourceNotFoundException The Kinesis stream was not found.
*/
List<Shard> getShardListWithFilter(ShardFilter shardFilter) throws ResourceNotFoundException;
/**
* Used to verify during ShardConsumer shutdown if the provided shardId is for a shard that has been closed.
* @param shardId Id of the shard that needs to be verified.

View file

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -309,7 +310,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
}
}
private ListShardsResult listShards(final String nextToken) {
private ListShardsResult listShards(final ShardFilter shardFilter, final String nextToken) {
final ListShardsRequest request = new ListShardsRequest();
request.setRequestCredentials(credentialsProvider.getCredentials());
if (StringUtils.isEmpty(nextToken)) {
@ -317,6 +318,11 @@ public class KinesisProxy implements IKinesisProxyExtended {
} else {
request.setNextToken(nextToken);
}
if (shardFilter != null) {
request.setShardFilter(shardFilter);
}
ListShardsResult result = null;
LimitExceededException lastException = null;
int remainingRetries = this.maxListShardsRetryAttempts;
@ -429,6 +435,14 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/
@Override
public synchronized List<Shard> getShardList() {
return getShardListWithFilter(null);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized List<Shard> getShardListWithFilter(ShardFilter shardFilter) {
if (shardIterationState == null) {
shardIterationState = new ShardIterationState();
}
@ -438,7 +452,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
String nextToken = null;
do {
result = listShards(nextToken);
result = listShards(shardFilter, nextToken);
if (result == null) {
/*

View file

@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ShardFilter;
/**
* IKinesisProxy implementation that wraps another implementation and collects metrics.
@ -179,6 +180,22 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
}
}
/**
* {@inheritDoc}
*/
@Override
public List<Shard> getShardListWithFilter(ShardFilter shardFilter) throws ResourceNotFoundException {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
List<Shard> response = other.getShardListWithFilter(shardFilter);
success = true;
return response;
} finally {
MetricsHelper.addSuccessAndLatency(getShardListMetric, startTime, success, MetricsLevel.DETAILED);
}
}
/**
* {@inheritDoc}
*/

View file

@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -425,6 +426,16 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
return shards;
}
/**
* {@inheritDoc}
*/
@Override
public List<Shard> getShardListWithFilter(ShardFilter shardFilter) throws ResourceNotFoundException {
List<Shard> shards = new LinkedList<Shard>();
shards.addAll(shardList);
return shards;
}
/**
* {@inheritDoc}
*/