Adding query support for multistream

This commit is contained in:
Joshua Kim 2020-04-15 05:24:18 -04:00
parent 78a877dc0b
commit 9f639b2342
7 changed files with 56 additions and 32 deletions

View file

@ -142,8 +142,7 @@ public class HierarchicalShardSyncer {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
final List<Lease> currentLeases = isMultiStreamMode ?
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
leaseRefresher.listLeases();
leaseRefresher.listLeasesForStream(shardDetector.streamIdentifier()) : leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() :
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
@ -171,29 +170,6 @@ public class HierarchicalShardSyncer {
}
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Note: This method has package level access solely for testing purposes.
*
* @param streamIdentifier We'll use this stream identifier to filter leases
* @param leaseRefresher Used to fetch leases
* @return Return list of leases (corresponding to shards) of the specified stream.
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
*/
static List<Lease> getLeasesForStream(StreamIdentifier streamIdentifier,
LeaseRefresher leaseRefresher)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) {
if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) {
streamLeases.add(lease);
}
}
return streamLeases;
}
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
* @param inconsistentShardIds

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -60,6 +61,18 @@ public interface LeaseRefresher {
*/
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
/**
* List all leases for a given stream synchronously.
*
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*
* @return list of leases
*/
List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
ProvisionedThroughputException;
/**
* List all objects in table synchronously.
*

View file

@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ImmutableMap;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@ -29,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
@ -58,6 +60,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private boolean newTableCreated = false;
private static final String STREAM_NAME = "streamName";
private static final String DDB_STREAM_NAME = ":streamName";
/**
* Constructor.
*
@ -263,12 +268,21 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return System.currentTimeMillis() - startTime;
}
/**
* {@inheritDoc}
*/
@Override
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException,
InvalidStateException, ProvisionedThroughputException {
return list( null, streamIdentifier);
}
/**
* {@inheritDoc}
*/
@Override
public List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(null);
return list(null, null);
}
/**
@ -277,22 +291,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean isLeaseTableEmpty()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(1).isEmpty();
return list(1, null).isEmpty();
}
/**
* List with the given page size. Package access for integration testing.
*
* @param limit number of items to consider at a time - used by integration tests to force paging.
* @param streamIdentifier streamIdentifier for multi-stream mode. Can be null.
* @return list of leases
* @throws InvalidStateException if table does not exist
* @throws DependencyException if DynamoDB scan fail in an unexpected way
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
*/
List<Lease> list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
List<Lease> list(Integer limit, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
ProvisionedThroughputException {
log.debug("Listing leases from table {}", table);
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);
if (streamIdentifier != null) {
final Map<String, AttributeValue> expressionAttributeValues = ImmutableMap.of(
DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build()
);
scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME)
.expressionAttributeValues(expressionAttributeValues);
}
if (limit != null) {
scanRequestBuilder = scanRequestBuilder.limit(limit);
}

View file

@ -19,6 +19,7 @@ import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -54,6 +55,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
DELETELEASE(9),
DELETEALL(10),
UPDATELEASE(11),
LISTLEASESFORSTREAM(12),
NONE(Integer.MIN_VALUE);
private Integer index;
@ -129,6 +131,13 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
return leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds);
}
@Override
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM);
return leaseRefresher.listLeasesForStream(streamIdentifier);
}
@Override
public List<Lease> listLeases()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {

View file

@ -1039,7 +1039,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
@ -1049,7 +1049,7 @@ public class HierarchicalShardSyncerTest {
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher).listLeases();
verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class));
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}

View file

@ -71,7 +71,7 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
Collection<Lease> expected = builder.build().values();
// The / 3 here ensures that we will test Dynamo's paging mechanics.
List<Lease> actual = leaseRefresher.list(numRecordsToPut / 3);
List<Lease> actual = leaseRefresher.list(numRecordsToPut / 3, null);
for (Lease lease : actual) {
assertNotNull(expected.remove(lease));

View file

@ -33,7 +33,7 @@
</scm>
<properties>
<awssdk.version>2.11.8-SNAPSHOT</awssdk.version>
<awssdk.version>2.10.65-SNAPSHOT</awssdk.version>
</properties>
<licenses>