diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java new file mode 100644 index 00000000..1d4529ff --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import com.google.common.base.Joiner; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; + +@Data +@Accessors(fluent = true) +public class HashKeyRangeForLease { + + private static final String DELIM = ":"; + + private final String startingHashKey; + private final String endingHashKey; + + public String serialize() { + return Joiner.on(DELIM).join(startingHashKey, endingHashKey); + } + + @Override + public String toString() { + return serialize(); + } + + public static HashKeyRangeForLease deserialize(String hashKeyRange) { + final String[] hashKeyTokens = hashKeyRange.split(DELIM); + Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens"); + // Assuming that startingHashKey and endingHashRange are not same. + Validate.isTrue(!hashKeyTokens[0].equals(hashKeyTokens[1]), "StartingHashKey and EndingHashKey should not be same"); + Validate.isTrue(hashKeyTokens[0].compareTo(hashKeyTokens[1]) < 0, "StartingHashKey must be less than EndingHashKey"); + return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]); + } + + public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { + return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index debb89bb..714e7f4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -57,6 +58,8 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -757,6 +760,7 @@ public class HierarchicalShardSyncer { } newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.ownerSwitchesSinceCheckpoint(0L); + newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange())); return newLease; } @@ -772,6 +776,7 @@ public class HierarchicalShardSyncer { newLease.ownerSwitchesSinceCheckpoint(0L); newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(childShard.shardId()); + newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange())); return newLease; } @@ -794,7 +799,7 @@ public class HierarchicalShardSyncer { } newLease.parentShardIds(parentShardIds); newLease.ownerSwitchesSinceCheckpoint(0L); - + newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); return newLease; } @@ -812,6 +817,7 @@ public class HierarchicalShardSyncer { newLease.ownerSwitchesSinceCheckpoint(0L); newLease.streamIdentifier(streamIdentifier.serialize()); newLease.shardId(shard.shardId()); + newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); return newLease; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 2d0ce8c2..c21e2203 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -20,6 +20,7 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.ToString; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.Collection; @@ -96,6 +97,7 @@ public class Lease { private Long ownerSwitchesSinceCheckpoint = 0L; private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); + private HashKeyRangeForLease hashKeyRangeForLease; /** * Copy constructor, used by clone(). @@ -105,7 +107,8 @@ public class Lease { protected Lease(Lease lease) { this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), - lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); + lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), + lease.pendingCheckpointState(), lease.hashKeyRangeForLease()); } @Deprecated @@ -114,14 +117,14 @@ public class Lease { final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) { this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, - ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); + ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null, null); } public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, final UUID concurrencyToken, final Long lastCounterIncrementNanos, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds, - final byte[] pendingCheckpointState) { + final byte[] pendingCheckpointState, final HashKeyRangeForLease hashKeyRangeForLease) { this.leaseKey = leaseKey; this.leaseOwner = leaseOwner; this.leaseCounter = leaseCounter; @@ -136,6 +139,9 @@ public class Lease { if (childShardIds != null) { this.childShardIds.addAll(childShardIds); } + if (hashKeyRangeForLease != null) { + this.hashKeyRangeForLease = hashKeyRangeForLease; + } this.pendingCheckpointState = pendingCheckpointState; } @@ -158,7 +164,8 @@ public class Lease { pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpointState(lease.pendingCheckpointState); parentShardIds(lease.parentShardIds); - childShardIds(lease.childShardIds()); + childShardIds(lease.childShardIds); + hashKeyRange(lease.hashKeyRangeForLease); } /** @@ -274,6 +281,18 @@ public class Lease { this.childShardIds.addAll(childShardIds); } + /** + * Set the hash range key for this shard. + * @param hashKeyRangeForLease + */ + public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) { + if(this.hashKeyRangeForLease == null) { + this.hashKeyRangeForLease = hashKeyRangeForLease; + } else { + throw new IllegalArgumentException("hashKeyRange is immutable"); + } + } + /** * Sets leaseOwner. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 85632500..61035be8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseSerializer; @@ -54,6 +55,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; + private static final String HASH_KEY_RANGE = "hashKeyRange"; @Override public Map toDynamoRecord(final Lease lease) { @@ -85,6 +87,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); } + if(lease.hashKeyRangeForLease() != null) { + result.put(HASH_KEY_RANGE, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize())); + } + return result; } @@ -119,6 +125,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); + final String hashKeyRange; + if (!Strings.isNullOrEmpty(hashKeyRange = DynamoUtils.safeGetString(dynamoRecord, HASH_KEY_RANGE))) { + leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(hashKeyRange)); + } + return leaseToUpdate; } @@ -246,6 +257,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); } + if(lease.hashKeyRangeForLease() != null) { + result.put(HASH_KEY_RANGE, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize()))); + } + return result; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index cbf7add7..06433c1c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -57,6 +57,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -969,7 +970,7 @@ public class HierarchicalShardSyncerTest { parentShardIds.add(shard.adjacentParentShardId()); } return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, - parentShardIds, new HashSet<>(), null); + parentShardIds, new HashSet<>(), null, HashKeyRangeForLease.fromHashKeyRange(shard.hashKeyRange())); }).collect(Collectors.toList()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java index cf06f586..8f825875 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseBuilder.java @@ -20,6 +20,7 @@ import java.util.UUID; import lombok.Setter; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Setter @@ -36,9 +37,11 @@ public class LeaseBuilder { private Set parentShardIds = new HashSet<>(); private Set childShardIds = new HashSet<>(); private byte[] pendingCheckpointState; + private HashKeyRangeForLease hashKeyRangeForLease; public Lease build() { return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, - pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); + pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, + pendingCheckpointState, hashKeyRangeForLease); } } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java index f07a38f4..ee2504d8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java @@ -68,7 +68,7 @@ public class ShardObjectHelper { String parentShardId, String adjacentParentShardId, SequenceNumberRange sequenceNumberRange) { - return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, null); + return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build()); } /** Helper method to create a new shard object. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 1daf85b8..364a91be 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -37,6 +37,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -55,7 +56,8 @@ public class DynamoDBLeaseRenewerTest { private LeaseRefresher leaseRefresher; private static Lease newLease(String leaseKey) { - return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null); + return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, + new HashSet<>(), new HashSet<>(), null, new HashKeyRangeForLease("1", "2")); } @Before