Persisting hashrange in lease table

This commit is contained in:
Ashwin Giridharan 2020-04-28 15:53:22 -07:00
parent 038524e0b1
commit 4d058ecf51
8 changed files with 109 additions and 9 deletions

View file

@ -0,0 +1,54 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import com.google.common.base.Joiner;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
@Data
@Accessors(fluent = true)
public class HashKeyRangeForLease {
private static final String DELIM = ":";
private final String startingHashKey;
private final String endingHashKey;
public String serialize() {
return Joiner.on(DELIM).join(startingHashKey, endingHashKey);
}
@Override
public String toString() {
return serialize();
}
public static HashKeyRangeForLease deserialize(String hashKeyRange) {
final String[] hashKeyTokens = hashKeyRange.split(DELIM);
Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens");
// Assuming that startingHashKey and endingHashRange are not same.
Validate.isTrue(!hashKeyTokens[0].equals(hashKeyTokens[1]), "StartingHashKey and EndingHashKey should not be same");
Validate.isTrue(hashKeyTokens[0].compareTo(hashKeyTokens[1]) < 0, "StartingHashKey must be less than EndingHashKey");
return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]);
}
public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) {
return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey());
}
}

View file

@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -57,6 +58,8 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/** /**
* Helper class to sync leases with shards of the Kinesis stream. * Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@ -757,6 +760,7 @@ public class HierarchicalShardSyncer {
} }
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange()));
return newLease; return newLease;
} }
@ -772,6 +776,7 @@ public class HierarchicalShardSyncer {
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.serialize()); newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(childShard.shardId()); newLease.shardId(childShard.shardId());
newLease.hashKeyRange(fromHashKeyRange(childShard.hashKeyRange()));
return newLease; return newLease;
} }
@ -794,7 +799,7 @@ public class HierarchicalShardSyncer {
} }
newLease.parentShardIds(parentShardIds); newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
return newLease; return newLease;
} }
@ -812,6 +817,7 @@ public class HierarchicalShardSyncer {
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.serialize()); newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(shard.shardId()); newLease.shardId(shard.shardId());
newLease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
return newLease; return newLease;
} }

View file

@ -20,6 +20,7 @@ import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collection; import java.util.Collection;
@ -96,6 +97,7 @@ public class Lease {
private Long ownerSwitchesSinceCheckpoint = 0L; private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>(); private Set<String> childShardIds = new HashSet<>();
private HashKeyRangeForLease hashKeyRangeForLease;
/** /**
* Copy constructor, used by clone(). * Copy constructor, used by clone().
@ -105,7 +107,8 @@ public class Lease {
protected Lease(Lease lease) { protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(), this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(), lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(), lease.pendingCheckpointState()); lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(),
lease.pendingCheckpointState(), lease.hashKeyRangeForLease());
} }
@Deprecated @Deprecated
@ -114,14 +117,14 @@ public class Lease {
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) { final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds) {
this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint, this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null); ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null, null);
} }
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter, public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos, final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint, final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds, final Long ownerSwitchesSinceCheckpoint, final Set<String> parentShardIds, final Set<String> childShardIds,
final byte[] pendingCheckpointState) { final byte[] pendingCheckpointState, final HashKeyRangeForLease hashKeyRangeForLease) {
this.leaseKey = leaseKey; this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner; this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter; this.leaseCounter = leaseCounter;
@ -136,6 +139,9 @@ public class Lease {
if (childShardIds != null) { if (childShardIds != null) {
this.childShardIds.addAll(childShardIds); this.childShardIds.addAll(childShardIds);
} }
if (hashKeyRangeForLease != null) {
this.hashKeyRangeForLease = hashKeyRangeForLease;
}
this.pendingCheckpointState = pendingCheckpointState; this.pendingCheckpointState = pendingCheckpointState;
} }
@ -158,7 +164,8 @@ public class Lease {
pendingCheckpoint(lease.pendingCheckpoint); pendingCheckpoint(lease.pendingCheckpoint);
pendingCheckpointState(lease.pendingCheckpointState); pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds); parentShardIds(lease.parentShardIds);
childShardIds(lease.childShardIds()); childShardIds(lease.childShardIds);
hashKeyRange(lease.hashKeyRangeForLease);
} }
/** /**
@ -274,6 +281,18 @@ public class Lease {
this.childShardIds.addAll(childShardIds); this.childShardIds.addAll(childShardIds);
} }
/**
* Set the hash range key for this shard.
* @param hashKeyRangeForLease
*/
public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) {
if(this.hashKeyRangeForLease == null) {
this.hashKeyRangeForLease = hashKeyRangeForLease;
} else {
throw new IllegalArgumentException("hashKeyRange is immutable");
}
}
/** /**
* Sets leaseOwner. * Sets leaseOwner.
* *

View file

@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.LeaseSerializer;
@ -54,6 +55,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String HASH_KEY_RANGE = "hashKeyRange";
@Override @Override
public Map<String, AttributeValue> toDynamoRecord(final Lease lease) { public Map<String, AttributeValue> toDynamoRecord(final Lease lease) {
@ -85,6 +87,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber())); result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
} }
if(lease.hashKeyRangeForLease() != null) {
result.put(HASH_KEY_RANGE, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize()));
}
return result; return result;
} }
@ -119,6 +125,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
final String hashKeyRange;
if (!Strings.isNullOrEmpty(hashKeyRange = DynamoUtils.safeGetString(dynamoRecord, HASH_KEY_RANGE))) {
leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(hashKeyRange));
}
return leaseToUpdate; return leaseToUpdate;
} }
@ -246,6 +257,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
} }
if(lease.hashKeyRangeForLease() != null) {
result.put(HASH_KEY_RANGE, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize())));
}
return result; return result;
} }

View file

@ -57,6 +57,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -969,7 +970,7 @@ public class HierarchicalShardSyncerTest {
parentShardIds.add(shard.adjacentParentShardId()); parentShardIds.add(shard.adjacentParentShardId());
} }
return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L, return new Lease(shard.shardId(), leaseOwner, 0L, UUID.randomUUID(), 0L, checkpoint, null, 0L,
parentShardIds, new HashSet<>(), null); parentShardIds, new HashSet<>(), null, HashKeyRangeForLease.fromHashKeyRange(shard.hashKeyRange()));
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }

View file

@ -20,6 +20,7 @@ import java.util.UUID;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Setter @Setter
@ -36,9 +37,11 @@ public class LeaseBuilder {
private Set<String> parentShardIds = new HashSet<>(); private Set<String> parentShardIds = new HashSet<>();
private Set<String> childShardIds = new HashSet<>(); private Set<String> childShardIds = new HashSet<>();
private byte[] pendingCheckpointState; private byte[] pendingCheckpointState;
private HashKeyRangeForLease hashKeyRangeForLease;
public Lease build() { public Lease build() {
return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, return new Lease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint,
pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds, pendingCheckpointState); pendingCheckpoint, ownerSwitchesSinceCheckpoint, parentShardIds, childShardIds,
pendingCheckpointState, hashKeyRangeForLease);
} }
} }

View file

@ -68,7 +68,7 @@ public class ShardObjectHelper {
String parentShardId, String parentShardId,
String adjacentParentShardId, String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange) { SequenceNumberRange sequenceNumberRange) {
return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, null); return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build());
} }
/** Helper method to create a new shard object. /** Helper method to create a new shard object.

View file

@ -37,6 +37,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -55,7 +56,8 @@ public class DynamoDBLeaseRenewerTest {
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
private static Lease newLease(String leaseKey) { private static Lease newLease(String leaseKey) {
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, new HashSet<>(), new HashSet<>(), null); return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null,
new HashSet<>(), new HashSet<>(), null, new HashKeyRangeForLease("1", "2"));
} }
@Before @Before