Exposed convenience method of ExtendedSequenceNumber#isSentinelCheckpoint() (#1053)

+ fixed unrelated parameterized log message in `ShardSyncTaskManager`
This commit is contained in:
stair 2023-02-27 13:15:30 -05:00 committed by GitHub
parent d8aa784f17
commit cd80c93966
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 65 deletions

View file

@ -205,7 +205,8 @@ public class ShardSyncTaskManager {
private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) { private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
if (exception != null || taskResult.getException() != null) { if (exception != null || taskResult.getException() != null) {
log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); log.error("Caught exception running {} task: {}", currentTask.taskType(),
exception != null ? exception : taskResult.getException());
} }
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// submitShardSyncTask is invoked, before completion stage exits (future completes) // submitShardSyncTask is invoked, before completion stage exits (future completes)

View file

@ -12,14 +12,13 @@ import software.amazon.kinesis.metrics.MetricsScope;
* Helper class to sync leases with shards of the Kinesis stream. * Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing it's child shards. * and begun processing its child shards.
* *
* <p>NOTE: This class is deprecated and will be removed in a future release.</p> * <p>NOTE: This class is deprecated and will be removed in a future release.</p>
*/ */
@Deprecated @Deprecated
public class ShardSyncer { public class ShardSyncer {
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer(); private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
private static final boolean garbageCollectLeases = true;
/** /**
* <p>NOTE: This method is deprecated and will be removed in a future release.</p> * <p>NOTE: This method is deprecated and will be removed in a future release.</p>

View file

@ -113,7 +113,7 @@ public class ProcessTask implements ConsumerTask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
/** /*
* NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension, * NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension,
* therefore all data added to appScope, although from different shard consumer, will be sent to the same metric, * therefore all data added to appScope, although from different shard consumer, will be sent to the same metric,
* which is the app-level MillsBehindLatest metric. * which is the app-level MillsBehindLatest metric.
@ -180,8 +180,6 @@ public class ProcessTask implements ConsumerTask {
} }
} }
private List<KinesisClientRecord> deaggregateAnyKplRecords(List<KinesisClientRecord> records) { private List<KinesisClientRecord> deaggregateAnyKplRecords(List<KinesisClientRecord> records) {
if (shard == null) { if (shard == null) {
return aggregatorUtil.deaggregate(records); return aggregatorUtil.deaggregate(records);

View file

@ -15,8 +15,12 @@
package software.amazon.kinesis.retrieval.kpl; package software.amazon.kinesis.retrieval.kpl;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
//import com.amazonaws.services.kinesis.clientlibrary.lib.worker.String; import lombok.EqualsAndHashCode;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint; import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
/** /**
@ -28,10 +32,8 @@ import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
* user record therefore has an integer sub-sequence number, in addition to the * user record therefore has an integer sub-sequence number, in addition to the
* regular sequence number of the Kinesis record. The sub-sequence number is * regular sequence number of the Kinesis record. The sub-sequence number is
* used to checkpoint within an aggregated record. * used to checkpoint within an aggregated record.
*
* @author daphnliu
*
*/ */
@EqualsAndHashCode
public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber> { public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber> {
private final String sequenceNumber; private final String sequenceNumber;
private final long subSequenceNumber; private final long subSequenceNumber;
@ -65,6 +67,15 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
public static final ExtendedSequenceNumber AT_TIMESTAMP = public static final ExtendedSequenceNumber AT_TIMESTAMP =
new ExtendedSequenceNumber(SentinelCheckpoint.AT_TIMESTAMP.toString()); new ExtendedSequenceNumber(SentinelCheckpoint.AT_TIMESTAMP.toString());
/**
* Cache of {@link SentinelCheckpoint} values that avoids expensive
* try-catch and Exception handling.
*
* @see #isSentinelCheckpoint()
*/
private static final Set<String> SENTINEL_VALUES = Collections.unmodifiableSet(
Arrays.stream(SentinelCheckpoint.values()).map(SentinelCheckpoint::name).collect(Collectors.toSet()));
/** /**
* Construct an ExtendedSequenceNumber. The sub-sequence number defaults to * Construct an ExtendedSequenceNumber. The sub-sequence number defaults to
* 0. * 0.
@ -87,7 +98,7 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
*/ */
public ExtendedSequenceNumber(String sequenceNumber, Long subSequenceNumber) { public ExtendedSequenceNumber(String sequenceNumber, Long subSequenceNumber) {
this.sequenceNumber = sequenceNumber; this.sequenceNumber = sequenceNumber;
this.subSequenceNumber = subSequenceNumber == null ? 0 : subSequenceNumber.longValue(); this.subSequenceNumber = subSequenceNumber == null ? 0L : subSequenceNumber;
} }
/** /**
@ -104,7 +115,7 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
public int compareTo(ExtendedSequenceNumber extendedSequenceNumber) { public int compareTo(ExtendedSequenceNumber extendedSequenceNumber) {
String secondSequenceNumber = extendedSequenceNumber.sequenceNumber(); String secondSequenceNumber = extendedSequenceNumber.sequenceNumber();
if (!isDigitsOrSentinelValue(sequenceNumber) || !isDigitsOrSentinelValue(secondSequenceNumber)) { if (!isDigitsOrSentinelValue(this) || !isDigitsOrSentinelValue(extendedSequenceNumber)) {
throw new IllegalArgumentException("Expected a sequence number or a sentinel checkpoint value but " throw new IllegalArgumentException("Expected a sequence number or a sentinel checkpoint value but "
+ "received: first=" + sequenceNumber + " and second=" + secondSequenceNumber); + "received: first=" + sequenceNumber + " and second=" + secondSequenceNumber);
} }
@ -141,7 +152,6 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
return subSequenceNumber; return subSequenceNumber;
} }
public boolean isShardEnd() { public boolean isShardEnd() {
return sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString()); return sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString());
} }
@ -149,49 +159,17 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("{"); sb.append('{');
if (sequenceNumber() != null) { if (sequenceNumber() != null) {
sb.append("SequenceNumber: " + sequenceNumber() + ","); sb.append("SequenceNumber: ").append(sequenceNumber()).append(',');
} }
if (subSequenceNumber >= 0) { if (subSequenceNumber >= 0) {
sb.append("SubsequenceNumber: " + subSequenceNumber()); sb.append("SubsequenceNumber: ").append(subSequenceNumber());
} }
sb.append("}"); sb.append('}');
return sb.toString(); return sb.toString();
} }
@Override
public int hashCode() {
final int prime = 31;
final int shift = 32;
int hashCode = 1;
hashCode = prime * hashCode + ((sequenceNumber == null) ? 0 : sequenceNumber.hashCode());
hashCode = prime * hashCode + ((subSequenceNumber < 0)
? 0
: (int) (subSequenceNumber ^ (subSequenceNumber >>> shift)));
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof ExtendedSequenceNumber)) {
return false;
}
ExtendedSequenceNumber other = (ExtendedSequenceNumber) obj;
if (!sequenceNumber.equals(other.sequenceNumber())) {
return false;
}
return subSequenceNumber == other.subSequenceNumber();
}
/** /**
* Sequence numbers are converted, sentinels are given a value of -1. Note this method is only used after special * Sequence numbers are converted, sentinels are given a value of -1. Note this method is only used after special
* logic associated with SHARD_END and the case of comparing two sentinel values has already passed, so we map * logic associated with SHARD_END and the case of comparing two sentinel values has already passed, so we map
@ -217,28 +195,21 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
} }
/** /**
* Checks if the string is all digits or one of the SentinelCheckpoint values. * Checks if a sequence number is all digits or a {@link SentinelCheckpoint}.
* *
* @param string * @param esn {@code ExtendedSequenceNumber} to validate its sequence number
* @return true if and only if the string is all digits or one of the SentinelCheckpoint values * @return true if and only if the string is all digits or one of the SentinelCheckpoint values
*/ */
private static boolean isDigitsOrSentinelValue(String string) { private static boolean isDigitsOrSentinelValue(final ExtendedSequenceNumber esn) {
return isDigits(string) || isSentinelValue(string); return isDigits(esn.sequenceNumber()) || esn.isSentinelCheckpoint();
} }
/** /**
* Checks if the string is a SentinelCheckpoint value. * Returns true if-and-only-if the sequence number is a {@link SentinelCheckpoint}.
* * Subsequence numbers are ignored when making this determination.
* @param string
* @return true if and only if the string can be converted to a SentinelCheckpoint
*/ */
private static boolean isSentinelValue(String string) { public boolean isSentinelCheckpoint() {
try { return SENTINEL_VALUES.contains(sequenceNumber);
SentinelCheckpoint.valueOf(string);
return true;
} catch (Exception e) {
return false;
}
} }
/** /**

View file

@ -0,0 +1,36 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.kpl;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
public class ExtendedSequenceNumberTest {
@Test
public void testSentinelCheckpoints() {
for (final SentinelCheckpoint sentinel : SentinelCheckpoint.values()) {
final ExtendedSequenceNumber esn = new ExtendedSequenceNumber(sentinel.name());
assertTrue(sentinel.name(), esn.isSentinelCheckpoint());
// For backwards-compatibility, sentinels should ignore subsequences
final ExtendedSequenceNumber esnWithSubsequence = new ExtendedSequenceNumber(sentinel.name(), 42L);
assertTrue(sentinel.name(), esnWithSubsequence.isSentinelCheckpoint());
}
}
}