Fix existing unit tests.
This commit is contained in:
parent
892218d8b5
commit
039dd176f8
3 changed files with 40 additions and 12 deletions
|
|
@ -155,7 +155,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
|
||||||
// create leases for the child shards
|
// create leases for the child shards
|
||||||
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
|
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
|
||||||
initialPositionInStream, cleanupLeasesOfCompletedShards, true, ignoreUnexpectedChildShards, scope, latestShards);
|
initialPositionInStream, false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
|
||||||
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
|
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,7 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -180,7 +181,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
|
* Test determineNewLeasesToCreate() where there is one lease and no resharding operations have been performed, but
|
||||||
* one of the shards was marked as inconsistent.
|
* one of the shards was marked as inconsistent.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -188,15 +189,22 @@ public class HierarchicalShardSyncerTest {
|
||||||
final String shardId0 = "shardId-0";
|
final String shardId0 = "shardId-0";
|
||||||
final String shardId1 = "shardId-1";
|
final String shardId1 = "shardId-1";
|
||||||
final String shardId2 = "shardId-2";
|
final String shardId2 = "shardId-2";
|
||||||
|
final String shardId3 = "shardId-3";
|
||||||
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||||
|
|
||||||
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
|
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId3, null, null, sequenceRange));
|
||||||
|
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
|
||||||
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
|
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
|
||||||
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||||
final List<Lease> currentLeases = Collections.emptyList();
|
|
||||||
|
|
||||||
|
final List<Shard> shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList());
|
||||||
|
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
|
||||||
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
|
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
|
||||||
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer();
|
|
||||||
|
Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
|
||||||
|
Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
||||||
|
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer =
|
||||||
|
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
|
||||||
|
|
||||||
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
|
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
|
||||||
INITIAL_POSITION_LATEST, inconsistentShardIds);
|
INITIAL_POSITION_LATEST, inconsistentShardIds);
|
||||||
|
|
@ -215,15 +223,22 @@ public class HierarchicalShardSyncerTest {
|
||||||
final String shardId0 = "shardId-0";
|
final String shardId0 = "shardId-0";
|
||||||
final String shardId1 = "shardId-1";
|
final String shardId1 = "shardId-1";
|
||||||
final String shardId2 = "shardId-2";
|
final String shardId2 = "shardId-2";
|
||||||
|
final String shardId3 = "shardId-3";
|
||||||
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||||
|
|
||||||
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
|
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId3, null, null, sequenceRange));
|
||||||
|
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
|
||||||
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
|
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
|
||||||
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||||
final List<Lease> currentLeases = Collections.emptyList();
|
|
||||||
|
|
||||||
|
final List<Shard> shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList());
|
||||||
|
final List<Lease> currentLeases = new ArrayList(createMultiStreamLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"));
|
||||||
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
|
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
|
||||||
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
|
|
||||||
|
Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
|
||||||
|
Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
||||||
|
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer =
|
||||||
|
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
|
||||||
|
|
||||||
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
|
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
|
||||||
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
|
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
|
||||||
|
|
@ -1100,18 +1115,31 @@ public class HierarchicalShardSyncerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that leases are not created for closed shards.
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
|
public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
|
||||||
final String lastShardId = "shardId-1";
|
final String lastShardId = "shardId-1";
|
||||||
final List<Lease> currentLeases = new ArrayList<>();
|
|
||||||
|
|
||||||
final List<Shard> shards = Arrays.asList(
|
final List<Shard> shardsWithoutLeases = Arrays.asList(
|
||||||
ShardObjectHelper.newShard("shardId-0", null, null,
|
ShardObjectHelper.newShard("shardId-0", null, null,
|
||||||
ShardObjectHelper.newSequenceNumberRange("303", "404")),
|
ShardObjectHelper.newSequenceNumberRange("303", "404")),
|
||||||
ShardObjectHelper.newShard(lastShardId, null, null,
|
ShardObjectHelper.newShard(lastShardId, null, null,
|
||||||
ShardObjectHelper.newSequenceNumberRange("405", null)));
|
ShardObjectHelper.newSequenceNumberRange("405", null)));
|
||||||
|
|
||||||
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
|
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard("shardId-2", null,
|
||||||
|
null, ShardObjectHelper.newSequenceNumberRange("202", "302")));
|
||||||
|
|
||||||
|
final List<Shard> shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList());
|
||||||
|
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
|
||||||
|
final Set<String> inconsistentShardIds = Collections.emptySet();
|
||||||
|
|
||||||
|
Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
|
||||||
|
Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
||||||
|
|
||||||
|
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer =
|
||||||
|
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
|
||||||
|
|
||||||
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
|
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
|
||||||
INITIAL_POSITION_LATEST);
|
INITIAL_POSITION_LATEST);
|
||||||
|
|
|
||||||
|
|
@ -137,7 +137,7 @@ public class ShutdownTaskTest {
|
||||||
throw new KinesisClientLibIOException("KinesisClientLibIOException");
|
throw new KinesisClientLibIOException("KinesisClientLibIOException");
|
||||||
}).when(hierarchicalShardSyncer)
|
}).when(hierarchicalShardSyncer)
|
||||||
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
|
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
|
||||||
true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
|
false, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
|
||||||
NULL_METRICS_FACTORY.createMetrics(), latestShards);
|
NULL_METRICS_FACTORY.createMetrics(), latestShards);
|
||||||
|
|
||||||
final TaskResult result = task.call();
|
final TaskResult result = task.call();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue