From 4d058ecf518fa4fdb23b3ca990eca6e7348d8a08 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 28 Apr 2020 15:53:22 -0700 Subject: [PATCH 1/5] Persisting hashrange in lease table --- .../kinesis/common/HashKeyRangeForLease.java | 54 +++++++++++++++++++ .../leases/HierarchicalShardSyncer.java | 8 ++- .../software/amazon/kinesis/leases/Lease.java | 27 ++++++++-- .../dynamodb/DynamoDBLeaseSerializer.java | 15 ++++++ .../leases/HierarchicalShardSyncerTest.java | 3 +- .../amazon/kinesis/leases/LeaseBuilder.java | 5 +- .../kinesis/leases/ShardObjectHelper.java | 2 +- .../dynamodb/DynamoDBLeaseRenewerTest.java | 4 +- 8 files changed, 109 insertions(+), 9 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java new file mode 100644 index 00000000..1d4529ff --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import com.google.common.base.Joiner; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; + +@Data +@Accessors(fluent = true) +public class HashKeyRangeForLease { + + private static final String DELIM = ":"; + + private final String startingHashKey; + private final String endingHashKey; + + public String serialize() { + return Joiner.on(DELIM).join(startingHashKey, endingHashKey); + } + + @Override + public String toString() { + return serialize(); + } + + public static HashKeyRangeForLease deserialize(String hashKeyRange) { + final String[] hashKeyTokens = hashKeyRange.split(DELIM); + Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens"); + // Assuming that startingHashKey and endingHashRange are not same. + Validate.isTrue(!hashKeyTokens[0].equals(hashKeyTokens[1]), "StartingHashKey and EndingHashKey should not be same"); + Validate.isTrue(hashKeyTokens[0].compareTo(hashKeyTokens[1]) < 0, "StartingHashKey must be less than EndingHashKey"); + return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]); + } + + public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { + return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index debb89bb..714e7f4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -57,6 +58,8 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -757,6 +760,7 @@ public class HierarchicalShardSyncer { } newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.ownerSwitchesSinceCheckpoint(0L); + newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange())); return newLease; } @@ -772,6 +776,7 @@ public class HierarchicalShardSyncer { newLease.ownerSwitchesSinceCheckpoint(0L); newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(childShard.shardId()); + newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange())); return newLease; } @@ -794,7 +799,7 @@ public class HierarchicalShardSyncer { } newLease.parentShardIds(parentShardIds); newLease.ownerSwitchesSinceCheckpoint(0L); - + newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); return newLease; } @@ -812,6 +817,7 @@ public class HierarchicalShardSyncer { newLease.ownerSwitchesSinceCheckpoint(0L); newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(shard.shardId()); + newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); return newLease; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 2d0ce8c2..c21e2203 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -20,6 +20,7 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.ToString; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.Collection; @@ -96,6 +97,7 @@ public class Lease { private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); + private HashKeyRangeForLease hashKeyRangeForLease; /** * Copy constructor, used by clone(). @@ -105,7 +107,8 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), + lease.pendingCheckpointState(), lease.hashKeyRangeForLease()); } @Deprecated @@ -114,14 +117,14 @@ public class Lease { final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, - ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); + ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null, null); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds, - final byte[] pendingCheckpointState) { + final byte[] pendingCheckpointState, final HashKeyRangeForLease hashKeyRangeForLease) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -136,6 +139,9 @@ public class Lease { if (childShardIds != null) { this.childShardIds.addAll(childShardIds); } + if (hashKeyRangeForLease != null) { + this.hashKeyRangeForLease = hashKeyRangeForLease; + } this.pendingCheckpointState = pendingCheckpointState; } @@ -158,7 +164,8 @@ public class Lease { pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); - childShardIds(lease.childShardIds()); + childShardIds(lease.childShardIds); + hashKeyRange(lease.hashKeyRangeForLease); } /** @@ -274,6 +281,18 @@ public class Lease { this.childShardIds.addAll(childShardIds); } + /** + * Set the hash range key for this shard. + * @param hashKeyRangeForLease + */ + public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) { + if(this.hashKeyRangeForLease == null) { + this.hashKeyRangeForLease = hashKeyRangeForLease; + } else { + throw new IllegalArgumentException("hashKeyRange is immutable"); + } + } + /** * Sets leaseOwner. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 85632500..61035be8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseSerializer; @@ -54,6 +55,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; + private static final String HASH_KEY_RANGE = "hashKeyRange"; @Override public Map toDynamoRecord(final Lease lease) { @@ -85,6 +87,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); } + if(lease.hashKeyRangeForLease() != null) { + result.put(HASH_KEY_RANGE, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize())); + } + return result; } @@ -119,6 +125,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); + final String hashKeyRange; + if (!Strings.isNullOrEmpty(hashKeyRange = DynamoUtils.safeGetString(dynamoRecord, HASH_KEY_RANGE))) { + leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(hashKeyRange)); + } + return leaseToUpdate; } @@ -246,6 +257,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); } + if(lease.hashKeyRangeForLease() != null) { + result.put(HASH_KEY_RANGE, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize()))); + } + return result; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index cbf7add7..06433c1c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -57,6 +57,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -969,7 +970,7 @@ public class HierarchicalShardSyncerTest { parentShardIds.add(shard.adjacentParentShardId()); } return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, - parentShardIds, new HashSet<>(), null); + parentShardIds, new HashSet<>(), null, HashKeyRangeForLease.fromHashKeyRange(shard.hashKeyRange())); }).collect(Collectors.toList()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index cf06f586..8f825875 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -20,6 +20,7 @@ import java.util.UUID; import lombok.Setter; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Setter @@ -36,9 +37,11 @@ public class LeaseBuilder { private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); private byte[] pendingCheckpointState; + private HashKeyRangeForLease hashKeyRangeForLease; public Lease build() { return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, - pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); + pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, + pendingCheckpointState, hashKeyRangeForLease); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java index f07a38f4..ee2504d8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java @@ -68,7 +68,7 @@ public class ShardObjectHelper { String parentShardId, String adjacentParentShardId, SequenceNumberRange sequenceNumberRange) { - return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, null); + return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build()); } /** Helper method to create a new shard object. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 1daf85b8..364a91be 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -37,6 +37,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -55,7 +56,8 @@ public class DynamoDBLeaseRenewerTest { private LeaseRefresher leaseRefresher; private static Lease newLease(String leaseKey) { - return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null); + return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, + new HashSet<>(), new HashSet<>(), null, new HashKeyRangeForLease("1", "2")); } @Before From 07c9529c149bd2d598da73e647742435a9c230bc Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 28 Apr 2020 15:58:50 -0700 Subject: [PATCH 2/5] doc comments --- .../kinesis/common/HashKeyRangeForLease.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java index 1d4529ff..2d45b130 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -23,6 +23,9 @@ import software.amazon.awssdk.services.kinesis.model.HashKeyRange; @Data @Accessors(fluent = true) +/** + * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards. + */ public class HashKeyRangeForLease { private static final String DELIM = ":"; @@ -30,6 +33,10 @@ public class HashKeyRangeForLease { private final String startingHashKey; private final String endingHashKey; + /** + * Serialize the HashKeyRangeForLease for persisting in external storage + * @return Serialized string + */ public String serialize() { return Joiner.on(DELIM).join(startingHashKey, endingHashKey); } @@ -39,6 +46,11 @@ public class HashKeyRangeForLease { return serialize(); } + /** + * Deserialize from serialized hashKeyRange string from external storage. + * @param hashKeyRange + * @return HashKeyRangeForLease + */ public static HashKeyRangeForLease deserialize(String hashKeyRange) { final String[] hashKeyTokens = hashKeyRange.split(DELIM); Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens"); @@ -48,6 +60,11 @@ public class HashKeyRangeForLease { return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]); } + /** + * Construct HashKeyRangeForLease from Kinesis HashKeyRange + * @param hashKeyRange + * @return HashKeyRangeForLease + */ public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); } From bdec3bd66d44d4e0517b8ed5562b444a3c71dd2e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 29 Apr 2020 15:23:14 -0700 Subject: [PATCH 3/5] Fixing comparison bug and addressing review comments --- .../kinesis/common/HashKeyRangeForLease.java | 54 ++++++++++--------- .../software/amazon/kinesis/leases/Lease.java | 8 ++- .../dynamodb/DynamoDBLeaseSerializer.java | 16 +++--- .../leases/HierarchicalShardSyncerTest.java | 26 +++++++-- .../dynamodb/DynamoDBLeaseRenewerTest.java | 2 +- 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java index 2d45b130..063451a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -15,57 +15,63 @@ package software.amazon.kinesis.common; -import com.google.common.base.Joiner; -import lombok.Data; +import lombok.NonNull; +import lombok.Value; import lombok.experimental.Accessors; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; -@Data -@Accessors(fluent = true) +import java.math.BigInteger; + +@Value @Accessors(fluent = true) /** * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards. */ public class HashKeyRangeForLease { - private static final String DELIM = ":"; - - private final String startingHashKey; - private final String endingHashKey; + private final BigInteger startingHashKey; + private final BigInteger endingHashKey; /** - * Serialize the HashKeyRangeForLease for persisting in external storage - * @return Serialized string + * Serialize the startingHashKey for persisting in external storage + * + * @return Serialized startingHashKey */ - public String serialize() { - return Joiner.on(DELIM).join(startingHashKey, endingHashKey); + public String serializedStartingHashKey() { + return startingHashKey.toString(); } - @Override - public String toString() { - return serialize(); + /** + * Serialize the endingHashKey for persisting in external storage + * + * @return Serialized endingHashKey + */ + public String serializedEndingHashKey() { + return endingHashKey.toString(); } /** * Deserialize from serialized hashKeyRange string from external storage. - * @param hashKeyRange + * + * @param startingHashKeyStr + * @param endingHashKeyStr * @return HashKeyRangeForLease */ - public static HashKeyRangeForLease deserialize(String hashKeyRange) { - final String[] hashKeyTokens = hashKeyRange.split(DELIM); - Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens"); - // Assuming that startingHashKey and endingHashRange are not same. - Validate.isTrue(!hashKeyTokens[0].equals(hashKeyTokens[1]), "StartingHashKey and EndingHashKey should not be same"); - Validate.isTrue(hashKeyTokens[0].compareTo(hashKeyTokens[1]) < 0, "StartingHashKey must be less than EndingHashKey"); - return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]); + public static HashKeyRangeForLease deserialize(@NonNull String startingHashKeyStr, @NonNull String endingHashKeyStr) { + final BigInteger startingHashKey = new BigInteger(startingHashKeyStr); + final BigInteger endingHashKey = new BigInteger(endingHashKeyStr); + Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0, + "StartingHashKey %s must be less than EndingHashKey %s ", startingHashKeyStr, endingHashKeyStr); + return new HashKeyRangeForLease(startingHashKey, endingHashKey); } /** * Construct HashKeyRangeForLease from Kinesis HashKeyRange + * * @param hashKeyRange * @return HashKeyRangeForLease */ public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { - return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); + return deserialize(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index c21e2203..8ffd3bda 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -139,9 +139,7 @@ public class Lease { if (childShardIds != null) { this.childShardIds.addAll(childShardIds); } - if (hashKeyRangeForLease != null) { - this.hashKeyRangeForLease = hashKeyRangeForLease; - } + this.hashKeyRangeForLease = hashKeyRangeForLease; this.pendingCheckpointState = pendingCheckpointState; } @@ -286,9 +284,9 @@ public class Lease { * @param hashKeyRangeForLease */ public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) { - if(this.hashKeyRangeForLease == null) { + if (this.hashKeyRangeForLease == null) { this.hashKeyRangeForLease = hashKeyRangeForLease; - } else { + } else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) { throw new IllegalArgumentException("hashKeyRange is immutable"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 61035be8..8f293881 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -55,7 +55,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; - private static final String HASH_KEY_RANGE = "hashKeyRange"; + private static final String STARTING_HASH_KEY = "startingHashKey"; + private static final String ENDING_HASH_KEY = "endingHashKey"; @Override public Map toDynamoRecord(final Lease lease) { @@ -88,7 +89,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { } if(lease.hashKeyRangeForLease() != null) { - result.put(HASH_KEY_RANGE, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize())); + result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())); + result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())); } return result; @@ -125,9 +127,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); - final String hashKeyRange; - if (!Strings.isNullOrEmpty(hashKeyRange = DynamoUtils.safeGetString(dynamoRecord, HASH_KEY_RANGE))) { - leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(hashKeyRange)); + final String startingHashKey, endingHashKey; + if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY)) + && !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) { + leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey)); } return leaseToUpdate; @@ -258,7 +261,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { } if(lease.hashKeyRangeForLease() != null) { - result.put(HASH_KEY_RANGE, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize()))); + result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()))); + result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()))); } return result; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 06433c1c..acfa3c51 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -20,6 +20,7 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; @@ -40,11 +41,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -120,7 +123,6 @@ public class HierarchicalShardSyncerTest { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); } @@ -154,6 +156,8 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); + validateHashRangeinLease(newLeases); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -177,6 +181,7 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); + validateHashRangeinLease(newLeases); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); @@ -213,6 +218,7 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + validateHashRangeinLease(newLeases); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); @@ -248,12 +254,22 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + validateHashRangeinLease(newLeases); final Set expectedLeaseShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); } + private void validateHashRangeinLease(List leases) { + final Consumer leaseValidation = lease -> { + Validate.notNull(lease.hashKeyRangeForLease()); + Validate.isTrue(lease.hashKeyRangeForLease().startingHashKey() + .compareTo(lease.hashKeyRangeForLease().endingHashKey()) < 0); + }; + leases.forEach(lease -> leaseValidation.accept(lease)); + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) */ @@ -295,6 +311,7 @@ public class HierarchicalShardSyncerTest { final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); @@ -329,6 +346,7 @@ public class HierarchicalShardSyncerTest { final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); @@ -376,6 +394,8 @@ public class HierarchicalShardSyncerTest { assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); + validateHashRangeinLease(requestLeases); + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); @@ -412,7 +432,7 @@ public class HierarchicalShardSyncerTest { assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); - + validateHashRangeinLease(requestLeases); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); @@ -444,7 +464,7 @@ public class HierarchicalShardSyncerTest { final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); - + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(extendedSequenceNumbers.size(), equalTo(0)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 364a91be..bfff4e92 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -57,7 +57,7 @@ public class DynamoDBLeaseRenewerTest { private static Lease newLease(String leaseKey) { return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, - new HashSet<>(), new HashSet<>(), null, new HashKeyRangeForLease("1", "2")); + new HashSet<>(), new HashSet<>(), null, HashKeyRangeForLease.deserialize("1", "2")); } @Before From 4a323b8f1cd717b254ab102e4b1d7e21842e30c2 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 29 Apr 2020 15:56:15 -0700 Subject: [PATCH 4/5] Fixing a metrics bug --- .../src/main/java/software/amazon/kinesis/leases/Lease.java | 3 +-- .../kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 8ffd3bda..427b3509 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -163,7 +163,6 @@ public class Lease { pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); childShardIds(lease.childShardIds); - hashKeyRange(lease.hashKeyRangeForLease); } /** @@ -283,7 +282,7 @@ public class Lease { * Set the hash range key for this shard. * @param hashKeyRangeForLease */ - public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) { + public void hashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) { if (this.hashKeyRangeForLease == null) { this.hashKeyRangeForLease = hashKeyRangeForLease; } else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index a1e0afcc..544de8cd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -271,7 +271,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { * {@inheritDoc} */ @Override - public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String shardId) + public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException { verifyNotNull(lease, "lease cannot be null"); verifyNotNull(lease.leaseKey(), "leaseKey cannot be null"); @@ -302,8 +302,10 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { if(lease instanceof MultiStreamLease) { MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); + MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); + } else { + MetricsUtil.addShardId(scope, shardId); } - MetricsUtil.addShardId(scope, shardId); } long startTime = System.currentTimeMillis(); From 4ce0591e79cca64c3745642eab56165ab2604611 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 29 Apr 2020 15:59:53 -0700 Subject: [PATCH 5/5] Adding hashkeyRange back --- .../src/main/java/software/amazon/kinesis/leases/Lease.java | 1 + 1 file changed, 1 insertion(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 427b3509..3df5097e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -163,6 +163,7 @@ public class Lease { pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); childShardIds(lease.childShardIds); + hashKeyRange(lease.hashKeyRangeForLease); } /**