Revert "Fixing bug for multistream not using list shards with filter. (#66)"
This reverts commit f693311ac8.
This commit is contained in:
parent
f693311ac8
commit
74ffd4060c
6 changed files with 9 additions and 33 deletions
|
|
@ -125,6 +125,8 @@ public class HierarchicalShardSyncer {
|
||||||
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
|
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
|
|
||||||
|
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
|
||||||
|
|
||||||
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
|
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
|
||||||
log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size());
|
log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -217,19 +217,6 @@ public interface LeaseRefresher {
|
||||||
*/
|
*/
|
||||||
boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
|
boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Check (synchronously) if there are any leases in the lease table for a given stream identifier.
|
|
||||||
*
|
|
||||||
* @param streamIdentifier for multi-stream mode. Can be null.
|
|
||||||
* @return true if there are no leases in the lease table
|
|
||||||
*
|
|
||||||
* @throws DependencyException if DynamoDB scan fails in an unexpected way
|
|
||||||
* @throws InvalidStateException if lease table does not exist
|
|
||||||
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
|
|
||||||
*/
|
|
||||||
boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) throws DependencyException,
|
|
||||||
InvalidStateException, ProvisionedThroughputException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the current checkpoint of the shard. This is useful in the resharding use case
|
* Gets the current checkpoint of the shard. This is useful in the resharding use case
|
||||||
* where we will wait for the parent shard to complete before starting on the records from a child shard.
|
* where we will wait for the parent shard to complete before starting on the records from a child shard.
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,7 @@ public class ShardSyncTask implements ConsumerTask {
|
||||||
try {
|
try {
|
||||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
|
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
|
||||||
initialPosition, scope, ignoreUnexpectedChildShards,
|
initialPosition, scope, ignoreUnexpectedChildShards,
|
||||||
leaseRefresher.isLeaseTableEmptyForStreamIdentifier(shardDetector.streamIdentifier()));
|
leaseRefresher.isLeaseTableEmpty());
|
||||||
|
|
||||||
if (shardSyncTaskIdleTimeMillis > 0) {
|
if (shardSyncTaskIdleTimeMillis > 0) {
|
||||||
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
||||||
|
|
|
||||||
|
|
@ -292,13 +292,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
@Override
|
@Override
|
||||||
public boolean isLeaseTableEmpty()
|
public boolean isLeaseTableEmpty()
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
return isLeaseTableEmptyForStreamIdentifier(null);
|
return list(1, 1, null).isEmpty();
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier)
|
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
|
||||||
return list(1, 1, streamIdentifier).isEmpty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -215,11 +215,6 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ExtendedSequenceNumber getCheckpoint(final String leaseKey)
|
public ExtendedSequenceNumber getCheckpoint(final String leaseKey)
|
||||||
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
||||||
|
|
|
||||||
|
|
@ -335,8 +335,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
setupMultiStream();
|
setupMultiStream();
|
||||||
hierarchicalShardSyncer
|
hierarchicalShardSyncer
|
||||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||||
SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier(
|
SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||||
StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)));
|
|
||||||
|
|
||||||
final Set<String> expectedShardIds = new HashSet<>(
|
final Set<String> expectedShardIds = new HashSet<>(
|
||||||
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
|
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
|
||||||
|
|
@ -418,8 +417,8 @@ public class HierarchicalShardSyncerTest {
|
||||||
setupMultiStream();
|
setupMultiStream();
|
||||||
hierarchicalShardSyncer
|
hierarchicalShardSyncer
|
||||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||||
latestShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier(
|
latestShards, false, SCOPE,
|
||||||
StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)));
|
dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||||
|
|
||||||
final Set<String> expectedShardIds = new HashSet<>(
|
final Set<String> expectedShardIds = new HashSet<>(
|
||||||
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
|
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
|
||||||
|
|
@ -683,7 +682,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
try {
|
try {
|
||||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
|
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
|
||||||
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false,
|
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false,
|
||||||
dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)));
|
dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||||
} finally {
|
} finally {
|
||||||
verify(shardDetector).listShards();
|
verify(shardDetector).listShards();
|
||||||
verify(dynamoDBLeaseRefresher, never()).listLeases();
|
verify(dynamoDBLeaseRefresher, never()).listLeases();
|
||||||
|
|
@ -763,8 +762,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
setupMultiStream();
|
setupMultiStream();
|
||||||
hierarchicalShardSyncer
|
hierarchicalShardSyncer
|
||||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||||
SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier(
|
SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||||
StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)));
|
|
||||||
|
|
||||||
final List<Lease> leases = leaseCaptor.getAllValues();
|
final List<Lease> leases = leaseCaptor.getAllValues();
|
||||||
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
|
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue