Adding java docs
This commit is contained in:
parent
16cf142b44
commit
8609820f20
2 changed files with 48 additions and 1 deletions
|
|
@ -313,6 +313,11 @@ public class HierarchicalShardSyncer {
|
||||||
return shardIdToChildShardIdsMap;
|
return shardIdToChildShardIdsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to resolve the correct shard filter to use when listing shards from a position in a stream.
|
||||||
|
* @param initialPositionInStreamExtended
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) {
|
private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) {
|
||||||
|
|
||||||
ShardFilter.Builder builder = ShardFilter.builder();
|
ShardFilter.Builder builder = ShardFilter.builder();
|
||||||
|
|
@ -806,17 +811,44 @@ public class HierarchicalShardSyncer {
|
||||||
private final StreamIdentifier streamIdentifier;
|
private final StreamIdentifier streamIdentifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface to determine how to create new leases.
|
||||||
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static interface LeaseSynchronizer {
|
static interface LeaseSynchronizer {
|
||||||
|
/**
|
||||||
|
* Determines how to create leases.
|
||||||
|
* @param shards
|
||||||
|
* @param currentLeases
|
||||||
|
* @param initialPosition
|
||||||
|
* @param inconsistentShardIds
|
||||||
|
* @param multiStreamArgs
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
|
List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
|
||||||
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds,
|
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds,
|
||||||
MultiStreamArgs multiStreamArgs);
|
MultiStreamArgs multiStreamArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to help create leases when the table is initially empty.
|
||||||
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
static class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
|
static class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines how to create leases when the lease table is initially empty. For this, we read all shards where
|
||||||
|
* the KCL is reading from. For any shards which are closed, we will discover their child shards through GetRecords
|
||||||
|
* child shard information.
|
||||||
|
*
|
||||||
|
* @param shards
|
||||||
|
* @param currentLeases
|
||||||
|
* @param initialPosition
|
||||||
|
* @param inconsistentShardIds
|
||||||
|
* @param multiStreamArgs
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
|
public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
|
||||||
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
|
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
|
||||||
|
|
@ -860,6 +892,9 @@ public class HierarchicalShardSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to help create leases when the lease table is not initially empty.
|
||||||
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
static class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
|
static class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
|
||||||
|
|
|
||||||
|
|
@ -1612,18 +1612,30 @@ public class HierarchicalShardSyncerTest {
|
||||||
assertThat(newLeaseMap.isEmpty(), equalTo(true));
|
assertThat(newLeaseMap.isEmpty(), equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that when reading from TIP, we use the AT_LATEST shard filter.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception {
|
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception {
|
||||||
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
|
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
|
||||||
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter);
|
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that when reading from TRIM, we use the TRIM_HORIZON shard filter.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception {
|
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception {
|
||||||
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
|
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
|
||||||
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter);
|
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception {
|
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception {
|
||||||
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build();
|
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build();
|
||||||
|
|
@ -1668,7 +1680,7 @@ public class HierarchicalShardSyncerTest {
|
||||||
verify(shardDetector, atLeast(1)).listShards();
|
verify(shardDetector, atLeast(1)).listShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
// /**
|
// /**getShardFilterFromInitialPosition
|
||||||
// * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent.
|
// * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent.
|
||||||
// */
|
// */
|
||||||
// @Test
|
// @Test
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue