Experimental support for sequence number validation in the publisher (#401)

* Moved sequence number validation to an experimental feature

Moved the sequence number validation to become an experimental feature
that can be removed in the future.

Added an annotation for experimental features.

* Delete merge conflict again?

* Add some reminder that this stuff is experimental

* Added a reason field, and some reasons

Added a reason value to the annotation, and updated two of the unusual places.
This commit is contained in:
Justin Pfifer 2018-09-18 15:05:36 -07:00 committed by Sahil Palvia
parent 01f5db8049
commit 592499f7bc
11 changed files with 357 additions and 200 deletions

View file

@ -0,0 +1,26 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.annotations;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Anything marked as experimental may be removed at any time without warning.
*/
@Retention(RetentionPolicy.CLASS)
public @interface KinesisClientExperimental {
String reason() default "";
}

View file

@ -26,6 +26,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
@Data
@Accessors(fluent = true)
@ -80,18 +81,13 @@ public class FanOutConfig implements RetrievalSpecificConfig {
*/
private long retryBackoffMillis = 1000;
/**
* Controls whether the {@link FanOutRecordsPublisher} will validate that all the records are from the shard it's
* processing.
*/
private boolean validateRecordsAreForShard = false;
@Override
public RetrievalFactory retrievalFactory() {
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()).validateRecordsAreForShard(validateRecordsAreForShard);
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn());
}
private String getOrCreateConsumerArn() {
@KinesisClientExperimental(reason = "Experimentally changed from private to protected")
protected String getOrCreateConsumerArn() {
if (consumerArn != null) {
return consumerArn;
}

View file

@ -18,13 +18,10 @@ package software.amazon.kinesis.retrieval.fanout;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@ -40,13 +37,13 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
@ -56,14 +53,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis;
private final String shardId;
private final String consumerArn;
private final boolean validateRecordShardMatching;
/**
* Creates a new FanOutRecordsPublisher.
* <p>
* This is deprecated and will be removed in a later release. Use
* {@link #FanOutRecordsPublisher(KinesisAsyncClient, String, String, boolean)} instead
* </p>
*
* @param kinesis
* the kinesis client to use for requests
@ -72,17 +64,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
* @param consumerArn
* the consumer to use when retrieving records
*/
@Deprecated
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
this(kinesis, shardId, consumerArn, false);
}
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn,
boolean validateRecordShardMatching) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
this.validateRecordShardMatching = validateRecordShardMatching;
}
private final Object lockObject = new Object();
@ -383,7 +368,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
subscriber = null;
if (flow != null) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
@ -408,6 +394,20 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
/**
* Allows validating records received from SubscribeToShard
*
* @param shardId
* the shardId the records should be from
* @param event
* the SubscribeToShard event that was received.
* @throws IllegalArgumentException
* if the records are invalid. This will trigger an error response upwards
*/
@KinesisClientExperimental(reason = "Allows providing a validation function with minimal changes")
protected void validateRecords(String shardId, SubscribeToShardEvent event) {
}
private void rejectSubscription(SdkPublisher<SubscribeToShardEventStream> publisher) {
publisher.subscribe(new Subscriber<SubscribeToShardEventStream>() {
Subscription localSub;
@ -588,7 +588,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final RecordFlow flow;
private final Instant connectionStartedAt;
private final String subscribeToShardId;
private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();
private Subscription subscription;
@ -657,7 +656,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
recordBatchEvent.accept(new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
if (parent.validateRecordShardMatching && !areRecordsValid(event)) {
try {
parent.validateRecords(parent.shardId, event);
} catch (IllegalArgumentException iae) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
parent.shardId, connectionStartedAt, subscribeToShardId,
System.identityHashCode(subscription), iae.getMessage());
parent.errorOccurred(flow, iae);
return;
}
flow.recordsReceived(event);
@ -666,45 +672,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
private boolean areRecordsValid(SubscribeToShardEvent event) {
try {
Map<String, Integer> mismatchedRecords = recordsNotForShard(event);
if (mismatchedRecords.size() > 0) {
String mismatchReport = mismatchedRecords.entrySet().stream()
.map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue()))
.collect(Collectors.joining(", "));
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
parent.shardId, connectionStartedAt, subscribeToShardId,
System.identityHashCode(subscription), mismatchReport);
parent.errorOccurred(flow, new IllegalArgumentException(
"Received records destined for different shards: " + mismatchReport));
return false;
}
} catch (IllegalArgumentException iae) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- "
+ "A problem occurred while validating sequence numbers: {} on subscription {}",
parent.shardId, connectionStartedAt, subscribeToShardId, System.identityHashCode(subscription),
iae.getMessage(), iae);
parent.errorOccurred(flow, iae);
return false;
}
return true;
}
private Map<String, Integer> recordsNotForShard(SubscribeToShardEvent event) {
return event.records().stream().map(r -> {
Optional<String> res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
if (!res.isPresent()) {
throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
}
return res.get();
}).filter(s -> !StringUtils.equalsIgnoreCase(s, parent.shardId))
.collect(Collectors.groupingBy(Function.identity())).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}
@Override
public void onError(Throwable t) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId,

View file

@ -15,10 +15,8 @@
package software.amazon.kinesis.retrieval.fanout;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -35,9 +33,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
private final String consumerArn;
@Getter
@Setter
private boolean validateRecordsAreForShard = false;
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
@ -48,6 +43,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) {
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn, validateRecordsAreForShard);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
}
}

View file

@ -0,0 +1,38 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.fanout.experimental;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
/**
* Enables validation of sequence number for every received record.
*
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
*/
@KinesisClientExperimental
public class ExperimentalFanOutConfig extends FanOutConfig {
public ExperimentalFanOutConfig(KinesisAsyncClient kinesisClient) {
super(kinesisClient);
}
@Override
public RetrievalFactory retrievalFactory() {
return new ExperimentalFanOutRetrievalFactory(kinesisClient(), getOrCreateConsumerArn());
}
}

View file

@ -0,0 +1,76 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.fanout.experimental;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;
/**
* A variation of {@link FanOutRecordsPublisher} that provides validation of every record received by the publisher.
*
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
*/
@Slf4j
@KinesisClientExperimental
public class ExperimentalFanOutRecordsPublisher extends FanOutRecordsPublisher {
private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();
/**
* Creates a new FanOutRecordsPublisher.
*
* @param kinesis
* the kinesis client to use for requests
* @param shardId
* the shardId to retrieve records for
* @param consumerArn
*/
public ExperimentalFanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
super(kinesis, shardId, consumerArn);
}
@Override
protected void validateRecords(String shardId, SubscribeToShardEvent event) {
Map<String, Integer> mismatchedRecords = recordsNotForShard(shardId, event);
if (mismatchedRecords.size() > 0) {
String mismatchReport = mismatchedRecords.entrySet().stream()
.map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue())).collect(Collectors.joining(", "));
throw new IllegalArgumentException("Received records destined for different shards: " + mismatchReport);
}
}
private Map<String, Integer> recordsNotForShard(String shardId, SubscribeToShardEvent event) {
return event.records().stream().map(r -> {
Optional<String> res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
if (!res.isPresent()) {
throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
}
return res.get();
}).filter(s -> !StringUtils.equalsIgnoreCase(s, shardId)).collect(Collectors.groupingBy(Function.identity()))
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}
}

View file

@ -0,0 +1,46 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.fanout.experimental;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
/**
* Supports validating that records did originate from the shard.
*
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
*/
@RequiredArgsConstructor
@KinesisClientExperimental
public class ExperimentalFanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
private final String consumerArn;
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) {
return null;
}
@Override
public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
return new ExperimentalFanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
}
}

View file

@ -0,0 +1,136 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.fanout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;
import software.amazon.kinesis.retrieval.fanout.experimental.ExperimentalFanOutRecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
//
// This has to be in the fanout package as it accesses internal classes of the FanOutRecordsPublisher but tests
// a descendant of that class.
//
@RunWith(MockitoJUnitRunner.class)
public class ExperimentalFanOutRecordsPublisherTest {
private static final String SHARD_ID = "shardId-000000000001";
private static final String CONSUMER_ARN = "arn:consumer";
@Mock
private KinesisAsyncClient kinesisClient;
@Mock
private SdkPublisher<SubscribeToShardEventStream> publisher;
@Mock
private Subscription subscription;
@Test
public void mismatchedShardIdTest() {
FanOutRecordsPublisher source = new ExperimentalFanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<Throwable> errorsHandled = new ArrayList<>();
List<ProcessRecordsInput> inputsReceived = new ArrayList<>();
source.subscribe(new Subscriber<ProcessRecordsInput>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(ProcessRecordsInput input) {
inputsReceived.add(input);
}
@Override
public void onError(Throwable t) {
errorsHandled.add(t);
}
@Override
public void onComplete() {
fail("OnComplete called when not expected");
}
});
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<Record> records = Stream.of(1, 2, 3).map(seq -> FanOutRecordsPublisherTest.makeRecord(seq, seq)).collect(Collectors.toList());
SubscribeToShardEvent batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build();
captor.getValue().onNext(batchEvent);
verify(subscription, times(1)).request(1);
assertThat(inputsReceived.size(), equalTo(0));
assertThat(errorsHandled.size(), equalTo(1));
assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class));
assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards"));
}
}

View file

@ -70,8 +70,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void simpleTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
VALIDATE_RECORD_SHARD_MATCHING);
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
@ -138,8 +137,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void largeRequestTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
VALIDATE_RECORD_SHARD_MATCHING);
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
@ -206,8 +204,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void testResourceNotFoundForShard() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
VALIDATE_RECORD_SHARD_MATCHING);
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
@ -231,8 +228,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void testContinuesAfterSequence() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
VALIDATE_RECORD_SHARD_MATCHING);
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
@ -305,65 +301,7 @@ public class FanOutRecordsPublisherTest {
}
@Test
public void mismatchedShardIdTest() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, true);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<Throwable> errorsHandled = new ArrayList<>();
List<ProcessRecordsInput> inputsReceived = new ArrayList<>();
source.subscribe(new Subscriber<ProcessRecordsInput>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(ProcessRecordsInput input) {
inputsReceived.add(input);
}
@Override
public void onError(Throwable t) {
errorsHandled.add(t);
}
@Override
public void onComplete() {
fail("OnComplete called when not expected");
}
});
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<Record> records = Stream.of(1, 2, 3).map(seq -> makeRecord(seq, seq)).collect(Collectors.toList());
batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build();
captor.getValue().onNext(batchEvent);
verify(subscription, times(1)).request(1);
assertThat(inputsReceived.size(), equalTo(0));
assertThat(errorsHandled.size(), equalTo(1));
assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class));
assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards"));
}
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
@ -404,7 +342,7 @@ public class FanOutRecordsPublisherTest {
return makeRecord(sequenceNumber, 1);
}
private Record makeRecord(int sequenceNumber, int shardId) {
static Record makeRecord(int sequenceNumber, int shardId) {
BigInteger version = BigInteger.valueOf(2).shiftLeft(184);
BigInteger shard = BigInteger.valueOf(shardId).shiftLeft(4);
BigInteger seq = version.add(shard).add(BigInteger.valueOf(sequenceNumber));

View file

@ -1,53 +0,0 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.config;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* Provide Date property.
*/
class DatePropertyValueDecoder implements IPropertyValueDecoder<Date> {
/**
* Constructor.
*/
DatePropertyValueDecoder() {
}
/**
* @param value property value as String
* @return corresponding variable in correct type
*/
@Override
public Date decodeValue(String value) {
try {
return new Date(Long.parseLong(value) * 1000L);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Date property value must be numeric.");
}
}
/**
* @return list of supported types
*/
@Override
public List<Class<Date>> getSupportedTypes() {
return Arrays.asList(Date.class);
}
}

View file

@ -1,8 +0,0 @@
log4j.rootLogger=INFO, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
# Print the date in ISO 8601 format
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
log4j.logger.org.apache.http=WARN