From 550d7af5b193d475d35b4ba81390f635fb2d160c Mon Sep 17 00:00:00 2001 From: Micah Jaffe <31011877+micah-jaffe@users.noreply.github.com> Date: Mon, 4 May 2020 17:06:57 -0700 Subject: [PATCH] Add lease sync strategy for empty lease table (#24) * Add lease sync strategy for empty lease table * Fix ShardSyncer unit tests to reflect new empty lease table shard sync logic --- .../worker/EmptyLeaseTableSynchronizer.java | 98 +++++++ .../lib/worker/KinesisShardSyncer.java | 171 ++++-------- .../lib/worker/LeaseSynchronizer.java | 41 +++ .../NonEmptyLeaseTableSynchronizer.java | 165 ++++++++++++ .../lib/worker/ShardSyncerTest.java | 248 +++++++++++++++--- 5 files changed, 566 insertions(+), 157 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/EmptyLeaseTableSynchronizer.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseSynchronizer.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/EmptyLeaseTableSynchronizer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/EmptyLeaseTableSynchronizer.java new file mode 100644 index 00000000..3679f4dc --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/EmptyLeaseTableSynchronizer.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.Lease; +import com.amazonaws.services.kinesis.model.Shard; +import lombok.AllArgsConstructor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.StartingSequenceNumberAndShardIdBasedComparator; + +/** + * Class to help create leases when the table is initially empty. + */ +@AllArgsConstructor +class EmptyLeaseTableSynchronizer implements LeaseSynchronizer { + + private static final Log LOG = LogFactory.getLog(EmptyLeaseTableSynchronizer.class); + + /** + * Determines how to create leases when the lease table is initially empty. For this, we read all shards where + * the KCL is reading from. For any shards which are closed, we will discover their child shards through GetRecords + * child shard information. 
+ * + * @param shards + * @param currentLeases + * @param initialPosition + * @param inconsistentShardIds + * @return + */ + @Override + public List determineNewLeasesToCreate(List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition, + Set inconsistentShardIds) { + + final Map shardIdToShardMapOfAllKinesisShards = + KinesisShardSyncer.constructShardIdToShardMap(shards); + + currentLeases.forEach(lease -> LOG.debug("Existing lease: " + lease.getLeaseKey())); + + final List newLeasesToCreate = + getLeasesToCreateForOpenAndClosedShards(initialPosition, shards); + + final Comparator startingSequenceNumberComparator = + new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards); + + newLeasesToCreate.sort(startingSequenceNumberComparator); + return newLeasesToCreate; + } + + /** + * Helper method to create leases. For an empty lease table, we will be creating leases for all shards + * regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon + * reaching SHARD_END. 
+ */ + private List getLeasesToCreateForOpenAndClosedShards( + InitialPositionInStreamExtended initialPosition, + List shards) { + + final Map shardIdToNewLeaseMap = new HashMap<>(); + + for (Shard shard : shards) { + final String shardId = shard.getShardId(); + final KinesisClientLease lease = KinesisShardSyncer.newKCLLease(shard); + + final ExtendedSequenceNumber checkpoint = KinesisShardSyncer.convertToCheckpoint(initialPosition); + lease.setCheckpoint(checkpoint); + + LOG.debug("Need to create a lease for shard with shardId " + shardId); + shardIdToNewLeaseMap.put(shardId, lease); + } + + return new ArrayList(shardIdToNewLeaseMap.values()); + } +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index ac57f9cb..13a52355 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.services.kinesis.leases.impl.Lease; +import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilterType; import com.amazonaws.util.CollectionUtils; @@ -45,6 +47,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.Shard; +import javax.annotation.Nullable; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). 
@@ -111,7 +115,8 @@ class KinesisShardSyncer implements ShardSyncer { boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards); + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, latestShards, leaseManager.isLeaseTableEmpty()); } /** @@ -136,11 +141,13 @@ class KinesisShardSyncer implements ShardSyncer { // In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible // based on the given initial position. // TODO: Implement shard list filtering on non-empty lease table case - final List latestShards = leaseManager.isLeaseTableEmpty() + final boolean isLeaseTableEmpty = leaseManager.isLeaseTableEmpty(); + final List latestShards = isLeaseTableEmpty ? 
getShardListAtInitialPosition(kinesisProxy, initialPosition) : getCompleteShardList(kinesisProxy); - syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, latestShards); + syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, latestShards, isLeaseTableEmpty); } /** @@ -159,10 +166,15 @@ class KinesisShardSyncer implements ShardSyncer { */ // CHECKSTYLE:OFF CyclomaticComplexity private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, - boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) + ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPosition, + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, + List latestShards, + boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + List shards; if(CollectionUtils.isNullOrEmpty(latestShards)) { shards = getCompleteShardList(kinesisProxy); @@ -178,11 +190,16 @@ class KinesisShardSyncer implements ShardSyncer { assertAllParentShardsAreClosed(inconsistentShardIds); } - List currentLeases = leaseManager.listLeases(); + // Determine which lease sync strategy to use based on the state of the lease table + final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty + ? 
new EmptyLeaseTableSynchronizer() + : new NonEmptyLeaseTableSynchronizer(shardIdToShardMap, shardIdToChildShardIdsMap); - List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, - inconsistentShardIds); + final List currentLeases = leaseManager.listLeases(); + final List newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, shards, + currentLeases, initialPosition, inconsistentShardIds); LOG.debug("Num new leases to create: " + newLeasesToCreate.size()); + for (KinesisClientLease lease : newLeasesToCreate) { long startTimeMillis = System.currentTimeMillis(); boolean success = false; @@ -326,7 +343,7 @@ class KinesisShardSyncer implements ShardSyncer { * @param shardIdToShardMap * @return */ - Map> constructShardIdToChildShardIdsMap(Map shardIdToShardMap) { + static Map> constructShardIdToChildShardIdsMap(Map shardIdToShardMap) { Map> shardIdToChildShardIdsMap = new HashMap<>(); for (Map.Entry entry : shardIdToShardMap.entrySet()) { String shardId = entry.getKey(); @@ -405,42 +422,8 @@ class KinesisShardSyncer implements ShardSyncer { * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. * - * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, - * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): - * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. - * If not, set checkpoint of the shard to the initial position specified by the client. - * To check if we need to create leases for ancestors, we use the following rules: - * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before - * we begin processing data from any of its descendants. - * * A shard does not start processing data until data from all its parents has been processed. 
- * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create - * leases corresponding to both the parents - the parent shard which is not a descendant will have - * its checkpoint set to Latest. - * - * We assume that if there is an existing lease for a shard, then either: - * * we have previously created a lease for its parent (if it was needed), or - * * the parent shard has expired. - * - * For example: - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | / \ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) - * New leases to create: (2, 6, 7, 8, 9, 10) - * - * The leases returned are sorted by the starting sequence number - following the same order - * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail - * before creating all the leases. - * - * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it - * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very - * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only - * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. - * - * + * @param leaseSynchronizer determines the strategy to use when updating leases based on the current state of + * the lease table (empty vs. non-empty) * @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param currentLeases List of current leases * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that @@ -448,85 +431,27 @@ class KinesisShardSyncer implements ShardSyncer { * @param inconsistentShardIds Set of child shard ids having open parents. 
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ - List determineNewLeasesToCreate(List shards, List currentLeases, - InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds) { - Map shardIdToNewLeaseMap = new HashMap(); - Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); + List determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, + List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition, + Set inconsistentShardIds) { - Set shardIdsOfCurrentLeases = new HashSet(); - for (KinesisClientLease lease : currentLeases) { - shardIdsOfCurrentLeases.add(lease.getLeaseKey()); - LOG.debug("Existing lease: " + lease); - } - - List openShards = getOpenShards(shards); - Map memoizationContext = new HashMap<>(); - - // Iterate over the open shards and find those that don't have any lease entries. - for (Shard shard : openShards) { - String shardId = shard.getShardId(); - LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors."); - if (shardIdsOfCurrentLeases.contains(shardId)) { - LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease"); - } else if (inconsistentShardIds.contains(shardId)) { - LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); - } else { - LOG.debug("Need to create a lease for shardId " + shardId); - KinesisClientLease newLease = newKCLLease(shard); - boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, - shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, - memoizationContext); - - /** - * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the - * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a - * lease just like we do for TRIM_HORIZON. 
However we will only return back records with server-side - * timestamp at or after the specified initial position timestamp. - * - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * - * Current leases: empty set - * - * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with - * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to - * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin - * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases - * would then be deleted since they won't have records with server-side timestamp at/after 206. And - * after that we will begin processing the descendant shards with epoch at/after 206 and we will - * return the records that meet the timestamp requirement for these shards. 
- */ - if (isDescendant && !initialPosition.getInitialPositionInStream() - .equals(InitialPositionInStream.AT_TIMESTAMP)) { - newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - newLease.setCheckpoint(convertToCheckpoint(initialPosition)); - } - LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint()); - shardIdToNewLeaseMap.put(shardId, newLease); - } - } - - List newLeasesToCreate = new ArrayList(); - newLeasesToCreate.addAll(shardIdToNewLeaseMap.values()); - Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMapOfAllKinesisShards); - Collections.sort(newLeasesToCreate, startingSequenceNumberComparator); - return newLeasesToCreate; + return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition, + inconsistentShardIds); } /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. 
*/ - List determineNewLeasesToCreate(List shards, List currentLeases, - InitialPositionInStreamExtended initialPosition) { + List determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, + List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition) { + Set inconsistentShardIds = new HashSet(); - return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); + return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds); } /** @@ -545,7 +470,7 @@ class KinesisShardSyncer implements ShardSyncer { * @return true if the shard is a descendant of any current shard (lease already exists) */ // CHECKSTYLE:OFF CyclomaticComplexity - boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, + static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, InitialPositionInStreamExtended initialPosition, Set shardIdsOfCurrentLeases, Map shardIdToShardMapOfAllKinesisShards, Map shardIdToLeaseMapOfNewShards, Map memoizationContext) { @@ -630,7 +555,7 @@ class KinesisShardSyncer implements ShardSyncer { * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. 
* @return Set of parentShardIds */ - Set getParentShardIds(Shard shard, Map shardIdToShardMapOfAllKinesisShards) { + static Set getParentShardIds(Shard shard, Map shardIdToShardMapOfAllKinesisShards) { Set parentShardIds = new HashSet(2); String parentShardId = shard.getParentShardId(); if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) { @@ -794,7 +719,7 @@ class KinesisShardSyncer implements ShardSyncer { * @param shard * @return */ - KinesisClientLease newKCLLease(Shard shard) { + static KinesisClientLease newKCLLease(Shard shard) { KinesisClientLease newLease = new KinesisClientLease(); newLease.setLeaseKey(shard.getShardId()); List parentShardIds = new ArrayList(2); @@ -816,7 +741,7 @@ class KinesisShardSyncer implements ShardSyncer { * @param shards List of shards * @return ShardId->Shard map */ - Map constructShardIdToShardMap(List shards) { + static Map constructShardIdToShardMap(List shards) { Map shardIdToShardMap = new HashMap(); for (Shard shard : shards) { shardIdToShardMap.put(shard.getShardId(), shard); @@ -831,7 +756,7 @@ class KinesisShardSyncer implements ShardSyncer { * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. 
*/ - List getOpenShards(List allShards) { + static List getOpenShards(List allShards) { List openShards = new ArrayList(); for (Shard shard : allShards) { String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); @@ -843,7 +768,7 @@ class KinesisShardSyncer implements ShardSyncer { return openShards; } - private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) { + static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) { ExtendedSequenceNumber checkpoint = null; if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { @@ -860,7 +785,7 @@ class KinesisShardSyncer implements ShardSyncer { /** Helper class to compare leases based on starting sequence number of the corresponding shards. * */ - private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator, + static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator, Serializable { private static final long serialVersionUID = 1L; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseSynchronizer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseSynchronizer.java new file mode 100644 index 00000000..441249d1 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseSynchronizer.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.model.Shard; + +import java.util.List; +import java.util.Set; + +/** + * Interface used by {@link KinesisShardSyncer} to determine how to create new leases based on the current state + * of the lease table (i.e. whether the lease table is empty or non-empty). + */ +interface LeaseSynchronizer { + + /** + * Determines how to create leases. + * @param shards + * @param currentLeases + * @param initialPosition + * @param inconsistentShardIds + * @return + */ + List determineNewLeasesToCreate(List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition, + Set inconsistentShardIds); +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java new file mode 100644 index 00000000..53c42980 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java @@ -0,0 +1,165 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.Lease; +import com.amazonaws.services.kinesis.model.Shard; +import lombok.AllArgsConstructor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * TODO - non-empty lease table sync story + */ +@AllArgsConstructor +class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { + + private static final Log LOG = LogFactory.getLog(NonEmptyLeaseTableSynchronizer.class); + + private final Map shardIdToShardMap; + private final Map> shardIdToChildShardIdsMap; + + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + * + * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, + * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): + * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. + * If not, set checkpoint of the shard to the initial position specified by the client. + * To check if we need to create leases for ancestors, we use the following rules: + * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before + * we begin processing data from any of its descendants. + * * A shard does not start processing data until data from all its parents has been processed. 
+ * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create + * leases corresponding to both the parents - the parent shard which is not a descendant will have + * its checkpoint set to Latest. + * + * We assume that if there is an existing lease for a shard, then either: + * * we have previously created a lease for its parent (if it was needed), or + * * the parent shard has expired. + * + * For example: + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * New leases to create: (2, 6, 7, 8, 9, 10) + * + * The leases returned are sorted by the starting sequence number - following the same order + * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail + * before creating all the leases. + * + * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it + * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very + * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only + * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. + * + * @param shards List of all shards in Kinesis (we'll create new leases based on this set) + * @param currentLeases List of current leases + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @param inconsistentShardIds Set of child shard ids having open parents. 
+ * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard + */ + @Override + public List determineNewLeasesToCreate(List shards, + List currentLeases, + InitialPositionInStreamExtended initialPosition, + Set inconsistentShardIds) { + + Map shardIdToNewLeaseMap = new HashMap<>(); + Map shardIdToShardMapOfAllKinesisShards = KinesisShardSyncer.constructShardIdToShardMap(shards); + + Set shardIdsOfCurrentLeases = new HashSet(); + for (Lease lease : currentLeases) { + shardIdsOfCurrentLeases.add(lease.getLeaseKey()); + LOG.debug("Existing lease: " + lease); + } + + List openShards = KinesisShardSyncer.getOpenShards(shards); + Map memoizationContext = new HashMap<>(); + + // Iterate over the open shards and find those that don't have any lease entries. + for (Shard shard : openShards) { + String shardId = shard.getShardId(); + LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors."); + if (shardIdsOfCurrentLeases.contains(shardId)) { + LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease"); + } else if (inconsistentShardIds.contains(shardId)) { + LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); + } else { + LOG.debug("Need to create a lease for shardId " + shardId); + KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard); + boolean isDescendant = KinesisShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, + initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, + shardIdToNewLeaseMap, memoizationContext); + + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a + * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + * timestamp at or after the specified initial position timestamp. 
+ * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: empty set + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to + * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin + * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases + * would then be deleted since they won't have records with server-side timestamp at/after 206. And + * after that we will begin processing the descendant shards with epoch at/after 206 and we will + * return the records that meet the timestamp requirement for these shards. + */ + if (isDescendant && !initialPosition.getInitialPositionInStream() + .equals(InitialPositionInStream.AT_TIMESTAMP)) { + newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + newLease.setCheckpoint(KinesisShardSyncer.convertToCheckpoint(initialPosition)); + } + LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint()); + shardIdToNewLeaseMap.put(shardId, newLease); + } + } + + List newLeasesToCreate = new ArrayList<>(); + newLeasesToCreate.addAll(shardIdToNewLeaseMap.values()); + Comparator startingSequenceNumberComparator = new KinesisShardSyncer.StartingSequenceNumberAndShardIdBasedComparator( + shardIdToShardMapOfAllKinesisShards); + Collections.sort(newLeasesToCreate, startingSequenceNumberComparator); + return newLeasesToCreate; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 2fce50a7..83dcd4af 100644 --- 
a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -18,17 +18,21 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; +import com.amazonaws.services.kinesis.leases.impl.Lease; +import com.amazonaws.services.kinesis.model.ShardFilter; +import com.amazonaws.services.kinesis.model.ShardFilterType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; @@ -56,6 +60,11 @@ import com.amazonaws.services.kinesis.model.Shard; import junit.framework.Assert; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + /** * */ @@ -69,9 +78,9 @@ public class ShardSyncerTest { private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); private final boolean cleanupLeasesOfCompletedShards = true; - AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); - LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); private static final int EXPONENT = 128; + AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); + 
private LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); /** @@ -120,8 +129,9 @@ public class ShardSyncerTest { public final void testDetermineNewLeasesToCreateNoShards() { List shards = new ArrayList(); List leases = new ArrayList(); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, leases); - Assert.assertTrue(shardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty()); + Assert.assertTrue(shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, leases, INITIAL_POSITION_LATEST).isEmpty()); } /** @@ -139,8 +149,10 @@ public class ShardSyncerTest { String shardId1 = "shardId-1"; shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add(shardId0); @@ -151,7 +163,7 @@ public class ShardSyncerTest { } /** - * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of + * Test determineNewLeasesToCreate() where there is one lease and no resharding operations have been performed, but one of * the shards was marked as inconsistent. 
*/ @Test @@ -169,11 +181,18 @@ public class ShardSyncerTest { String shardId2 = "shardId-2"; shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + String shardIdWithLease = "shardId-3"; + shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange)); + + currentLeases.add(newLease(shardIdWithLease)); + Set inconsistentShardIds = new HashSet(); inconsistentShardIds.add(shardId2); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add(shardId0); @@ -216,14 +235,10 @@ public class ShardSyncerTest { } /** - * @throws KinesisClientLibIOException - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws IOException + * All open and closed shards within stream's retention period should be sync'ed when lease table is empty. 
*/ @Test - public final void testCheckAndCreateLeasesForNewShardsAtLatest() + public final void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { List shards = constructShardListForGraphA(); @@ -231,6 +246,38 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, shards); + List newLeases = leaseManager.listLeases(); + Set expectedLeaseShardIds = new HashSet(); + for (int i = 0; i < 11; i++) { + expectedLeaseShardIds.add("shardId-" + i); + } + Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size()); + for (KinesisClientLease lease1 : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey())); + Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint()); + } + dataFile.delete(); + } + + /** + * We should only create leases for shards at LATEST when lease table is not empty. 
+ */ + @Test + public final void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + List shards = constructShardListForGraphA(); + File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); + dataFile.deleteOnExit(); + IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + + // shardId-10 exists at LATEST - create a lease for it + KinesisClientLease lease = newLease("shardId-10"); + lease.setCheckpoint(ExtendedSequenceNumber.LATEST); + leaseManager.createLeaseIfNotExists(lease); + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards, false, shards); List newLeases = leaseManager.listLeases(); @@ -353,6 +400,13 @@ public class ShardSyncerTest { File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + + // Create a dummy lease in the lease table - otherwise leaseManager will create leases for all shards if + // lease table is empty. 
+ KinesisClientLease lease = newLease("shardId-1000"); + lease.setCheckpoint(ExtendedSequenceNumber.LATEST); + leaseManager.createLeaseIfNotExists(lease); + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards, true, shards); List newLeases = leaseManager.listLeases(); @@ -695,8 +749,9 @@ public class ShardSyncerTest { initialPositions.add(INITIAL_POSITION_TRIM_HORIZON); for (InitialPositionInStreamExtended initialPosition : initialPositions) { + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add(shardId0); @@ -710,27 +765,73 @@ public class ShardSyncerTest { } /** - * Test determineNewLeasesToCreate() - 1 closed and 1 open shard (ignore closed shard) + * Test determineNewLeasesToCreate() - 1 closed and 1 open shard (ignore closed shard), 1 shard with a lease + * already in lease table. If lease table is non-empty, closed shards should be ignored. 
*/ @Test - public final void testDetermineNewLeasesToCreateIgnoreClosedShard() { - List shards = new ArrayList(); - List currentLeases = new ArrayList(); + public final void testDetermineNewLeasesToCreateIgnoresClosedShardWithPartialLeaseTable() { + final List shardsWithoutLeases = new ArrayList(); + final List shardsWithLeases = new ArrayList(); + final List currentLeases = new ArrayList(); - shards.add(ShardObjectHelper.newShard("shardId-0", + shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("303", "404"))); - String lastShardId = "shardId-1"; + final String lastShardId = "shardId-1"; + shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId, + null, + null, + ShardObjectHelper.newSequenceNumberRange("405", null))); + + shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2", + null, + null, + ShardObjectHelper.newSequenceNumberRange("202", "302"))); + currentLeases.add(newLease("shardId-2")); + + final List allShards = + Stream.concat(shardsWithLeases.stream(), shardsWithoutLeases.stream()).collect(Collectors.toList()); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(allShards, currentLeases); + + final List newLeases = + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shardsWithoutLeases, currentLeases, INITIAL_POSITION_LATEST); + Assert.assertEquals(1, newLeases.size()); + Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey()); + } + + /** + * Test determineNewLeasesToCreate() - 1 closed and 1 open shard. Since lease table is empty, we should create + * leases for all shards, regardless if they are open or closed. 
+ */ + @Test + public final void testDetermineNewLeasesToCreateDoesntIgnoreClosedShardWithEmptyLeaseTable() { + final List shards = new ArrayList<>(); + final List currentLeases = new ArrayList<>(); + + final String firstShardId = "shardId-0"; + shards.add(ShardObjectHelper.newShard(firstShardId, + null, + null, + ShardObjectHelper.newSequenceNumberRange("303", "404"))); + final String lastShardId = "shardId-1"; shards.add(ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newSequenceNumberRange("405", null))); - List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); - Assert.assertEquals(1, newLeases.size()); - Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey()); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + + final List newLeases = + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); + Assert.assertEquals(2, newLeases.size()); + + final Set expectedLeaseShardIds = new HashSet<>(); + expectedLeaseShardIds.add(firstShardId); + expectedLeaseShardIds.add(lastShardId); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey())); + } } /** @@ -752,8 +853,10 @@ public class ShardSyncerTest { currentLeases.add(newLease("shardId-4")); currentLeases.add(newLease("shardId-5")); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -790,8 +893,10 @@ public class ShardSyncerTest { currentLeases.add(newLease("shardId-5")); 
currentLeases.add(newLease("shardId-7")); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -826,8 +931,10 @@ public class ShardSyncerTest { currentLeases.add(newLease("shardId-4")); currentLeases.add(newLease("shardId-5")); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -866,8 +973,10 @@ public class ShardSyncerTest { currentLeases.add(newLease("shardId-5")); currentLeases.add(newLease("shardId-7")); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -895,8 +1004,9 @@ public class ShardSyncerTest { public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() { List shards = constructShardListForGraphB(); List currentLeases = new ArrayList(); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); List newLeases = - 
shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); for (int i = 0; i < 11; i++) { @@ -927,13 +1037,14 @@ public class ShardSyncerTest { List shards = constructShardListForGraphA(); List currentLeases = new ArrayList(); - currentLeases.add(newLease("shardId-3")); currentLeases.add(newLease("shardId-4")); currentLeases.add(newLease("shardId-5")); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP); @@ -971,8 +1082,10 @@ public class ShardSyncerTest { currentLeases.add(newLease("shardId-5")); currentLeases.add(newLease("shardId-7")); + final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); + List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP); @@ -998,8 +1111,9 @@ public class ShardSyncerTest { public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() { List shards = constructShardListForGraphB(); List currentLeases = new ArrayList(); + final 
LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); List newLeases = - shardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); Map expectedShardIdCheckpointMap = new HashMap(); for (int i = 0; i < shards.size(); i++) { @@ -1664,6 +1778,51 @@ public class ShardSyncerTest { testAssertShardCoveredOrAbsentTestIncompleteSplit(hashKeyRange, childHashKeyRange1, childHashKeyRange2); } + /** + * Tests that when reading from LATEST (the tip of the stream), we use the AT_LATEST shard filter + * @throws Exception + */ + @Test + public final void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception { + ShardFilter shardFilter = new ShardFilter().withType(ShardFilterType.AT_LATEST); + testEmptyLeaseTableUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter); + } + + /** + * Tests that when reading from TRIM_HORIZON, we use the AT_TRIM_HORIZON shard filter + * @throws Exception + */ + @Test + public final void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception { + ShardFilter shardFilter = new ShardFilter().withType(ShardFilterType.AT_TRIM_HORIZON); + testEmptyLeaseTableUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter); + } + + /** + * Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter + * @throws Exception + */ + @Test + public final void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception { + ShardFilter shardFilter = new ShardFilter().withType(ShardFilterType.AT_TIMESTAMP).withTimestamp(new Date(1000L)); + testEmptyLeaseTableUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter); + } + + private void testEmptyLeaseTableUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, + ShardFilter shardFilter) throws Exception { + + final List shards =
constructShardListForGraphA(); + final File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); + dataFile.deleteOnExit(); + final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath())); + + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPosition, + cleanupLeasesOfCompletedShards, false); + + verify(kinesisProxy, atLeastOnce()).getShardListWithFilter(shardFilter); + verify(kinesisProxy, never()).getShardList(); + } + private void testAssertShardCoveredOrAbsentTestIncompleteSplit(HashKeyRange parentHashKeyRange, HashKeyRange child1HashKeyRange, HashKeyRange child2HashKeyRange) @@ -1710,4 +1869,25 @@ public class ShardSyncerTest { return lease; } + /** + * Helper method to get appropriate LeaseSynchronizer based on available shards and current leases. If there are + * no current leases (empty lease table case), return EmptyLeaseTableSynchronizer. Else, return + * NonEmptyLeaseTableSynchronizer with appropriate shard mappings. + * + * @param shards shards used to build the shard-id and child-shard-id mappings + * @param currentLeases leases currently in the lease table; an empty list selects the empty-table synchronizer + * @return the LeaseSynchronizer matching the state of the lease table + */ + private LeaseSynchronizer getLeaseSynchronizer(List<Shard> shards, List<KinesisClientLease> currentLeases) { + if (currentLeases.isEmpty()) { + return new EmptyLeaseTableSynchronizer(); + } + + final Map<String, Shard> shardIdToShardMap = KinesisShardSyncer.constructShardIdToShardMap(shards); + final Map<String, Set<String>> shardIdToChildShardIdsMap = + KinesisShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap); + + return new NonEmptyLeaseTableSynchronizer(shardIdToShardMap, shardIdToChildShardIdsMap); + } + }