Add lease sync strategy for empty lease table (#24)

* Add lease sync strategy for empty lease table

* Fix ShardSyncer unit tests to reflect new empty lease table shard sync logic
This commit is contained in:
Micah Jaffe 2020-05-04 17:06:57 -07:00 committed by GitHub
parent c7cd2f1e75
commit 550d7af5b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 566 additions and 157 deletions

View file

@ -0,0 +1,98 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.AllArgsConstructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.StartingSequenceNumberAndShardIdBasedComparator;
/**
* Class to help create leases when the table is initially empty.
*/
@AllArgsConstructor
class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
private static final Log LOG = LogFactory.getLog(EmptyLeaseTableSynchronizer.class);
/**
* Determines how to create leases when the lease table is initially empty. For this, we read all shards where
* the KCL is reading from. For any shards which are closed, we will discover their child shards through GetRecords
* child shard information.
*
* @param shards
* @param currentLeases
* @param initialPosition
* @param inconsistentShardIds
* @return
*/
@Override
public List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds) {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards =
KinesisShardSyncer.constructShardIdToShardMap(shards);
currentLeases.forEach(lease -> LOG.debug("Existing lease: " + lease.getLeaseKey()));
final List<KinesisClientLease> newLeasesToCreate =
getLeasesToCreateForOpenAndClosedShards(initialPosition, shards);
final Comparator<KinesisClientLease> startingSequenceNumberComparator =
new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
/**
* Helper method to create leases. For an empty lease table, we will be creating leases for all shards
* regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon
* reaching SHARD_END.
*/
private List<KinesisClientLease> getLeasesToCreateForOpenAndClosedShards(
InitialPositionInStreamExtended initialPosition,
List<Shard> shards) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
for (Shard shard : shards) {
final String shardId = shard.getShardId();
final KinesisClientLease lease = KinesisShardSyncer.newKCLLease(shard);
final ExtendedSequenceNumber checkpoint = KinesisShardSyncer.convertToCheckpoint(initialPosition);
lease.setCheckpoint(checkpoint);
LOG.debug("Need to create a lease for shard with shardId " + shardId);
shardIdToNewLeaseMap.put(shardId, lease);
}
return new ArrayList(shardIdToNewLeaseMap.values());
}
}

View file

@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.amazonaws.util.CollectionUtils;
@ -45,6 +47,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
import javax.annotation.Nullable;
/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@ -111,7 +115,8 @@ class KinesisShardSyncer implements ShardSyncer {
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, latestShards, leaseManager.isLeaseTableEmpty());
}
/**
@ -136,11 +141,13 @@ class KinesisShardSyncer implements ShardSyncer {
// In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible
// based on the given initial position.
// TODO: Implement shard list filtering on non-empty lease table case
final List<Shard> latestShards = leaseManager.isLeaseTableEmpty()
final boolean isLeaseTableEmpty = leaseManager.isLeaseTableEmpty();
final List<Shard> latestShards = isLeaseTableEmpty
? getShardListAtInitialPosition(kinesisProxy, initialPosition)
: getCompleteShardList(kinesisProxy);
syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards);
syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, latestShards, isLeaseTableEmpty);
}
/**
@ -159,10 +166,15 @@ class KinesisShardSyncer implements ShardSyncer {
*/
// CHECKSTYLE:OFF CyclomaticComplexity
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
List<Shard> latestShards,
boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
List<Shard> shards;
if(CollectionUtils.isNullOrEmpty(latestShards)) {
shards = getCompleteShardList(kinesisProxy);
@ -178,11 +190,16 @@ class KinesisShardSyncer implements ShardSyncer {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
// Determine which lease sync strategy to use based on the state of the lease table
final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty
? new EmptyLeaseTableSynchronizer()
: new NonEmptyLeaseTableSynchronizer(shardIdToShardMap, shardIdToChildShardIdsMap);
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
final List<KinesisClientLease> currentLeases = leaseManager.listLeases();
final List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, shards,
currentLeases, initialPosition, inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
boolean success = false;
@ -326,7 +343,7 @@ class KinesisShardSyncer implements ShardSyncer {
* @param shardIdToShardMap
* @return
*/
Map<String, Set<String>> constructShardIdToChildShardIdsMap(Map<String, Shard> shardIdToShardMap) {
static Map<String, Set<String>> constructShardIdToChildShardIdsMap(Map<String, Shard> shardIdToShardMap) {
Map<String, Set<String>> shardIdToChildShardIdsMap = new HashMap<>();
for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
String shardId = entry.getKey();
@ -405,42 +422,8 @@ class KinesisShardSyncer implements ShardSyncer {
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
*
* @param leaseSynchronizer determines the strategy to use when updating leases based on the current state of
* the lease table (empty vs. non-empty)
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
@ -448,85 +431,27 @@ class KinesisShardSyncer implements ShardSyncer {
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards, List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds) {
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
List<KinesisClientLease> determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer,
List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds) {
Set<String> shardIdsOfCurrentLeases = new HashSet<String>();
for (KinesisClientLease lease : currentLeases) {
shardIdsOfCurrentLeases.add(lease.getLeaseKey());
LOG.debug("Existing lease: " + lease);
}
List<Shard> openShards = getOpenShards(shards);
Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
String shardId = shard.getShardId();
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
if (shardIdsOfCurrentLeases.contains(shardId)) {
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
} else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = newKCLLease(shard);
boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(convertToCheckpoint(initialPosition));
}
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
List<KinesisClientLease> newLeasesToCreate = new ArrayList<KinesisClientLease>();
newLeasesToCreate.addAll(shardIdToNewLeaseMap.values());
Comparator<? super KinesisClientLease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards);
Collections.sort(newLeasesToCreate, startingSequenceNumberComparator);
return newLeasesToCreate;
return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards, List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
List<KinesisClientLease> determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer,
List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
Set<String> inconsistentShardIds = new HashSet<String>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds);
}
/**
@ -545,7 +470,7 @@ class KinesisShardSyncer implements ShardSyncer {
* @return true if the shard is a descendant of any current shard (lease already exists)
*/
// CHECKSTYLE:OFF CyclomaticComplexity
boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition, Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, Map<String, Boolean> memoizationContext) {
@ -630,7 +555,7 @@ class KinesisShardSyncer implements ShardSyncer {
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
*/
Set<String> getParentShardIds(Shard shard, Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
static Set<String> getParentShardIds(Shard shard, Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
Set<String> parentShardIds = new HashSet<String>(2);
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) {
@ -794,7 +719,7 @@ class KinesisShardSyncer implements ShardSyncer {
* @param shard
* @return
*/
KinesisClientLease newKCLLease(Shard shard) {
static KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(shard.getShardId());
List<String> parentShardIds = new ArrayList<String>(2);
@ -816,7 +741,7 @@ class KinesisShardSyncer implements ShardSyncer {
* @param shards List of shards
* @return ShardId->Shard map
*/
Map<String, Shard> constructShardIdToShardMap(List<Shard> shards) {
static Map<String, Shard> constructShardIdToShardMap(List<Shard> shards) {
Map<String, Shard> shardIdToShardMap = new HashMap<String, Shard>();
for (Shard shard : shards) {
shardIdToShardMap.put(shard.getShardId(), shard);
@ -831,7 +756,7 @@ class KinesisShardSyncer implements ShardSyncer {
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
List<Shard> getOpenShards(List<Shard> allShards) {
static List<Shard> getOpenShards(List<Shard> allShards) {
List<Shard> openShards = new ArrayList<Shard>();
for (Shard shard : allShards) {
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
@ -843,7 +768,7 @@ class KinesisShardSyncer implements ShardSyncer {
return openShards;
}
private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
@ -860,7 +785,7 @@ class KinesisShardSyncer implements ShardSyncer {
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/
private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<KinesisClientLease>,
static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<KinesisClientLease>,
Serializable {
private static final long serialVersionUID = 1L;

View file

@ -0,0 +1,41 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.model.Shard;
import java.util.List;
import java.util.Set;
/**
* Interface used by {@link KinesisShardSyncer} to determine how to create new leases based on the current state
* of the lease table (i.e. whether the lease table is empty or non-empty).
*/
interface LeaseSynchronizer {
/**
* Determines how to create leases.
* @param shards
* @param currentLeases
* @param initialPosition
* @param inconsistentShardIds
* @return
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds);
}

View file

@ -0,0 +1,165 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.AllArgsConstructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* TODO - non-empty lease table sync story
*/
@AllArgsConstructor
class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
private static final Log LOG = LogFactory.getLog(NonEmptyLeaseTableSynchronizer.class);
private final Map<String, Shard> shardIdToShardMap;
private final Map<String, Set<String>> shardIdToChildShardIdsMap;
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
@Override
public List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds) {
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<>();
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = KinesisShardSyncer.constructShardIdToShardMap(shards);
Set<String> shardIdsOfCurrentLeases = new HashSet<String>();
for (Lease lease : currentLeases) {
shardIdsOfCurrentLeases.add(lease.getLeaseKey());
LOG.debug("Existing lease: " + lease);
}
List<Shard> openShards = KinesisShardSyncer.getOpenShards(shards);
Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
String shardId = shard.getShardId();
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
if (shardIdsOfCurrentLeases.contains(shardId)) {
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
} else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard);
boolean isDescendant = KinesisShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
shardIdToNewLeaseMap, memoizationContext);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(KinesisShardSyncer.convertToCheckpoint(initialPosition));
}
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
List<KinesisClientLease> newLeasesToCreate = new ArrayList<>();
newLeasesToCreate.addAll(shardIdToNewLeaseMap.values());
Comparator<? super KinesisClientLease> startingSequenceNumberComparator = new KinesisShardSyncer.StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards);
Collections.sort(newLeasesToCreate, startingSequenceNumberComparator);
return newLeasesToCreate;
}
}

View file

@ -18,17 +18,21 @@ import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
@ -56,6 +60,11 @@ import com.amazonaws.services.kinesis.model.Shard;
import junit.framework.Assert;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
*
*/
@ -69,9 +78,9 @@ public class ShardSyncerTest {
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L));
private final boolean cleanupLeasesOfCompletedShards = true;
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
private static final int EXPONENT = 128;
AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB();
private LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);
/**
@ -120,8 +129,9 @@ public class ShardSyncerTest {
public final void testDetermineNewLeasesToCreateNoShards() {
List<Shard> shards = new ArrayList<Shard>();
List<KinesisClientLease> leases = new ArrayList<KinesisClientLease>();
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, leases);
Assert.assertTrue(shardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty());
Assert.assertTrue(shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, leases, INITIAL_POSITION_LATEST).isEmpty());
}
/**
@ -139,8 +149,10 @@ public class ShardSyncerTest {
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -151,7 +163,7 @@ public class ShardSyncerTest {
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of
* Test determineNewLeasesToCreate() where there is one lease and no resharding operations have been performed, but one of
* the shards was marked as inconsistent.
*/
@Test
@ -169,11 +181,18 @@ public class ShardSyncerTest {
String shardId2 = "shardId-2";
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
String shardIdWithLease = "shardId-3";
shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange));
currentLeases.add(newLease(shardIdWithLease));
Set<String> inconsistentShardIds = new HashSet<String>();
inconsistentShardIds.add(shardId2);
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -216,14 +235,10 @@ public class ShardSyncerTest {
}
/**
* @throws KinesisClientLibIOException
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws IOException
* All open and closed shards within stream's retention period should be sync'ed when lease table is empty.
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtLatest()
public final void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
List<Shard> shards = constructShardListForGraphA();
@ -231,6 +246,38 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
for (int i = 0; i < 11; i++) {
expectedLeaseShardIds.add("shardId-" + i);
}
Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size());
for (KinesisClientLease lease1 : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint());
}
dataFile.delete();
}
/**
* We should only create leases for shards at LATEST when lease table is not empty.
*/
@Test
public final void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable()
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
IOException {
List<Shard> shards = constructShardListForGraphA();
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
// shardId-10 exists at LATEST - create a lease for it
KinesisClientLease lease = newLease("shardId-10");
lease.setCheckpoint(ExtendedSequenceNumber.LATEST);
leaseManager.createLeaseIfNotExists(lease);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
@ -353,6 +400,13 @@ public class ShardSyncerTest {
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
// Create a dummy lease in the lease table - otherwise leaseManager will create leases for all shards if
// lease table is empty.
KinesisClientLease lease = newLease("shardId-1000");
lease.setCheckpoint(ExtendedSequenceNumber.LATEST);
leaseManager.createLeaseIfNotExists(lease);
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
@ -695,8 +749,9 @@ public class ShardSyncerTest {
initialPositions.add(INITIAL_POSITION_TRIM_HORIZON);
for (InitialPositionInStreamExtended initialPosition : initialPositions) {
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition);
Assert.assertEquals(2, newLeases.size());
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add(shardId0);
@ -710,27 +765,73 @@ public class ShardSyncerTest {
}
/**
* Test determineNewLeasesToCreate() - 1 closed and 1 open shard (ignore closed shard)
* Test determineNewLeasesToCreate() - 1 closed and 1 open shard (ignore closed shard), 1 shard with a lease
* already in lease table. If lease table is non-empty, closed shards should be ignored.
*/
@Test
public final void testDetermineNewLeasesToCreateIgnoreClosedShard() {
List<Shard> shards = new ArrayList<Shard>();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
public final void testDetermineNewLeasesToCreateIgnoresClosedShardWithPartialLeaseTable() {
final List<Shard> shardsWithoutLeases = new ArrayList<Shard>();
final List<Shard> shardsWithLeases = new ArrayList<Shard>();
final List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
shards.add(ShardObjectHelper.newShard("shardId-0",
shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0",
null,
null,
ShardObjectHelper.newSequenceNumberRange("303", "404")));
String lastShardId = "shardId-1";
final String lastShardId = "shardId-1";
shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2",
null,
null,
ShardObjectHelper.newSequenceNumberRange("202", "302")));
currentLeases.add(newLease("shardId-2"));
final List<Shard> allShards =
Stream.concat(shardsWithLeases.stream(), shardsWithoutLeases.stream()).collect(Collectors.toList());
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(allShards, currentLeases);
final List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shardsWithoutLeases, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(1, newLeases.size());
Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey());
}
/**
* Test determineNewLeasesToCreate() - 1 closed and 1 open shard. Since lease table is empty, we should create
* leases for all shards, regardless if they are open or closed.
*/
@Test
public final void testDetermineNewLeasesToCreateDoesntIgnoreClosedShardWithEmptyLeaseTable() {
final List<Shard> shards = new ArrayList<>();
final List<KinesisClientLease> currentLeases = new ArrayList<>();
final String firstShardId = "shardId-0";
shards.add(ShardObjectHelper.newShard(firstShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("303", "404")));
final String lastShardId = "shardId-1";
shards.add(ShardObjectHelper.newShard(lastShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(1, newLeases.size());
Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey());
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
final List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST);
Assert.assertEquals(2, newLeases.size());
final Set<String> expectedLeaseShardIds = new HashSet<>();
expectedLeaseShardIds.add(firstShardId);
expectedLeaseShardIds.add(lastShardId);
for (KinesisClientLease lease : newLeases) {
Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey()));
}
}
/**
@ -752,8 +853,10 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-4"));
currentLeases.add(newLease("shardId-5"));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -790,8 +893,10 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
currentLeases.add(newLease("shardId-7"));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -826,8 +931,10 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-4"));
currentLeases.add(newLease("shardId-5"));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -866,8 +973,10 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
currentLeases.add(newLease("shardId-7"));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -895,8 +1004,9 @@ public class ShardSyncerTest {
public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() {
List<Shard> shards = constructShardListForGraphB();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < 11; i++) {
@ -927,13 +1037,14 @@ public class ShardSyncerTest {
List<Shard> shards = constructShardListForGraphA();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
currentLeases.add(newLease("shardId-3"));
currentLeases.add(newLease("shardId-4"));
currentLeases.add(newLease("shardId-5"));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
@ -971,8 +1082,10 @@ public class ShardSyncerTest {
currentLeases.add(newLease("shardId-5"));
currentLeases.add(newLease("shardId-7"));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<String, ExtendedSequenceNumber>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP);
expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP);
@ -998,8 +1111,9 @@ public class ShardSyncerTest {
public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() {
List<Shard> shards = constructShardListForGraphB();
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
List<KinesisClientLease> newLeases =
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap =
new HashMap<String, ExtendedSequenceNumber>();
for (int i = 0; i < shards.size(); i++) {
@ -1664,6 +1778,51 @@ public class ShardSyncerTest {
testAssertShardCoveredOrAbsentTestIncompleteSplit(hashKeyRange, childHashKeyRange1, childHashKeyRange2);
}
/**
* Tests that when reading from TIP, we use the AT_LATEST shard filter
* @throws Exception
*/
@Test
public final void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception {
ShardFilter shardFilter = new ShardFilter().withType(ShardFilterType.AT_LATEST);
testEmptyLeaseTableUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter);
}
/**
* Tests that when reading from TRIM, we use the TRIM_HORIZON shard filter
* @throws Exception
*/
@Test
public final void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception {
ShardFilter shardFilter = new ShardFilter().withType(ShardFilterType.AT_TRIM_HORIZON);
testEmptyLeaseTableUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter);
}
/**
* Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter
* @throws Exception
*/
@Test
public final void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception {
ShardFilter shardFilter = new ShardFilter().withType(ShardFilterType.AT_TIMESTAMP).withTimestamp(new Date(1000L));
testEmptyLeaseTableUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter);
}
private void testEmptyLeaseTableUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition,
ShardFilter shardFilter) throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath()));
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition,
cleanupLeasesOfCompletedShards, false);
verify(kinesisProxy, atLeastOnce()).getShardListWithFilter(shardFilter);
verify(kinesisProxy, never()).getShardList();
}
private void testAssertShardCoveredOrAbsentTestIncompleteSplit(HashKeyRange parentHashKeyRange,
HashKeyRange child1HashKeyRange,
HashKeyRange child2HashKeyRange)
@ -1710,4 +1869,25 @@ public class ShardSyncerTest {
return lease;
}
/**
* Helper method to get appropriate LeaseSynchronizer based on available shards and current leases. If there are
* no current leases (empty lease table case), return EmptyLeaseTableSynchronizer. Else, return
* NonEmptyLeaseTableSynchronizer with appropriate lease mappings.
*
* @param shards
* @param currentLeases
* @return
*/
private LeaseSynchronizer getLeaseSynchronizer(List<Shard> shards, List<KinesisClientLease> currentLeases) {
if (currentLeases.isEmpty()) {
return new EmptyLeaseTableSynchronizer();
}
final Map<String, Shard> shardIdToShardMap = KinesisShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap =
KinesisShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
return new NonEmptyLeaseTableSynchronizer(shardIdToShardMap, shardIdToChildShardIdsMap);
}
}