Revert experimental features from master (#431)
Reverted 3 commits: Revert "Change version number to 2.0.3-experimental" Revert:54c171dc2a. Revert "Experimental support for sequence number validation in the publisher (#401)" Revert:592499f7bc. Revert "Support Validating Records are From to the Expected Shard (#400)" Revert:01f5db8049.
This commit is contained in:
parent
a88d4ba602
commit
e86bf3d7f3
15 changed files with 99 additions and 650 deletions
|
|
@ -19,7 +19,7 @@
|
|||
<parent>
|
||||
<artifactId>amazon-kinesis-client-pom</artifactId>
|
||||
<groupId>software.amazon.kinesis</groupId>
|
||||
<version>2.0.3-experimental</version>
|
||||
<version>2.0.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@
|
|||
<parent>
|
||||
<groupId>software.amazon.kinesis</groupId>
|
||||
<artifactId>amazon-kinesis-client-pom</artifactId>
|
||||
<version>2.0.3-experimental</version>
|
||||
<version>2.0.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
|
|
|
|||
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package software.amazon.kinesis.annotations;
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
|
||||
/**
|
||||
* Anything marked as experimental may be removed at any time without warning.
|
||||
*/
|
||||
@Retention(RetentionPolicy.CLASS)
|
||||
public @interface KinesisClientExperimental {
|
||||
String reason() default "";
|
||||
}
|
||||
|
|
@ -1,188 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.checkpoint;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* This supports extracting the shardId from a sequence number.
|
||||
*
|
||||
* <h2>Warning</h2>
|
||||
* <strong>Sequence numbers are an opaque value used by Kinesis, and maybe changed at any time. Should validation stop
|
||||
* working you may need to update your version of the KCL</strong>
|
||||
*
|
||||
*/
|
||||
public class SequenceNumberValidator {
|
||||
|
||||
@Data
|
||||
@Accessors(fluent = true)
|
||||
private static class SequenceNumberComponents {
|
||||
final int version;
|
||||
final int shardId;
|
||||
}
|
||||
|
||||
private interface SequenceNumberReader {
|
||||
Optional<SequenceNumberComponents> read(String sequenceNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reader for the v2 sequence number format. v1 sequence numbers are no longer used or available.
|
||||
*/
|
||||
private static class V2SequenceNumberReader implements SequenceNumberReader {
|
||||
|
||||
private static final int VERSION = 2;
|
||||
|
||||
private static final int EXPECTED_BIT_LENGTH = 186;
|
||||
|
||||
private static final int VERSION_OFFSET = 184;
|
||||
private static final long VERSION_MASK = (1 << 4) - 1;
|
||||
|
||||
private static final int SHARD_ID_OFFSET = 4;
|
||||
private static final long SHARD_ID_MASK = (1L << 32) - 1;
|
||||
|
||||
@Override
|
||||
public Optional<SequenceNumberComponents> read(String sequenceNumberString) {
|
||||
BigInteger sequenceNumber = new BigInteger(sequenceNumberString, 10);
|
||||
|
||||
//
|
||||
// If the bit length of the sequence number isn't 186 it's impossible for the version numbers
|
||||
// to be where we expect them. We treat this the same as an unknown version of the sequence number
|
||||
//
|
||||
// If the sequence number length isn't what we expect it's due to a new version of the sequence number or
|
||||
// an invalid sequence number. This
|
||||
//
|
||||
if (sequenceNumber.bitLength() != EXPECTED_BIT_LENGTH) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
//
|
||||
// Read the 4 most significant bits of the sequence number, the 2 most significant bits are implicitly 0
|
||||
// (2 == 0b0011). If the version number doesn't match we give up and say we can't parse the sequence number
|
||||
//
|
||||
int version = readOffset(sequenceNumber, VERSION_OFFSET, VERSION_MASK);
|
||||
if (version != VERSION) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
//
|
||||
// If we get here the sequence number is big enough, and the version matches so the shardId should be valid.
|
||||
//
|
||||
int shardId = readOffset(sequenceNumber, SHARD_ID_OFFSET, SHARD_ID_MASK);
|
||||
return Optional.of(new SequenceNumberComponents(version, shardId));
|
||||
}
|
||||
|
||||
private int readOffset(BigInteger sequenceNumber, int offset, long mask) {
|
||||
long value = sequenceNumber.shiftRight(offset).longValue() & mask;
|
||||
return (int) value;
|
||||
}
|
||||
}
|
||||
|
||||
private static final List<SequenceNumberReader> SEQUENCE_NUMBER_READERS = Collections
|
||||
.singletonList(new V2SequenceNumberReader());
|
||||
|
||||
private Optional<SequenceNumberComponents> retrieveComponentsFor(String sequenceNumber) {
|
||||
return SEQUENCE_NUMBER_READERS.stream().map(r -> r.read(sequenceNumber)).filter(Optional::isPresent).map(Optional::get).findFirst();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to retrieve the version for a sequence number. If no reader can be found for the sequence number this
|
||||
* will return an empty Optional.
|
||||
*
|
||||
* <p>
|
||||
* <strong>This will return an empty Optional if the it's unable to extract the version number. This can occur for
|
||||
* multiple reasons including:
|
||||
* <ul>
|
||||
* <li>Kinesis has started using a new version of sequence numbers</li>
|
||||
* <li>The provided sequence number isn't a valid Kinesis sequence number.</li>
|
||||
* </ul>
|
||||
* </strong>
|
||||
* </p>
|
||||
*
|
||||
* @param sequenceNumber
|
||||
* the sequence number to extract the version from
|
||||
* @return an Optional containing the version if a compatible sequence number reader can be found, an empty Optional
|
||||
* otherwise.
|
||||
*/
|
||||
public Optional<Integer> versionFor(String sequenceNumber) {
|
||||
return retrieveComponentsFor(sequenceNumber).map(SequenceNumberComponents::version);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to retrieve the shardId from a sequence number. If the version of the sequence number is unsupported
|
||||
* this will return an empty optional.
|
||||
*
|
||||
* <strong>This will return an empty Optional if the sequence number isn't recognized. This can occur for multiple
|
||||
* reasons including:
|
||||
* <ul>
|
||||
* <li>Kinesis has started using a new version of sequence numbers</li>
|
||||
* <li>The provided sequence number isn't a valid Kinesis sequence number.</li>
|
||||
* </ul>
|
||||
* </strong>
|
||||
* <p>
|
||||
* This should always return a value if {@link #versionFor(String)} returns a value
|
||||
* </p>
|
||||
*
|
||||
* @param sequenceNumber
|
||||
* the sequence number to extract the shardId from
|
||||
* @return an Optional containing the shardId if the version is supported, an empty Optional otherwise.
|
||||
*/
|
||||
public Optional<String> shardIdFor(String sequenceNumber) {
|
||||
return retrieveComponentsFor(sequenceNumber).map(s -> String.format("shardId-%012d", s.shardId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that the sequence number provided contains the given shardId. If the sequence number is unsupported
|
||||
* this will return an empty Optional.
|
||||
*
|
||||
* <p>
|
||||
* Validation of a sequence number will only occur if the sequence number can be parsed. It's possible to use
|
||||
* {@link #versionFor(String)} to verify that the given sequence number is supported by this class. There are 3
|
||||
* possible validation states:
|
||||
* <dl>
|
||||
* <dt>Some(True)</dt>
|
||||
* <dd>The sequence number can be parsed, and the shardId matches the one in the sequence number</dd>
|
||||
* <dt>Some(False)</dt>
|
||||
* <dd>THe sequence number can be parsed, and the shardId doesn't match the one in the sequence number</dd>
|
||||
* <dt>None</dt>
|
||||
* <dd>It wasn't possible to parse the sequence number so the validity of the sequence number is unknown</dd>
|
||||
* </dl>
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* <strong>Handling unknown validation causes is application specific, and not specific handling is
|
||||
* provided.</strong>
|
||||
* </p>
|
||||
*
|
||||
* @param sequenceNumber
|
||||
* the sequence number to verify the shardId
|
||||
* @param shardId
|
||||
* the shardId that the sequence is expected to contain
|
||||
* @return true if the sequence number contains the shardId, false if it doesn't. If the sequence number version is
|
||||
* unsupported this will return an empty Optional
|
||||
*/
|
||||
public Optional<Boolean> validateSequenceNumberForShard(String sequenceNumber, String shardId) {
|
||||
return shardIdFor(sequenceNumber).map(s -> StringUtils.equalsIgnoreCase(s, shardId));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -34,7 +34,7 @@ public class RetrievalConfig {
|
|||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
|
||||
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.0.3-experimental";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.0.3-SNAPSHOT";
|
||||
|
||||
/**
|
||||
* Client used to make calls to Kinesis for records retrieval
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
|||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
|
||||
import software.amazon.kinesis.annotations.KinesisClientExperimental;
|
||||
|
||||
@Data
|
||||
@Accessors(fluent = true)
|
||||
|
|
@ -86,8 +85,7 @@ public class FanOutConfig implements RetrievalSpecificConfig {
|
|||
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn());
|
||||
}
|
||||
|
||||
@KinesisClientExperimental(reason = "Experimentally changed from private to protected")
|
||||
protected String getOrCreateConsumerArn() {
|
||||
private String getOrCreateConsumerArn() {
|
||||
if (consumerArn != null) {
|
||||
return consumerArn;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval.fanout;
|
|||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
|
@ -29,7 +28,6 @@ import lombok.NonNull;
|
|||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||
import software.amazon.awssdk.http.SdkHttpResponse;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||
|
|
@ -37,7 +35,6 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
|
|||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
|
||||
import software.amazon.kinesis.annotations.KinesisClientExperimental;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.KinesisRequestsBuilder;
|
||||
|
|
@ -48,6 +45,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
|
|||
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@KinesisClientInternalApi
|
||||
public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||
|
|
@ -59,22 +57,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
private final String shardId;
|
||||
private final String consumerArn;
|
||||
|
||||
/**
|
||||
* Creates a new FanOutRecordsPublisher.
|
||||
*
|
||||
* @param kinesis
|
||||
* the kinesis client to use for requests
|
||||
* @param shardId
|
||||
* the shardId to retrieve records for
|
||||
* @param consumerArn
|
||||
* the consumer to use when retrieving records
|
||||
*/
|
||||
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
|
||||
this.kinesis = kinesis;
|
||||
this.shardId = shardId;
|
||||
this.consumerArn = consumerArn;
|
||||
}
|
||||
|
||||
private final Object lockObject = new Object();
|
||||
|
||||
private final AtomicInteger subscribeToShardId = new AtomicInteger(0);
|
||||
|
|
@ -410,8 +392,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
subscriber = null;
|
||||
if (flow != null) {
|
||||
log.debug(
|
||||
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
|
||||
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
|
||||
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
|
||||
flow.cancel();
|
||||
availableQueueSpace = 0;
|
||||
|
|
@ -436,20 +417,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows validating records received from SubscribeToShard
|
||||
*
|
||||
* @param shardId
|
||||
* the shardId the records should be from
|
||||
* @param event
|
||||
* the SubscribeToShard event that was received.
|
||||
* @throws IllegalArgumentException
|
||||
* if the records are invalid. This will trigger an error response upwards
|
||||
*/
|
||||
@KinesisClientExperimental(reason = "Allows providing a validation function with minimal changes")
|
||||
protected void validateRecords(String shardId, SubscribeToShardEvent event) {
|
||||
}
|
||||
|
||||
private void rejectSubscription(SdkPublisher<SubscribeToShardEventStream> publisher) {
|
||||
publisher.subscribe(new Subscriber<SubscribeToShardEventStream>() {
|
||||
Subscription localSub;
|
||||
|
|
@ -526,15 +493,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
|
||||
@Override
|
||||
public void responseReceived(SubscribeToShardResponse response) {
|
||||
Optional<SdkHttpResponse> sdkHttpResponse = Optional.ofNullable(response)
|
||||
.flatMap(r -> Optional.ofNullable(r.sdkHttpResponse()));
|
||||
Optional<String> requestId = sdkHttpResponse.flatMap(s -> s.firstMatchingHeader("x-amz-requestid"));
|
||||
Optional<String> requestId2 = sdkHttpResponse.flatMap(s -> s.firstMatchingHeader("x-amz-id-2"));
|
||||
|
||||
log.debug(
|
||||
"{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received -- rid: {} -- rid2: {}",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId, requestId.orElse("None"),
|
||||
requestId2.orElse("None"));
|
||||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -676,9 +636,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
cancel();
|
||||
}
|
||||
log.debug(
|
||||
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} (Subscription ObjectId: {}) -- Outstanding: {} items so requesting an item",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId, System.identityHashCode(subscription),
|
||||
parent.availableQueueSpace);
|
||||
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace);
|
||||
if (parent.availableQueueSpace > 0) {
|
||||
request(1);
|
||||
}
|
||||
|
|
@ -698,16 +657,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
recordBatchEvent.accept(new SubscribeToShardResponseHandler.Visitor() {
|
||||
@Override
|
||||
public void visit(SubscribeToShardEvent event) {
|
||||
try {
|
||||
parent.validateRecords(parent.shardId, event);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
log.debug(
|
||||
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId,
|
||||
System.identityHashCode(subscription), iae.getMessage());
|
||||
parent.errorOccurred(flow, iae);
|
||||
return;
|
||||
}
|
||||
flow.recordsReceived(event);
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ package software.amazon.kinesis.retrieval.fanout;
|
|||
|
||||
import lombok.NonNull;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
|
|
@ -28,7 +27,6 @@ import software.amazon.kinesis.retrieval.RetrievalFactory;
|
|||
|
||||
@RequiredArgsConstructor
|
||||
@KinesisClientInternalApi
|
||||
@Accessors(fluent = true)
|
||||
public class FanOutRetrievalFactory implements RetrievalFactory {
|
||||
|
||||
private final KinesisAsyncClient kinesisClient;
|
||||
|
|
|
|||
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package software.amazon.kinesis.retrieval.fanout.experimental;
|
||||
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.annotations.KinesisClientExperimental;
|
||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
|
||||
|
||||
/**
|
||||
* Enables validation of sequence number for every received record.
|
||||
*
|
||||
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
|
||||
*/
|
||||
@KinesisClientExperimental
|
||||
public class ExperimentalFanOutConfig extends FanOutConfig {
|
||||
|
||||
public ExperimentalFanOutConfig(KinesisAsyncClient kinesisClient) {
|
||||
super(kinesisClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RetrievalFactory retrievalFactory() {
|
||||
return new ExperimentalFanOutRetrievalFactory(kinesisClient(), getOrCreateConsumerArn());
|
||||
}
|
||||
}
|
||||
|
|
@ -1,76 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package software.amazon.kinesis.retrieval.fanout.experimental;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||
import software.amazon.kinesis.annotations.KinesisClientExperimental;
|
||||
import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
|
||||
import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;
|
||||
|
||||
/**
|
||||
* A variation of {@link FanOutRecordsPublisher} that provides validation of every record received by the publisher.
|
||||
*
|
||||
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
|
||||
*/
|
||||
@Slf4j
|
||||
@KinesisClientExperimental
|
||||
public class ExperimentalFanOutRecordsPublisher extends FanOutRecordsPublisher {
|
||||
|
||||
private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();
|
||||
|
||||
/**
|
||||
* Creates a new FanOutRecordsPublisher.
|
||||
*
|
||||
* @param kinesis
|
||||
* the kinesis client to use for requests
|
||||
* @param shardId
|
||||
* the shardId to retrieve records for
|
||||
* @param consumerArn
|
||||
*/
|
||||
public ExperimentalFanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
|
||||
super(kinesis, shardId, consumerArn);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validateRecords(String shardId, SubscribeToShardEvent event) {
|
||||
Map<String, Integer> mismatchedRecords = recordsNotForShard(shardId, event);
|
||||
if (mismatchedRecords.size() > 0) {
|
||||
String mismatchReport = mismatchedRecords.entrySet().stream()
|
||||
.map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue())).collect(Collectors.joining(", "));
|
||||
throw new IllegalArgumentException("Received records destined for different shards: " + mismatchReport);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<String, Integer> recordsNotForShard(String shardId, SubscribeToShardEvent event) {
|
||||
return event.records().stream().map(r -> {
|
||||
Optional<String> res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
|
||||
if (!res.isPresent()) {
|
||||
throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
|
||||
}
|
||||
return res.get();
|
||||
}).filter(s -> !StringUtils.equalsIgnoreCase(s, shardId)).collect(Collectors.groupingBy(Function.identity()))
|
||||
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,46 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package software.amazon.kinesis.retrieval.fanout.experimental;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.annotations.KinesisClientExperimental;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
||||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||
|
||||
/**
|
||||
* Supports validating that records did originate from the shard.
|
||||
*
|
||||
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@KinesisClientExperimental
|
||||
public class ExperimentalFanOutRetrievalFactory implements RetrievalFactory {
|
||||
private final KinesisAsyncClient kinesisClient;
|
||||
private final String consumerArn;
|
||||
|
||||
@Override
|
||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||
return new ExperimentalFanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
|
||||
}
|
||||
}
|
||||
|
|
@ -14,88 +14,113 @@
|
|||
*/
|
||||
package software.amazon.kinesis.checkpoint;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
//@RunWith(MockitoJUnitRunner.class)
|
||||
public class SequenceNumberValidatorTest {
|
||||
/*private final String streamName = "testStream";
|
||||
private final boolean validateWithGetIterator = true;
|
||||
private final String shardId = "shardid-123";
|
||||
|
||||
private SequenceNumberValidator validator;
|
||||
@Mock
|
||||
private AmazonKinesis amazonKinesis;
|
||||
|
||||
@Before
|
||||
public void begin() {
|
||||
validator = new SequenceNumberValidator();
|
||||
}
|
||||
@Test (expected = IllegalArgumentException.class)
|
||||
public final void testSequenceNumberValidator() {
|
||||
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(amazonKinesis, streamName,
|
||||
shardId, validateWithGetIterator);
|
||||
|
||||
String goodSequence = "456";
|
||||
String iterator = "happyiterator";
|
||||
String badSequence = "789";
|
||||
|
||||
@Test
|
||||
public void matchingSequenceNumberTest() {
|
||||
String sequenceNumber = "49587497311274533994574834252742144236107130636007899138";
|
||||
String expectedShardId = "shardId-000000000000";
|
||||
ArgumentCaptor<GetShardIteratorRequest> requestCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
|
||||
|
||||
Optional<Integer> version = validator.versionFor(sequenceNumber);
|
||||
assertThat(version, equalTo(Optional.of(2)));
|
||||
when(amazonKinesis.getShardIterator(requestCaptor.capture()))
|
||||
.thenReturn(new GetShardIteratorResult().withShardIterator(iterator))
|
||||
.thenThrow(new InvalidArgumentException(""));
|
||||
|
||||
Optional<String> shardId = validator.shardIdFor(sequenceNumber);
|
||||
assertThat(shardId, equalTo(Optional.of(expectedShardId)));
|
||||
validator.validateSequenceNumber(goodSequence);
|
||||
try {
|
||||
validator.validateSequenceNumber(badSequence);
|
||||
} finally {
|
||||
final List<GetShardIteratorRequest> requests = requestCaptor.getAllValues();
|
||||
assertEquals(2, requests.size());
|
||||
|
||||
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.of(true)));
|
||||
final GetShardIteratorRequest goodRequest = requests.get(0);
|
||||
final GetShardIteratorRequest badRequest = requests.get(0);
|
||||
|
||||
assertEquals(streamName, goodRequest.getStreamName());
|
||||
assertEquals(shardId, goodRequest.shardId());
|
||||
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), goodRequest.getShardIteratorType());
|
||||
assertEquals(goodSequence, goodRequest.getStartingSequenceNumber());
|
||||
|
||||
assertEquals(streamName, badRequest.getStreamName());
|
||||
assertEquals(shardId, badRequest.shardId());
|
||||
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), badRequest.getShardIteratorType());
|
||||
assertEquals(goodSequence, badRequest.getStartingSequenceNumber());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shardMismatchTest() {
|
||||
String sequenceNumber = "49585389983312162443796657944872008114154899568972529698";
|
||||
String invalidShardId = "shardId-000000000001";
|
||||
public final void testNoValidation() {
|
||||
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(amazonKinesis, streamName,
|
||||
shardId, !validateWithGetIterator);
|
||||
String sequenceNumber = "456";
|
||||
|
||||
Optional<Integer> version = validator.versionFor(sequenceNumber);
|
||||
assertThat(version, equalTo(Optional.of(2)));
|
||||
// Just checking that the false flag for validating against getIterator is honored
|
||||
validator.validateSequenceNumber(sequenceNumber);
|
||||
|
||||
Optional<String> shardId = validator.shardIdFor(sequenceNumber);
|
||||
assertThat(shardId, not(equalTo(invalidShardId)));
|
||||
|
||||
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, invalidShardId), equalTo(Optional.of(false)));
|
||||
verify(amazonKinesis, never()).getShardIterator(any(GetShardIteratorRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void versionMismatchTest() {
|
||||
String sequenceNumber = "74107425965128755728308386687147091174006956590945533954";
|
||||
String expectedShardId = "shardId-000000000000";
|
||||
public void nonNumericValueValidationTest() {
|
||||
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(amazonKinesis, streamName,
|
||||
shardId, validateWithGetIterator);
|
||||
|
||||
Optional<Integer> version = validator.versionFor(sequenceNumber);
|
||||
assertThat(version, equalTo(Optional.empty()));
|
||||
String[] nonNumericStrings = {null,
|
||||
"bogus-sequence-number",
|
||||
SentinelCheckpoint.LATEST.toString(),
|
||||
SentinelCheckpoint.TRIM_HORIZON.toString(),
|
||||
SentinelCheckpoint.AT_TIMESTAMP.toString()};
|
||||
|
||||
Optional<String> shardId = validator.shardIdFor(sequenceNumber);
|
||||
assertThat(shardId, equalTo(Optional.empty()));
|
||||
Arrays.stream(nonNumericStrings).forEach(sequenceNumber -> {
|
||||
try {
|
||||
validator.validateSequenceNumber(sequenceNumber);
|
||||
fail("Validator should not consider " + sequenceNumber + " a valid sequence number");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Do nothing
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.empty()));
|
||||
verify(amazonKinesis, never()).getShardIterator(any(GetShardIteratorRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sequenceNumberToShortTest() {
|
||||
String sequenceNumber = "4958538998331216244379665794487200811415489956897252969";
|
||||
String expectedShardId = "shardId-000000000000";
|
||||
|
||||
assertThat(validator.versionFor(sequenceNumber), equalTo(Optional.empty()));
|
||||
assertThat(validator.shardIdFor(sequenceNumber), equalTo(Optional.empty()));
|
||||
|
||||
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.empty()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sequenceNumberToLongTest() {
|
||||
String sequenceNumber = "495874973112745339945748342527421442361071306360078991381";
|
||||
String expectedShardId = "shardId-000000000000";
|
||||
|
||||
assertThat(validator.versionFor(sequenceNumber), equalTo(Optional.empty()));
|
||||
assertThat(validator.shardIdFor(sequenceNumber), equalTo(Optional.empty()));
|
||||
|
||||
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.empty()));
|
||||
}
|
||||
|
||||
public final void testIsDigits() {
|
||||
// Check things that are all digits
|
||||
String[] stringsOfDigits = {"0", "12", "07897803434", "12324456576788"};
|
||||
|
||||
for (String digits : stringsOfDigits) {
|
||||
assertTrue("Expected that " + digits + " would be considered a string of digits.",
|
||||
Checkpoint.SequenceNumberValidator.isDigits(digits));
|
||||
}
|
||||
// Check things that are not all digits
|
||||
String[] stringsWithNonDigits = {
|
||||
null,
|
||||
"",
|
||||
" ", // white spaces
|
||||
"6 4",
|
||||
"\t45",
|
||||
"5242354235234\n",
|
||||
"7\n6\n5\n",
|
||||
"12s", // last character
|
||||
"c07897803434", // first character
|
||||
"1232445wef6576788", // interior
|
||||
"no-digits",
|
||||
};
|
||||
for (String notAllDigits : stringsWithNonDigits) {
|
||||
assertFalse("Expected that " + notAllDigits + " would not be considered a string of digits.",
|
||||
Checkpoint.SequenceNumberValidator.isDigits(notAllDigits));
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,136 +0,0 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package software.amazon.kinesis.retrieval.fanout;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
import software.amazon.awssdk.core.SdkBytes;
|
||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.model.Record;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||
import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.fanout.experimental.ExperimentalFanOutRecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
//
|
||||
// This has to be in the fanout package as it accesses internal classes of the FanOutRecordsPublisher but tests
|
||||
// a descendant of that class.
|
||||
//
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class ExperimentalFanOutRecordsPublisherTest {
|
||||
|
||||
private static final String SHARD_ID = "shardId-000000000001";
|
||||
private static final String CONSUMER_ARN = "arn:consumer";
|
||||
|
||||
@Mock
|
||||
private KinesisAsyncClient kinesisClient;
|
||||
@Mock
|
||||
private SdkPublisher<SubscribeToShardEventStream> publisher;
|
||||
@Mock
|
||||
private Subscription subscription;
|
||||
|
||||
@Test
|
||||
public void mismatchedShardIdTest() {
|
||||
FanOutRecordsPublisher source = new ExperimentalFanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
|
||||
|
||||
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
|
||||
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
|
||||
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
|
||||
.forClass(FanOutRecordsPublisher.RecordFlow.class);
|
||||
|
||||
doNothing().when(publisher).subscribe(captor.capture());
|
||||
|
||||
source.start(ExtendedSequenceNumber.LATEST,
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
|
||||
|
||||
List<Throwable> errorsHandled = new ArrayList<>();
|
||||
List<ProcessRecordsInput> inputsReceived = new ArrayList<>();
|
||||
|
||||
source.subscribe(new Subscriber<ProcessRecordsInput>() {
|
||||
Subscription subscription;
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription s) {
|
||||
subscription = s;
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ProcessRecordsInput input) {
|
||||
inputsReceived.add(input);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
errorsHandled.add(t);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
fail("OnComplete called when not expected");
|
||||
}
|
||||
});
|
||||
|
||||
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
||||
flowCaptor.getValue().onEventStream(publisher);
|
||||
captor.getValue().onSubscribe(subscription);
|
||||
|
||||
List<Record> records = Stream.of(1, 2, 3).map(seq -> FanOutRecordsPublisherTest.makeRecord(seq, seq)).collect(Collectors.toList());
|
||||
|
||||
SubscribeToShardEvent batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build();
|
||||
|
||||
captor.getValue().onNext(batchEvent);
|
||||
|
||||
verify(subscription, times(1)).request(1);
|
||||
assertThat(inputsReceived.size(), equalTo(0));
|
||||
assertThat(errorsHandled.size(), equalTo(1));
|
||||
assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class));
|
||||
assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards"));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -11,7 +11,6 @@ import static org.mockito.Mockito.reset;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
|
|
@ -53,7 +52,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
|||
@Slf4j
|
||||
public class FanOutRecordsPublisherTest {
|
||||
|
||||
private static final String SHARD_ID = "shardId-000000000001";
|
||||
private static final String SHARD_ID = "Shard-001";
|
||||
private static final String CONSUMER_ARN = "arn:consumer";
|
||||
|
||||
@Mock
|
||||
|
|
@ -319,8 +318,6 @@ public class FanOutRecordsPublisherTest {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {
|
||||
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
|
||||
for (int i = 0; i < clientRecordsList.size(); ++i) {
|
||||
|
|
@ -357,17 +354,9 @@ public class FanOutRecordsPublisherTest {
|
|||
}
|
||||
|
||||
private Record makeRecord(int sequenceNumber) {
|
||||
return makeRecord(sequenceNumber, 1);
|
||||
}
|
||||
|
||||
static Record makeRecord(int sequenceNumber, int shardId) {
|
||||
BigInteger version = BigInteger.valueOf(2).shiftLeft(184);
|
||||
BigInteger shard = BigInteger.valueOf(shardId).shiftLeft(4);
|
||||
BigInteger seq = version.add(shard).add(BigInteger.valueOf(sequenceNumber));
|
||||
|
||||
SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 });
|
||||
return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now())
|
||||
.sequenceNumber(seq.toString()).partitionKey("A").build();
|
||||
.sequenceNumber(Integer.toString(sequenceNumber)).partitionKey("A").build();
|
||||
}
|
||||
|
||||
private static class KinesisClientRecordMatcher extends TypeSafeDiagnosingMatcher<KinesisClientRecord> {
|
||||
|
|
|
|||
2
pom.xml
2
pom.xml
|
|
@ -20,7 +20,7 @@
|
|||
<artifactId>amazon-kinesis-client-pom</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<name>Amazon Kinesis Client Library</name>
|
||||
<version>2.0.3-experimental</version>
|
||||
<version>2.0.3-SNAPSHOT</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
||||
from Amazon Kinesis.
|
||||
</description>
|
||||
|
|
|
|||
Loading…
Reference in a new issue