Fixing comparison bug and addressing review comments

This commit is contained in:
Ashwin Giridharan 2020-04-29 15:23:14 -07:00
parent 07c9529c14
commit bdec3bd66d
5 changed files with 67 additions and 39 deletions

View file

@ -15,57 +15,63 @@
package software.amazon.kinesis.common; package software.amazon.kinesis.common;
import com.google.common.base.Joiner; import lombok.NonNull;
import lombok.Data; import lombok.Value;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
@Data import java.math.BigInteger;
@Accessors(fluent = true)
@Value @Accessors(fluent = true)
/** /**
* Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards. * Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards.
*/ */
public class HashKeyRangeForLease { public class HashKeyRangeForLease {
private static final String DELIM = ":"; private final BigInteger startingHashKey;
private final BigInteger endingHashKey;
private final String startingHashKey;
private final String endingHashKey;
/** /**
* Serialize the HashKeyRangeForLease for persisting in external storage * Serialize the startingHashKey for persisting in external storage
* @return Serialized string *
* @return Serialized startingHashKey
*/ */
public String serialize() { public String serializedStartingHashKey() {
return Joiner.on(DELIM).join(startingHashKey, endingHashKey); return startingHashKey.toString();
} }
@Override /**
public String toString() { * Serialize the endingHashKey for persisting in external storage
return serialize(); *
* @return Serialized endingHashKey
*/
public String serializedEndingHashKey() {
return endingHashKey.toString();
} }
/** /**
* Deserialize from serialized hashKeyRange string from external storage. * Deserialize from serialized hashKeyRange string from external storage.
* @param hashKeyRange *
* @param startingHashKeyStr
* @param endingHashKeyStr
* @return HashKeyRangeForLease * @return HashKeyRangeForLease
*/ */
public static HashKeyRangeForLease deserialize(String hashKeyRange) { public static HashKeyRangeForLease deserialize(@NonNull String startingHashKeyStr, @NonNull String endingHashKeyStr) {
final String[] hashKeyTokens = hashKeyRange.split(DELIM); final BigInteger startingHashKey = new BigInteger(startingHashKeyStr);
Validate.isTrue(hashKeyTokens.length == 2, "HashKeyRange should have exactly two tokens"); final BigInteger endingHashKey = new BigInteger(endingHashKeyStr);
// Assuming that startingHashKey and endingHashRange are not same. Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0,
Validate.isTrue(!hashKeyTokens[0].equals(hashKeyTokens[1]), "StartingHashKey and EndingHashKey should not be same"); "StartingHashKey %s must be less than EndingHashKey %s ", startingHashKeyStr, endingHashKeyStr);
Validate.isTrue(hashKeyTokens[0].compareTo(hashKeyTokens[1]) < 0, "StartingHashKey must be less than EndingHashKey"); return new HashKeyRangeForLease(startingHashKey, endingHashKey);
return new HashKeyRangeForLease(hashKeyTokens[0], hashKeyTokens[1]);
} }
/** /**
* Construct HashKeyRangeForLease from Kinesis HashKeyRange * Construct HashKeyRangeForLease from Kinesis HashKeyRange
*
* @param hashKeyRange * @param hashKeyRange
* @return HashKeyRangeForLease * @return HashKeyRangeForLease
*/ */
public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) { public static HashKeyRangeForLease fromHashKeyRange(HashKeyRange hashKeyRange) {
return new HashKeyRangeForLease(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey()); return deserialize(hashKeyRange.startingHashKey(), hashKeyRange.endingHashKey());
} }
} }

View file

@ -139,9 +139,7 @@ public class Lease {
if (childShardIds != null) { if (childShardIds != null) {
this.childShardIds.addAll(childShardIds); this.childShardIds.addAll(childShardIds);
} }
if (hashKeyRangeForLease != null) { this.hashKeyRangeForLease = hashKeyRangeForLease;
this.hashKeyRangeForLease = hashKeyRangeForLease;
}
this.pendingCheckpointState = pendingCheckpointState; this.pendingCheckpointState = pendingCheckpointState;
} }
@ -286,9 +284,9 @@ public class Lease {
* @param hashKeyRangeForLease * @param hashKeyRangeForLease
*/ */
public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) { public void hashKeyRange(final HashKeyRangeForLease hashKeyRangeForLease) {
if(this.hashKeyRangeForLease == null) { if (this.hashKeyRangeForLease == null) {
this.hashKeyRangeForLease = hashKeyRangeForLease; this.hashKeyRangeForLease = hashKeyRangeForLease;
} else { } else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) {
throw new IllegalArgumentException("hashKeyRange is immutable"); throw new IllegalArgumentException("hashKeyRange is immutable");
} }
} }

View file

@ -55,7 +55,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId"; private static final String PARENT_SHARD_ID_KEY = "parentShardId";
private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String HASH_KEY_RANGE = "hashKeyRange"; private static final String STARTING_HASH_KEY = "startingHashKey";
private static final String ENDING_HASH_KEY = "endingHashKey";
@Override @Override
public Map<String, AttributeValue> toDynamoRecord(final Lease lease) { public Map<String, AttributeValue> toDynamoRecord(final Lease lease) {
@ -88,7 +89,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
} }
if(lease.hashKeyRangeForLease() != null) { if(lease.hashKeyRangeForLease() != null) {
result.put(HASH_KEY_RANGE, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize())); result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()));
result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()));
} }
return result; return result;
@ -125,9 +127,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY)); leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
final String hashKeyRange; final String startingHashKey, endingHashKey;
if (!Strings.isNullOrEmpty(hashKeyRange = DynamoUtils.safeGetString(dynamoRecord, HASH_KEY_RANGE))) { if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY))
leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(hashKeyRange)); && !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) {
leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
} }
return leaseToUpdate; return leaseToUpdate;
@ -258,7 +261,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
} }
if(lease.hashKeyRangeForLease() != null) { if(lease.hashKeyRangeForLease() != null) {
result.put(HASH_KEY_RANGE, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serialize()))); result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
} }
return result; return result;

View file

@ -20,6 +20,7 @@ package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -40,11 +41,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -120,7 +123,6 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Collections.emptyList(); final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList(); final List<Lease> leases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases,
INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); INITIAL_POSITION_LATEST).isEmpty(), equalTo(true));
} }
@ -154,6 +156,8 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST); shards, currentLeases, INITIAL_POSITION_LATEST);
validateHashRangeinLease(newLeases);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -177,6 +181,7 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
validateHashRangeinLease(newLeases);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseIds = new HashSet<>( final Set<String> expectedLeaseIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
@ -213,6 +218,7 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds); INITIAL_POSITION_LATEST, inconsistentShardIds);
validateHashRangeinLease(newLeases);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
@ -248,12 +254,22 @@ public class HierarchicalShardSyncerTest {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
validateHashRangeinLease(newLeases);
final Set<String> expectedLeaseShardIds = new HashSet<>( final Set<String> expectedLeaseShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
} }
private void validateHashRangeinLease(List<Lease> leases) {
final Consumer<Lease> leaseValidation = lease -> {
Validate.notNull(lease.hashKeyRangeForLease());
Validate.isTrue(lease.hashKeyRangeForLease().startingHashKey()
.compareTo(lease.hashKeyRangeForLease().endingHashKey()) < 0);
};
leases.forEach(lease -> leaseValidation.accept(lease));
}
/** /**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
*/ */
@ -295,6 +311,7 @@ public class HierarchicalShardSyncerTest {
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
@ -329,6 +346,7 @@ public class HierarchicalShardSyncerTest {
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
@ -376,6 +394,8 @@ public class HierarchicalShardSyncerTest {
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
validateHashRangeinLease(requestLeases);
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShards();
@ -412,7 +432,7 @@ public class HierarchicalShardSyncerTest {
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds)); assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1)); assertThat(extendedSequenceNumbers.size(), equalTo(1));
validateHashRangeinLease(requestLeases);
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShards();
@ -444,7 +464,7 @@ public class HierarchicalShardSyncerTest {
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(extendedSequenceNumbers.size(), equalTo(0)); assertThat(extendedSequenceNumbers.size(), equalTo(0));

View file

@ -57,7 +57,7 @@ public class DynamoDBLeaseRenewerTest {
private static Lease newLease(String leaseKey) { private static Lease newLease(String leaseKey) {
return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null, return new Lease(leaseKey, "LeaseOwner", 0L, UUID.randomUUID(), System.nanoTime(), null, null, null,
new HashSet<>(), new HashSet<>(), null, new HashKeyRangeForLease("1", "2")); new HashSet<>(), new HashSet<>(), null, HashKeyRangeForLease.deserialize("1", "2"));
} }
@Before @Before