This commit is contained in:
Joshua Kim 2020-03-30 18:37:30 -04:00
parent f2911f1f0d
commit 82cc362545
6 changed files with 46 additions and 50 deletions

View file

@ -89,10 +89,10 @@ public class HierarchicalShardSyncer {
* @param shardDetector * @param shardDetector
* @param leaseRefresher * @param leaseRefresher
* @param initialPosition * @param initialPosition
* @param garbageCollectLeases * @param scope
* @param cleanupLeasesOfCompletedShards * @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards * @param ignoreUnexpectedChildShards
* @param scope * @param garbageCollectLeases
* @throws DependencyException * @throws DependencyException
* @throws InvalidStateException * @throws InvalidStateException
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
@ -101,20 +101,20 @@ public class HierarchicalShardSyncer {
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = isLeaseTableEmpty ? final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases, checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards, isLeaseTableEmpty); isLeaseTableEmpty);
} }
//Provide a pre-collcted list of shards to avoid calling ListShards API //Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards, List<Shard> latestShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards, final boolean isLeaseTableEmpty) final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191

View file

@ -68,8 +68,8 @@ public class ShardSyncTask implements ConsumerTask {
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards, initialPosition, scope, garbageCollectLeases, ignoreUnexpectedChildShards, cleanupLeasesUponShardCompletion,
scope, leaseRefresher.isLeaseTableEmpty()); leaseRefresher.isLeaseTableEmpty());
if (shardSyncTaskIdleTimeMillis > 0) { if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis); Thread.sleep(shardSyncTaskIdleTimeMillis);

View file

@ -42,6 +42,6 @@ public class ShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException { KinesisClientLibIOException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, leaseRefresher.isLeaseTableEmpty()); scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
} }
} }

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -28,7 +27,6 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@ -42,8 +40,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -157,8 +153,8 @@ public class ShutdownTask implements ConsumerTask {
log.debug("Looking for child shards of shard {}", shardInfo.shardId()); log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards // create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, initialPositionInStream, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
scope, latestShards, isLeaseTableEmpty); isLeaseTableEmpty);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
} }

View file

@ -30,7 +30,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.math.BigInteger; import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -284,7 +283,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -318,7 +317,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -360,8 +359,8 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards); dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -397,8 +396,8 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty(), latestShards); dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -434,8 +433,8 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>(), cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty(), new ArrayList<Shard>()); dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(); final Set<String> expectedShardIds = new HashSet<>();
@ -475,7 +474,7 @@ public class HierarchicalShardSyncerTest {
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally { } finally {
verify(shardDetector).listShards(); verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases(); verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -495,7 +494,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty()); dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally { } finally {
verify(shardDetector).listShards(); verify(shardDetector).listShards();
@ -531,7 +530,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -576,7 +575,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -632,7 +631,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. // Initial call: No leases present, create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -647,7 +646,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
@ -707,7 +706,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. // Initial call: Call to create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -724,7 +723,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally { } finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -748,7 +747,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
deleteLeases = leaseDeleteCaptor.getAllValues(); deleteLeases = leaseDeleteCaptor.getAllValues();
@ -809,7 +808,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. Fails on ListLeases // Initial call: Call to create leases. Fails on ListLeases
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally { } finally {
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -819,7 +818,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases not present, leases will be created. // Second call: Leases not present, leases will be created.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -834,7 +833,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -900,7 +899,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. Create lease Fails // Initial call: No leases present, create leases. Create lease Fails
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally { } finally {
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -909,7 +908,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -924,7 +923,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -1014,7 +1013,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1043,7 +1042,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1075,7 +1074,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -1656,7 +1655,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter); verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter);
verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShards();
@ -1678,7 +1677,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShards(); verify(shardDetector, atLeast(1)).listShards();
} }

View file

@ -19,7 +19,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -44,7 +43,6 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
@ -129,6 +127,9 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenSyncingShardsThrows() throws Exception { public final void testCallWhenSyncingShardsThrows() throws Exception {
final boolean garbageCollectLeases = false;
final boolean isLeaseTableEmpty = false;
List<Shard> latestShards = constructShardListGraphA(); List<Shard> latestShards = constructShardListGraphA();
when(shardDetector.listShards()).thenReturn(latestShards); when(shardDetector.listShards()).thenReturn(latestShards);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
@ -138,8 +139,8 @@ public class ShutdownTaskTest {
throw new KinesisClientLibIOException("KinesisClientLibIOException"); throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer) }).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics(), false, latestShards); NULL_METRICS_FACTORY.createMetrics(), garbageCollectLeases, isLeaseTableEmpty);
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());