Merge pull request #15 from jushkem/QueryMultiStream
Adding support to filter out multi streaming leases
This commit is contained in:
commit
15f17700d9
6 changed files with 55 additions and 31 deletions
|
|
@ -142,8 +142,7 @@ public class HierarchicalShardSyncer {
|
||||||
assertAllParentShardsAreClosed(inconsistentShardIds);
|
assertAllParentShardsAreClosed(inconsistentShardIds);
|
||||||
}
|
}
|
||||||
final List<Lease> currentLeases = isMultiStreamMode ?
|
final List<Lease> currentLeases = isMultiStreamMode ?
|
||||||
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
|
leaseRefresher.listLeasesForStream(shardDetector.streamIdentifier()) : leaseRefresher.listLeases();
|
||||||
leaseRefresher.listLeases();
|
|
||||||
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
|
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
|
||||||
final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() :
|
final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() :
|
||||||
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
|
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
|
||||||
|
|
@ -171,29 +170,6 @@ public class HierarchicalShardSyncer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CHECKSTYLE:ON CyclomaticComplexity
|
|
||||||
|
|
||||||
/** Note: This method has package level access solely for testing purposes.
|
|
||||||
*
|
|
||||||
* @param streamIdentifier We'll use this stream identifier to filter leases
|
|
||||||
* @param leaseRefresher Used to fetch leases
|
|
||||||
* @return Return list of leases (corresponding to shards) of the specified stream.
|
|
||||||
* @throws DependencyException
|
|
||||||
* @throws InvalidStateException
|
|
||||||
* @throws ProvisionedThroughputException
|
|
||||||
*/
|
|
||||||
static List<Lease> getLeasesForStream(StreamIdentifier streamIdentifier,
|
|
||||||
LeaseRefresher leaseRefresher)
|
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
|
||||||
List<Lease> streamLeases = new ArrayList<>();
|
|
||||||
for (Lease lease : leaseRefresher.listLeases()) {
|
|
||||||
if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) {
|
|
||||||
streamLeases.add(lease);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return streamLeases;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
||||||
* and a reshard operation.
|
* and a reshard operation.
|
||||||
* @param inconsistentShardIds
|
* @param inconsistentShardIds
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ package software.amazon.kinesis.leases;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
|
|
@ -60,6 +61,18 @@ public interface LeaseRefresher {
|
||||||
*/
|
*/
|
||||||
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
|
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all leases for a given stream synchronously.
|
||||||
|
*
|
||||||
|
* @throws DependencyException if DynamoDB scan fails in an unexpected way
|
||||||
|
* @throws InvalidStateException if lease table does not exist
|
||||||
|
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
|
||||||
|
*
|
||||||
|
* @return list of leases
|
||||||
|
*/
|
||||||
|
List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
|
||||||
|
ProvisionedThroughputException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List all objects in table synchronously.
|
* List all objects in table synchronously.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
|
@ -29,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.*;
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.FutureUtils;
|
import software.amazon.kinesis.common.FutureUtils;
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||||
|
|
@ -58,6 +60,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
|
|
||||||
private boolean newTableCreated = false;
|
private boolean newTableCreated = false;
|
||||||
|
|
||||||
|
private static final String STREAM_NAME = "streamName";
|
||||||
|
private static final String DDB_STREAM_NAME = ":streamName";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
|
|
@ -263,12 +268,21 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
return System.currentTimeMillis() - startTime;
|
return System.currentTimeMillis() - startTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException,
|
||||||
|
InvalidStateException, ProvisionedThroughputException {
|
||||||
|
return list( null, streamIdentifier);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
public List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
return list(null);
|
return list(null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -277,22 +291,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
@Override
|
@Override
|
||||||
public boolean isLeaseTableEmpty()
|
public boolean isLeaseTableEmpty()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
return list(1).isEmpty();
|
return list(1, null).isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List with the given page size. Package access for integration testing.
|
* List with the given page size. Package access for integration testing.
|
||||||
*
|
*
|
||||||
* @param limit number of items to consider at a time - used by integration tests to force paging.
|
* @param limit number of items to consider at a time - used by integration tests to force paging.
|
||||||
|
* @param streamIdentifier streamIdentifier for multi-stream mode. Can be null.
|
||||||
* @return list of leases
|
* @return list of leases
|
||||||
* @throws InvalidStateException if table does not exist
|
* @throws InvalidStateException if table does not exist
|
||||||
* @throws DependencyException if DynamoDB scan fail in an unexpected way
|
* @throws DependencyException if DynamoDB scan fail in an unexpected way
|
||||||
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
|
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
|
||||||
*/
|
*/
|
||||||
List<Lease> list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
List<Lease> list(Integer limit, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
|
||||||
|
ProvisionedThroughputException {
|
||||||
|
|
||||||
log.debug("Listing leases from table {}", table);
|
log.debug("Listing leases from table {}", table);
|
||||||
|
|
||||||
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);
|
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);
|
||||||
|
|
||||||
|
if (streamIdentifier != null) {
|
||||||
|
final Map<String, AttributeValue> expressionAttributeValues = ImmutableMap.of(
|
||||||
|
DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build()
|
||||||
|
);
|
||||||
|
scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME)
|
||||||
|
.expressionAttributeValues(expressionAttributeValues);
|
||||||
|
}
|
||||||
|
|
||||||
if (limit != null) {
|
if (limit != null) {
|
||||||
scanRequestBuilder = scanRequestBuilder.limit(limit);
|
scanRequestBuilder = scanRequestBuilder.limit(limit);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import java.util.List;
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
|
|
@ -54,6 +55,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
|
||||||
DELETELEASE(9),
|
DELETELEASE(9),
|
||||||
DELETEALL(10),
|
DELETEALL(10),
|
||||||
UPDATELEASE(11),
|
UPDATELEASE(11),
|
||||||
|
LISTLEASESFORSTREAM(12),
|
||||||
NONE(Integer.MIN_VALUE);
|
NONE(Integer.MIN_VALUE);
|
||||||
|
|
||||||
private Integer index;
|
private Integer index;
|
||||||
|
|
@ -129,6 +131,13 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
|
||||||
return leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds);
|
return leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM);
|
||||||
|
|
||||||
|
return leaseRefresher.listLeasesForStream(streamIdentifier);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Lease> listLeases()
|
public List<Lease> listLeases()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
|
|
|
||||||
|
|
@ -1039,7 +1039,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
|
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
|
||||||
|
|
||||||
when(shardDetector.listShards()).thenReturn(shards);
|
when(shardDetector.listShards()).thenReturn(shards);
|
||||||
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
|
when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases);
|
||||||
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
|
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
|
||||||
setupMultiStream();
|
setupMultiStream();
|
||||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
|
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
|
||||||
|
|
@ -1049,7 +1049,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
|
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
|
||||||
|
|
||||||
verify(shardDetector, times(2)).listShards();
|
verify(shardDetector, times(2)).listShards();
|
||||||
verify(dynamoDBLeaseRefresher).listLeases();
|
verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class));
|
||||||
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
|
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
|
||||||
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
|
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest
|
||||||
Collection<Lease> expected = builder.build().values();
|
Collection<Lease> expected = builder.build().values();
|
||||||
|
|
||||||
// The / 3 here ensures that we will test Dynamo's paging mechanics.
|
// The / 3 here ensures that we will test Dynamo's paging mechanics.
|
||||||
List<Lease> actual = leaseRefresher.list(numRecordsToPut / 3);
|
List<Lease> actual = leaseRefresher.list(numRecordsToPut / 3, null);
|
||||||
|
|
||||||
for (Lease lease : actual) {
|
for (Lease lease : actual) {
|
||||||
assertNotNull(expected.remove(lease));
|
assertNotNull(expected.remove(lease));
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue