From bdec3bd66d44d4e0517b8ed5562b444a3c71dd2e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 29 Apr 2020 15:23:14 -0700 Subject: [PATCH] Fixing comparison bug and addressing review comments --- .../kinesis/common/HashKeyRangeForLease.java | 54 ++++++++++--------- .../software/amazon/kinesis/leases/Lease.java | 8 ++- .../dynamodb/DynamoDBLeaseSerializer.java | 16 +++--- .../leases/HierarchicalShardSyncerTest.java | 26 +++++++-- .../dynamodb/DynamoDBLeaseRenewerTest.java | 2 +- 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java index 2d45b130..063451a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -15,57 +15,63 @@ package software.amazon.kinesis.common; -import com.google.common.base.Joiner; -import lombok.Data; +import lombok.NonNull; +import lombok.Value; import lombok.experimental.Accessors; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; -@Data -@Accessors(fluent = true) +import java.math.BigInteger; + +@Value @Accessors(fluent = true) /** * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards. */ public class HashKeyRangeForLease { - private static final String DELIM = ":"; - - private final String startingHashKey; - private final String endingHashKey; + private final BigInteger startingHashKey; + private final BigInteger endingHashKey; /** - * Serialize the HashKeyRangeForLease for persisting in external storage - * @return Serialized string + * Serialize the startingHashKey for persisting in external storage + * + * @return Serialized startingHashKey */ - public String serialize() { - return Joiner.on(DELIM).join(startingHashKey, endingHashKey); + public String serializedStartingHashKey() { + return startingHashKey.toString(); } - @Override - public String toString() { - return serialize(); + /** + * Serialize the endingHashKey for persisting in external storage + * + * @return Serialized endingHashKey + */ + public String serializedEndingHashKey() { + return endingHashKey.toString(); } /** * Deserialize from serialized hashKeyRange string from external storage. - * @param hashKeyRange + * + * @param startingHashKeyStr + * @param endingHashKeyStr * @return HashKeyRangeForLease */ - public static HashKeyRangeForLease deserialize(String hashKeyRange) { - final String[] hashKeyTokens = hashKeyRange.split(DELIM); - Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens"); - // Assuming that startingHashKey and endingHashRange are not same. - Validate.isTrue(!hashKeyTokens[0].equals(hashKeyTokens[1]), "StartingHashKey and EndingHashKey should not be same"); - Validate.isTrue(hashKeyTokens[0].compareTo(hashKeyTokens[1]) < 0, "StartingHashKey must be less than EndingHashKey"); - return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]); + public static HashKeyRangeForLease deserialize(@NonNull String startingHashKeyStr, @NonNull String endingHashKeyStr) { + final BigInteger startingHashKey = new BigInteger(startingHashKeyStr); + final BigInteger endingHashKey = new BigInteger(endingHashKeyStr); + Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0, + "StartingHashKey %s must be less than EndingHashKey %s ", startingHashKeyStr, endingHashKeyStr); + return new HashKeyRangeForLease(startingHashKey, endingHashKey); } /** * Construct HashKeyRangeForLease from Kinesis HashKeyRange + * * @param hashKeyRange * @return HashKeyRangeForLease */ public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { - return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); + return deserialize(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index c21e2203..8ffd3bda 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -139,9 +139,7 @@ public class Lease { if (childShardIds != null) { this.childShardIds.addAll(childShardIds); } - if (hashKeyRangeForLease != null) { - this.hashKeyRangeForLease = hashKeyRangeForLease; - } + this.hashKeyRangeForLease = hashKeyRangeForLease; this.pendingCheckpointState = pendingCheckpointState; } @@ -286,9 +284,9 @@ public class Lease { * @param hashKeyRangeForLease */ public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) { - if(this.hashKeyRangeForLease == null) { + if (this.hashKeyRangeForLease == null) { this.hashKeyRangeForLease = hashKeyRangeForLease; - } else { + } else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) { throw new IllegalArgumentException("hashKeyRange is immutable"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 61035be8..8f293881 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -55,7 +55,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; - private static final String HASH_KEY_RANGE = "hashKeyRange"; + private static final String STARTING_HASH_KEY = "startingHashKey"; + private static final String ENDING_HASH_KEY = "endingHashKey"; @Override public Map toDynamoRecord(final Lease lease) { @@ -88,7 +89,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { } if(lease.hashKeyRangeForLease() != null) { - result.put(HASH_KEY_RANGE, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize())); + result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())); + result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())); } return result; @@ -125,9 +127,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); - final String hashKeyRange; - if (!Strings.isNullOrEmpty(hashKeyRange = DynamoUtils.safeGetString(dynamoRecord, HASH_KEY_RANGE))) { - leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(hashKeyRange)); + final String startingHashKey, endingHashKey; + if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY)) + && !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) { + leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey)); } return leaseToUpdate; @@ -258,7 +261,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { } if(lease.hashKeyRangeForLease() != null) { - result.put(HASH_KEY_RANGE, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize()))); + result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()))); + result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()))); } return result; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 06433c1c..acfa3c51 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -20,6 +20,7 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; @@ -40,11 +41,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -120,7 +123,6 @@ public class HierarchicalShardSyncerTest { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); } @@ -154,6 +156,8 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); + validateHashRangeinLease(newLeases); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -177,6 +181,7 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); + validateHashRangeinLease(newLeases); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); @@ -213,6 +218,7 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); + validateHashRangeinLease(newLeases); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); @@ -248,12 +254,22 @@ public class HierarchicalShardSyncerTest { final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + validateHashRangeinLease(newLeases); final Set expectedLeaseShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); } + private void validateHashRangeinLease(List leases) { + final Consumer leaseValidation = lease -> { + Validate.notNull(lease.hashKeyRangeForLease()); + Validate.isTrue(lease.hashKeyRangeForLease().startingHashKey() + .compareTo(lease.hashKeyRangeForLease().endingHashKey()) < 0); + }; + leases.forEach(lease -> leaseValidation.accept(lease)); + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) */ @@ -295,6 +311,7 @@ public class HierarchicalShardSyncerTest { final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); @@ -329,6 +346,7 @@ public class HierarchicalShardSyncerTest { final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); @@ -376,6 +394,8 @@ public class HierarchicalShardSyncerTest { assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); + validateHashRangeinLease(requestLeases); + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); @@ -412,7 +432,7 @@ public class HierarchicalShardSyncerTest { assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(extendedSequenceNumbers.size(), equalTo(1)); - + validateHashRangeinLease(requestLeases); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); @@ -444,7 +464,7 @@ public class HierarchicalShardSyncerTest { final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); - + validateHashRangeinLease(requestLeases); assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(extendedSequenceNumbers.size(), equalTo(0)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 364a91be..bfff4e92 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -57,7 +57,7 @@ public class DynamoDBLeaseRenewerTest { private static Lease newLease(String leaseKey) { return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, - new HashSet<>(), new HashSet<>(), null, new HashKeyRangeForLease("1", "2")); + new HashSet<>(), new HashSet<>(), null, HashKeyRangeForLease.deserialize("1", "2")); } @Before