diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java new file mode 100644 index 00000000..063451a0 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; + +import java.math.BigInteger; + +@Value @Accessors(fluent = true) +/** + * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards. + */ +public class HashKeyRangeForLease { + + private final BigInteger startingHashKey; + private final BigInteger endingHashKey; + + /** + * Serialize the startingHashKey for persisting in external storage + * + * @return Serialized startingHashKey + */ + public String serializedStartingHashKey() { + return startingHashKey.toString(); + } + + /** + * Serialize the endingHashKey for persisting in external storage + * + * @return Serialized endingHashKey + */ + public String serializedEndingHashKey() { + return endingHashKey.toString(); + } + + /** + * Deserialize from serialized hashKeyRange string from external storage. + * + * @param startingHashKeyStr + * @param endingHashKeyStr + * @return HashKeyRangeForLease + */ + public static HashKeyRangeForLease deserialize(@NonNull String startingHashKeyStr, @NonNull String endingHashKeyStr) { + final BigInteger startingHashKey = new BigInteger(startingHashKeyStr); + final BigInteger endingHashKey = new BigInteger(endingHashKeyStr); + Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0, + "StartingHashKey %s must be less than EndingHashKey %s ", startingHashKeyStr, endingHashKeyStr); + return new HashKeyRangeForLease(startingHashKey, endingHashKey); + } + + /** + * Construct HashKeyRangeForLease from Kinesis HashKeyRange + * + * @param hashKeyRange + * @return HashKeyRangeForLease + */ + public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { + return deserialize(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index debb89bb..714e7f4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -57,6 +58,8 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -757,6 +760,7 @@ public class HierarchicalShardSyncer { } newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.ownerSwitchesSinceCheckpoint(0L); + newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange())); return newLease; } @@ -772,6 +776,7 @@ public class HierarchicalShardSyncer { newLease.ownerSwitchesSinceCheckpoint(0L); newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(childShard.shardId()); + newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange())); return newLease; } @@ -794,7 +799,7 @@ public class HierarchicalShardSyncer { } newLease.parentShardIds(parentShardIds); newLease.ownerSwitchesSinceCheckpoint(0L); - + newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); return newLease; } @@ -812,6 +817,7 @@ public class HierarchicalShardSyncer { newLease.ownerSwitchesSinceCheckpoint(0L); newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(shard.shardId()); + newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); return newLease; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 2d0ce8c2..3df5097e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -20,6 +20,7 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.ToString; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.Collection; @@ -96,6 +97,7 @@ public class Lease { private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); + private HashKeyRangeForLease hashKeyRangeForLease; /** * Copy constructor, used by clone(). @@ -105,7 +107,8 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), + lease.pendingCheckpointState(), lease.hashKeyRangeForLease()); } @Deprecated @@ -114,14 +117,14 @@ public class Lease { final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, - ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); + ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null, null); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds, - final byte[] pendingCheckpointState) { + final byte[] pendingCheckpointState, final HashKeyRangeForLease hashKeyRangeForLease) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -136,6 +139,7 @@ public class Lease { if (childShardIds != null) { this.childShardIds.addAll(childShardIds); } + this.hashKeyRangeForLease = hashKeyRangeForLease; this.pendingCheckpointState = pendingCheckpointState; } @@ -158,7 +162,8 @@ public class Lease { pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); - childShardIds(lease.childShardIds()); + childShardIds(lease.childShardIds); + hashKeyRange(lease.hashKeyRangeForLease); } /** @@ -274,6 +279,18 @@ public class Lease { this.childShardIds.addAll(childShardIds); } + /** + * Set the hash range key for this shard. + * @param hashKeyRangeForLease + */ + public void hashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) { + if (this.hashKeyRangeForLease == null) { + this.hashKeyRangeForLease = hashKeyRangeForLease; + } else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) { + throw new IllegalArgumentException("hashKeyRange is immutable"); + } + } + /** * Sets leaseOwner. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 85632500..8f293881 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseSerializer; @@ -54,6 +55,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; + private static final String STARTING_HASH_KEY = "startingHashKey"; + private static final String ENDING_HASH_KEY = "endingHashKey"; @Override public Map toDynamoRecord(final Lease lease) { @@ -85,6 +88,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); } + if(lease.hashKeyRangeForLease() != null) { + result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())); + result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())); + } + return result; } @@ -119,6 +127,12 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); + final String startingHashKey, endingHashKey; + if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY)) + && !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) { + leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey)); + } + return leaseToUpdate; } @@ -246,6 +260,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); } + if(lease.hashKeyRangeForLease() != null) { + result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()))); + result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()))); + } + return result; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index cbf7add7..acfa3c51 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -20,6 +20,7 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; @@ -40,11 +41,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -57,6 +60,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -119,7 +123,6 @@ public class HierarchicalShardSyncerTest { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); } @@ -153,6 +156,8 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); + validateHashRangeinLease(newLeases); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -176,6 +181,7 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); + validateHashRangeinLease(newLeases); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); @@ -212,6 +218,7 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + validateHashRangeinLease(newLeases); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); @@ -247,12 +254,22 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + validateHashRangeinLease(newLeases); final Set expectedLeaseShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); } + private void validateHashRangeinLease(List leases) { + final Consumer leaseValidation = lease -> { + Validate.notNull(lease.hashKeyRangeForLease()); + Validate.isTrue(lease.hashKeyRangeForLease().startingHashKey() + .compareTo(lease.hashKeyRangeForLease().endingHashKey()) < 0); + }; + leases.forEach(lease -> leaseValidation.accept(lease)); + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) */ @@ -294,6 +311,7 @@ public class HierarchicalShardSyncerTest { final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); @@ -328,6 +346,7 @@ public class HierarchicalShardSyncerTest { final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); @@ -375,6 +394,8 @@ public class HierarchicalShardSyncerTest { assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); + validateHashRangeinLease(requestLeases); + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); @@ -411,7 +432,7 @@ public class HierarchicalShardSyncerTest { assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); - + validateHashRangeinLease(requestLeases); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); @@ -443,7 +464,7 @@ public class HierarchicalShardSyncerTest { final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); - + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(extendedSequenceNumbers.size(), equalTo(0)); @@ -969,7 +990,7 @@ public class HierarchicalShardSyncerTest { parentShardIds.add(shard.adjacentParentShardId()); } return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, - parentShardIds, new HashSet<>(), null); + parentShardIds, new HashSet<>(), null, HashKeyRangeForLease.fromHashKeyRange(shard.hashKeyRange())); }).collect(Collectors.toList()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index cf06f586..8f825875 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -20,6 +20,7 @@ import java.util.UUID; import lombok.Setter; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Setter @@ -36,9 +37,11 @@ public class LeaseBuilder { private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); private byte[] pendingCheckpointState; + private HashKeyRangeForLease hashKeyRangeForLease; public Lease build() { return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, - pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); + pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, + pendingCheckpointState, hashKeyRangeForLease); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java index f07a38f4..ee2504d8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java @@ -68,7 +68,7 @@ public class ShardObjectHelper { String parentShardId, String adjacentParentShardId, SequenceNumberRange sequenceNumberRange) { - return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, null); + return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build()); } /** Helper method to create a new shard object. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 1daf85b8..bfff4e92 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -37,6 +37,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -55,7 +56,8 @@ public class DynamoDBLeaseRenewerTest { private LeaseRefresher leaseRefresher; private static Lease newLease(String leaseKey) { - return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null); + return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, + new HashSet<>(), new HashSet<>(), null, HashKeyRangeForLease.deserialize("1", "2")); } @Before