Hashrange persistence changes

This commit is contained in:
Ashwin Giridharan 2020-05-05 12:58:16 -07:00
commit c479984fb4
8 changed files with 157 additions and 12 deletions

View file

@ -0,0 +1,77 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import lombok.NonNull;
import lombok.Value;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import java.math.BigInteger;
@Value @Accessors(fluent = true)
/**
* Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards.
*/
public class HashKeyRangeForLease {
private final BigInteger startingHashKey;
private final BigInteger endingHashKey;
/**
* Serialize the startingHashKey for persisting in external storage
*
* @return Serialized startingHashKey
*/
public String serializedStartingHashKey() {
return startingHashKey.toString();
}
/**
* Serialize the endingHashKey for persisting in external storage
*
* @return Serialized endingHashKey
*/
public String serializedEndingHashKey() {
return endingHashKey.toString();
}
/**
* Deserialize from serialized hashKeyRange string from external storage.
*
* @param startingHashKeyStr
* @param endingHashKeyStr
* @return HashKeyRangeForLease
*/
public static HashKeyRangeForLease deserialize(@NonNull String startingHashKeyStr, @NonNull String endingHashKeyStr) {
final BigInteger startingHashKey = new BigInteger(startingHashKeyStr);
final BigInteger endingHashKey = new BigInteger(endingHashKeyStr);
Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0,
"StartingHashKey %s must be less than EndingHashKey %s ", startingHashKeyStr, endingHashKeyStr);
return new HashKeyRangeForLease(startingHashKey, endingHashKey);
}
/**
* Construct HashKeyRangeForLease from Kinesis HashKeyRange
*
* @param hashKeyRange
* @return HashKeyRangeForLease
*/
public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) {
return deserialize(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey());
}
}

View file

@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -57,6 +58,8 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/** /**
* Helper class to sync leases with shards of the Kinesis stream. * Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@ -757,6 +760,7 @@ public class HierarchicalShardSyncer {
} }
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange()));
return newLease; return newLease;
} }
@ -772,6 +776,7 @@ public class HierarchicalShardSyncer {
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.serialize()); newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(childShard.shardId()); newLease.shardId(childShard.shardId());
newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange()));
return newLease; return newLease;
} }
@ -794,7 +799,7 @@ public class HierarchicalShardSyncer {
} }
newLease.parentShardIds(parentShardIds); newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
return newLease; return newLease;
} }
@ -812,6 +817,7 @@ public class HierarchicalShardSyncer {
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.serialize()); newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(shard.shardId()); newLease.shardId(shard.shardId());
newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
return newLease; return newLease;
} }

View file

@ -20,6 +20,7 @@ import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collection; import java.util.Collection;
@ -96,6 +97,7 @@ public class Lease {
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>(); private Set<String> childShardIds = new HashSet<>();
private HashKeyRangeForLease hashKeyRangeForLease;
/** /**
* Copy constructor, used by clone(). * Copy constructor, used by clone().
@ -105,7 +107,8 @@ public class Lease {
protected Lease(Lease lease) { protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(),
lease.pendingCheckpointState(), lease.hashKeyRangeForLease());
} }
@Deprecated @Deprecated
@ -114,14 +117,14 @@ public class Lease {
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) { final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null, null);
} }
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos, final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds, final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds,
final byte[] pendingCheckpointState) { final byte[] pendingCheckpointState, final HashKeyRangeForLease hashKeyRangeForLease) {
this.leaseKey = leaseKey; this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner; this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter; this.leaseCounter = leaseCounter;
@ -136,6 +139,7 @@ public class Lease {
if (childShardIds != null) { if (childShardIds != null) {
this.childShardIds.addAll(childShardIds); this.childShardIds.addAll(childShardIds);
} }
this.hashKeyRangeForLease = hashKeyRangeForLease;
this.pendingCheckpointState = pendingCheckpointState; this.pendingCheckpointState = pendingCheckpointState;
} }
@ -158,7 +162,8 @@ public class Lease {
pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpoint(lease.pendingCheckpoint);
pendingCheckpointState(lease.pendingCheckpointState); pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds); parentShardIds(lease.parentShardIds);
childShardIds(lease.childShardIds()); childShardIds(lease.childShardIds);
hashKeyRange(lease.hashKeyRangeForLease);
} }
/** /**
@ -274,6 +279,18 @@ public class Lease {
this.childShardIds.addAll(childShardIds); this.childShardIds.addAll(childShardIds);
} }
/**
* Set the hash range key for this shard.
* @param hashKeyRangeForLease
*/
public void hashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) {
if (this.hashKeyRangeForLease == null) {
this.hashKeyRangeForLease = hashKeyRangeForLease;
} else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) {
throw new IllegalArgumentException("hashKeyRange is immutable");
}
}
/** /**
* Sets leaseOwner. * Sets leaseOwner.
* *

View file

@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
@ -54,6 +55,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String STARTING_HASH_KEY = "startingHashKey";
private static final String ENDING_HASH_KEY = "endingHashKey";
@Override @Override
public Map<String, AttributeValue> toDynamoRecord(final Lease lease) { public Map<String, AttributeValue> toDynamoRecord(final Lease lease) {
@ -85,6 +88,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
} }
if(lease.hashKeyRangeForLease() != null) {
result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()));
result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()));
}
return result; return result;
} }
@ -119,6 +127,12 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
final String startingHashKey, endingHashKey;
if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY))
&& !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) {
leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
}
return leaseToUpdate; return leaseToUpdate;
} }
@ -246,6 +260,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
} }
if(lease.hashKeyRangeForLease() != null) {
result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
}
return result; return result;
} }

View file

@ -20,6 +20,7 @@ package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -40,11 +41,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -57,6 +60,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -119,7 +123,6 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Collections.emptyList(); final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList(); final List<Lease> leases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases,
INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); INITIAL_POSITION_LATEST).isEmpty(), equalTo(true));
} }
@ -153,6 +156,8 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST); shards, currentLeases, INITIAL_POSITION_LATEST);
validateHashRangeinLease(newLeases);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -176,6 +181,7 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
validateHashRangeinLease(newLeases);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseIds = new HashSet<>( final Set<String> expectedLeaseIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
@ -212,6 +218,7 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds); INITIAL_POSITION_LATEST, inconsistentShardIds);
validateHashRangeinLease(newLeases);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
@ -247,12 +254,22 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
validateHashRangeinLease(newLeases);
final Set<String> expectedLeaseShardIds = new HashSet<>( final Set<String> expectedLeaseShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
} }
private void validateHashRangeinLease(List<Lease> leases) {
final Consumer<Lease> leaseValidation = lease -> {
Validate.notNull(lease.hashKeyRangeForLease());
Validate.isTrue(lease.hashKeyRangeForLease().startingHashKey()
.compareTo(lease.hashKeyRangeForLease().endingHashKey()) < 0);
};
leases.forEach(lease -> leaseValidation.accept(lease));
}
/** /**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
*/ */
@ -294,6 +311,7 @@ public class HierarchicalShardSyncerTest {
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
@ -328,6 +346,7 @@ public class HierarchicalShardSyncerTest {
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
@ -375,6 +394,8 @@ public class HierarchicalShardSyncerTest {
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
validateHashRangeinLease(requestLeases);
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShards();
@ -411,7 +432,7 @@ public class HierarchicalShardSyncerTest {
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
validateHashRangeinLease(requestLeases);
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShards();
@ -443,7 +464,7 @@ public class HierarchicalShardSyncerTest {
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(extendedSequenceNumbers.size(), equalTo(0)); assertThat(extendedSequenceNumbers.size(), equalTo(0));
@ -969,7 +990,7 @@ public class HierarchicalShardSyncerTest {
parentShardIds.add(shard.adjacentParentShardId()); parentShardIds.add(shard.adjacentParentShardId());
} }
return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L,
parentShardIds, new HashSet<>(), null); parentShardIds, new HashSet<>(), null, HashKeyRangeForLease.fromHashKeyRange(shard.hashKeyRange()));
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }

View file

@ -20,6 +20,7 @@ import java.util.UUID;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Setter @Setter
@ -36,9 +37,11 @@ public class LeaseBuilder {
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>(); private Set<String> childShardIds = new HashSet<>();
private byte[] pendingCheckpointState; private byte[] pendingCheckpointState;
private HashKeyRangeForLease hashKeyRangeForLease;
public Lease build() { public Lease build() {
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint,
pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds,
pendingCheckpointState, hashKeyRangeForLease);
} }
} }

View file

@ -68,7 +68,7 @@ public class ShardObjectHelper {
String parentShardId, String parentShardId,
String adjacentParentShardId, String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange) { SequenceNumberRange sequenceNumberRange) {
return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, null); return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build());
} }
/** Helper method to create a new shard object. /** Helper method to create a new shard object.

View file

@ -37,6 +37,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -55,7 +56,8 @@ public class DynamoDBLeaseRenewerTest {
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
private static Lease newLease(String leaseKey) { private static Lease newLease(String leaseKey) {
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null); return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null,
new HashSet<>(), new HashSet<>(), null, HashKeyRangeForLease.deserialize("1", "2"));
} }
@Before @Before