add basic testcases for ignoring children of open parent shards
This commit is contained in:
parent
f2b8f677eb
commit
67b3f565dc
2 changed files with 74 additions and 3 deletions
|
|
@ -60,9 +60,11 @@ class ShardSyncer {
|
||||||
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
|
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
|
||||||
ILeaseManager<KinesisClientLease> leaseManager,
|
ILeaseManager<KinesisClientLease> leaseManager,
|
||||||
InitialPositionInStreamExtended initialPositionInStream,
|
InitialPositionInStreamExtended initialPositionInStream,
|
||||||
boolean cleanupLeasesOfCompletedShards)
|
boolean cleanupLeasesOfCompletedShards,
|
||||||
|
boolean ignoreUnexpectedChildShards)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
|
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -146,6 +146,39 @@ public class ShardSyncerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of
|
||||||
|
* the shards was marked as inconsistent.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public final void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
|
||||||
|
List<Shard> shards = new ArrayList<Shard>();
|
||||||
|
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
|
||||||
|
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||||
|
|
||||||
|
String shardId0 = "shardId-0";
|
||||||
|
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
|
||||||
|
|
||||||
|
String shardId1 = "shardId-1";
|
||||||
|
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
|
||||||
|
|
||||||
|
String shardId2 = "shardId-2";
|
||||||
|
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||||
|
|
||||||
|
Set<String> inconsistentShardIds = new HashSet<String>();
|
||||||
|
inconsistentShardIds.add(shardId2);
|
||||||
|
|
||||||
|
List<KinesisClientLease> newLeases =
|
||||||
|
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
|
||||||
|
Assert.assertEquals(2, newLeases.size());
|
||||||
|
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||||
|
expectedLeaseShardIds.add(shardId0);
|
||||||
|
expectedLeaseShardIds.add(shardId1);
|
||||||
|
for (KinesisClientLease lease : newLeases) {
|
||||||
|
Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
||||||
*
|
*
|
||||||
|
|
@ -296,6 +329,41 @@ public class ShardSyncerTest {
|
||||||
dataFile.delete();
|
dataFile.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren()
|
||||||
|
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||||
|
IOException {
|
||||||
|
List<Shard> shards = constructShardListForGraphA();
|
||||||
|
Shard shard = shards.get(5);
|
||||||
|
Assert.assertEquals("shardId-5", shard.getShardId());
|
||||||
|
SequenceNumberRange range = shard.getSequenceNumberRange();
|
||||||
|
// shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5
|
||||||
|
// is not closed, those children should be ignored when syncing shards, no leases
|
||||||
|
// should be obtained for them, and we should obtain a lease on the still-open
|
||||||
|
// parent.
|
||||||
|
range.setEndingSequenceNumber(null);
|
||||||
|
shard.setSequenceNumberRange(range);
|
||||||
|
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
|
||||||
|
dataFile.deleteOnExit();
|
||||||
|
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||||
|
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
|
||||||
|
cleanupLeasesOfCompletedShards, true);
|
||||||
|
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||||
|
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||||
|
expectedLeaseShardIds.add("shardId-4");
|
||||||
|
expectedLeaseShardIds.add("shardId-5");
|
||||||
|
expectedLeaseShardIds.add("shardId-8");
|
||||||
|
Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size());
|
||||||
|
for (KinesisClientLease lease1 : newLeases) {
|
||||||
|
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
|
||||||
|
Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint());
|
||||||
|
}
|
||||||
|
dataFile.delete();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws KinesisClientLibIOException
|
* @throws KinesisClientLibIOException
|
||||||
* @throws DependencyException
|
* @throws DependencyException
|
||||||
|
|
@ -586,7 +654,8 @@ public class ShardSyncerTest {
|
||||||
dataFile.deleteOnExit();
|
dataFile.deleteOnExit();
|
||||||
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||||
|
|
||||||
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards);
|
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
|
||||||
|
false);
|
||||||
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||||
Assert.assertEquals(2, newLeases.size());
|
Assert.assertEquals(2, newLeases.size());
|
||||||
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue