Re-adding multistream tests.
This commit is contained in:
parent
e906a835f8
commit
1ccbe614eb
1 changed files with 71 additions and 0 deletions
|
|
@ -179,6 +179,59 @@ public class HierarchicalShardSyncerTest {
|
||||||
assertThat(newLeaseKeys, equalTo(expectedLeaseIds));
|
assertThat(newLeaseKeys, equalTo(expectedLeaseIds));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
|
||||||
|
* one of the shards was marked as inconsistent.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
|
||||||
|
final String shardId0 = "shardId-0";
|
||||||
|
final String shardId1 = "shardId-1";
|
||||||
|
final String shardId2 = "shardId-2";
|
||||||
|
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||||
|
|
||||||
|
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
|
||||||
|
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
|
||||||
|
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||||
|
final List<Lease> currentLeases = Collections.emptyList();
|
||||||
|
|
||||||
|
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
|
||||||
|
|
||||||
|
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
|
||||||
|
INITIAL_POSITION_LATEST, inconsistentShardIds);
|
||||||
|
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
|
||||||
|
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
|
||||||
|
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
|
||||||
|
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
|
||||||
|
* one of the shards was marked as inconsistent.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() {
|
||||||
|
final String shardId0 = "shardId-0";
|
||||||
|
final String shardId1 = "shardId-1";
|
||||||
|
final String shardId2 = "shardId-2";
|
||||||
|
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||||
|
|
||||||
|
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
|
||||||
|
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
|
||||||
|
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||||
|
final List<Lease> currentLeases = Collections.emptyList();
|
||||||
|
|
||||||
|
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
|
||||||
|
|
||||||
|
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
|
||||||
|
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
|
||||||
|
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
|
||||||
|
final Set<String> expectedLeaseShardIds = new HashSet<>(
|
||||||
|
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
|
||||||
|
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
|
||||||
|
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
||||||
*/
|
*/
|
||||||
|
|
@ -1045,6 +1098,24 @@ public class HierarchicalShardSyncerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
|
||||||
|
final String lastShardId = "shardId-1";
|
||||||
|
final List<Lease> currentLeases = new ArrayList<>();
|
||||||
|
|
||||||
|
final List<Shard> shards = Arrays.asList(
|
||||||
|
ShardObjectHelper.newShard("shardId-0", null, null,
|
||||||
|
ShardObjectHelper.newSequenceNumberRange("303", "404")),
|
||||||
|
ShardObjectHelper.newShard(lastShardId, null, null,
|
||||||
|
ShardObjectHelper.newSequenceNumberRange("405", null)));
|
||||||
|
|
||||||
|
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
|
||||||
|
INITIAL_POSITION_LATEST);
|
||||||
|
|
||||||
|
assertThat(newLeases.size(), equalTo(1));
|
||||||
|
assertThat(newLeases.get(0).leaseKey(), equalTo(lastShardId));
|
||||||
|
}
|
||||||
|
|
||||||
// /**
|
// /**
|
||||||
// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
|
// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
|
||||||
// * Shard structure (each level depicts a stream segment):
|
// * Shard structure (each level depicts a stream segment):
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue