From 592499f7bc3cc1e23580688c82a3050913a62676 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Tue, 18 Sep 2018 15:05:36 -0700 Subject: [PATCH] Experimental support for sequence number validation in the publisher (#401) * Moved sequence number validation to an experimental feature Moved the sequence number validation to become an experimental feature that can be removed in the future. Added an annotation for experimental features. * Delete merge conflict again? * Add some reminder that this stuff is experimental * Added a reason field, and some reasons Added a reason value to the annotation, and updated two of the unusual places. --- .../KinesisClientExperimental.java | 26 ++++ .../retrieval/fanout/FanOutConfig.java | 12 +- .../fanout/FanOutRecordsPublisher.java | 83 ++++------- .../fanout/FanOutRetrievalFactory.java | 7 +- .../ExperimentalFanOutConfig.java | 38 +++++ .../ExperimentalFanOutRecordsPublisher.java | 76 ++++++++++ .../ExperimentalFanOutRetrievalFactory.java | 46 ++++++ ...xperimentalFanOutRecordsPublisherTest.java | 136 ++++++++++++++++++ .../fanout/FanOutRecordsPublisherTest.java | 72 +--------- .../config/DatePropertyValueDecorder.java | 53 ------- src/test/resources/log4j.properties | 8 -- 11 files changed, 357 insertions(+), 200 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java delete mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java delete mode 100644 src/test/resources/log4j.properties diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java new file mode 100644 index 00000000..c5191ab1 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.annotations; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * Anything marked as experimental may be removed at any time without warning. + */ +@Retention(RetentionPolicy.CLASS) +public @interface KinesisClientExperimental { + String reason() default ""; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java index 173a871e..9b1146ca 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java @@ -26,6 +26,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalSpecificConfig; +import software.amazon.kinesis.annotations.KinesisClientExperimental; @Data @Accessors(fluent = true) @@ -80,18 +81,13 @@ public class FanOutConfig implements RetrievalSpecificConfig { */ private long retryBackoffMillis = 1000; - /** - * Controls whether the {@link FanOutRecordsPublisher} will validate that all the records are from the shard it's - * processing. - */ - private boolean validateRecordsAreForShard = false; - @Override public RetrievalFactory retrievalFactory() { - return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()).validateRecordsAreForShard(validateRecordsAreForShard); + return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()); } - private String getOrCreateConsumerArn() { + @KinesisClientExperimental(reason = "Experimentally changed from private to protected") + protected String getOrCreateConsumerArn() { if (consumerArn != null) { return consumerArn; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index bb9c0c75..5ed51710 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -18,13 +18,10 @@ package software.amazon.kinesis.retrieval.fanout; import java.time.Instant; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -40,13 +37,13 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.checkpoint.SequenceNumberValidator; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.annotations.KinesisClientExperimental; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Slf4j @@ -56,14 +53,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final KinesisAsyncClient kinesis; private final String shardId; private final String consumerArn; - private final boolean validateRecordShardMatching; /** * Creates a new FanOutRecordsPublisher. - *

- * This is deprecated and will be removed in a later release. Use - * {@link #FanOutRecordsPublisher(KinesisAsyncClient, String, String, boolean)} instead - *

* * @param kinesis * the kinesis client to use for requests @@ -72,17 +64,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher { * @param consumerArn * the consumer to use when retrieving records */ - @Deprecated public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) { - this(kinesis, shardId, consumerArn, false); - } - - public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn, - boolean validateRecordShardMatching) { this.kinesis = kinesis; this.shardId = shardId; this.consumerArn = consumerArn; - this.validateRecordShardMatching = validateRecordShardMatching; } private final Object lockObject = new Object(); @@ -383,7 +368,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } subscriber = null; if (flow != null) { - log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", + log.debug( + "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); availableQueueSpace = 0; @@ -408,6 +394,20 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + /** + * Allows validating records received from SubscribeToShard + * + * @param shardId + * the shardId the records should be from + * @param event + * the SubscribeToShard event that was received. + * @throws IllegalArgumentException + * if the records are invalid. This will trigger an error response upwards + */ + @KinesisClientExperimental(reason = "Allows providing a validation function with minimal changes") + protected void validateRecords(String shardId, SubscribeToShardEvent event) { + } + private void rejectSubscription(SdkPublisher publisher) { publisher.subscribe(new Subscriber() { Subscription localSub; @@ -588,7 +588,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final RecordFlow flow; private final Instant connectionStartedAt; private final String subscribeToShardId; - private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(); private Subscription subscription; @@ -657,7 +656,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { recordBatchEvent.accept(new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { - if (parent.validateRecordShardMatching && !areRecordsValid(event)) { + try { + parent.validateRecords(parent.shardId, event); + } catch (IllegalArgumentException iae) { + log.debug( + "{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]", + parent.shardId, connectionStartedAt, subscribeToShardId, + System.identityHashCode(subscription), iae.getMessage()); + parent.errorOccurred(flow, iae); return; } flow.recordsReceived(event); @@ -666,45 +672,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } - private boolean areRecordsValid(SubscribeToShardEvent event) { - try { - Map mismatchedRecords = recordsNotForShard(event); - if (mismatchedRecords.size() > 0) { - String mismatchReport = mismatchedRecords.entrySet().stream() - .map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue())) - .collect(Collectors.joining(", ")); - log.debug( - "{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]", - parent.shardId, connectionStartedAt, subscribeToShardId, - System.identityHashCode(subscription), mismatchReport); - parent.errorOccurred(flow, new IllegalArgumentException( - "Received records destined for different shards: " + mismatchReport)); - return false; - } - } catch (IllegalArgumentException iae) { - log.debug( - "{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- " - + "A problem occurred while validating sequence numbers: {} on subscription {}", - parent.shardId, connectionStartedAt, subscribeToShardId, System.identityHashCode(subscription), - iae.getMessage(), iae); - parent.errorOccurred(flow, iae); - return false; - } - return true; - } - - private Map recordsNotForShard(SubscribeToShardEvent event) { - return event.records().stream().map(r -> { - Optional res = sequenceNumberValidator.shardIdFor(r.sequenceNumber()); - if (!res.isPresent()) { - throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber()); - } - return res.get(); - }).filter(s -> !StringUtils.equalsIgnoreCase(s, parent.shardId)) - .collect(Collectors.groupingBy(Function.identity())).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size())); - } - @Override public void onError(Throwable t) { log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index 46be077f..02831a27 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -15,10 +15,8 @@ package software.amazon.kinesis.retrieval.fanout; -import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -35,9 +33,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; private final String consumerArn; - @Getter - @Setter - private boolean validateRecordsAreForShard = false; @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, @@ -48,6 +43,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory { @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { - return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn, validateRecordsAreForShard); + return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java new file mode 100644 index 00000000..f2d7102e --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.retrieval.fanout.experimental; + +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.annotations.KinesisClientExperimental; +import software.amazon.kinesis.retrieval.RetrievalFactory; +import software.amazon.kinesis.retrieval.fanout.FanOutConfig; + +/** + * Enables validation of sequence number for every received record. + * + *

This is an experimental class and may be removed at any time

+ */ +@KinesisClientExperimental +public class ExperimentalFanOutConfig extends FanOutConfig { + + public ExperimentalFanOutConfig(KinesisAsyncClient kinesisClient) { + super(kinesisClient); + } + + @Override + public RetrievalFactory retrievalFactory() { + return new ExperimentalFanOutRetrievalFactory(kinesisClient(), getOrCreateConsumerArn()); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java new file mode 100644 index 00000000..1ef645e2 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java @@ -0,0 +1,76 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.retrieval.fanout.experimental; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.kinesis.annotations.KinesisClientExperimental; +import software.amazon.kinesis.checkpoint.SequenceNumberValidator; +import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher; + +/** + * A variation of {@link FanOutRecordsPublisher} that provides validation of every record received by the publisher. + * + *

This is an experimental class and may be removed at any time

+ */ +@Slf4j +@KinesisClientExperimental +public class ExperimentalFanOutRecordsPublisher extends FanOutRecordsPublisher { + + private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(); + + /** + * Creates a new FanOutRecordsPublisher. + * + * @param kinesis + * the kinesis client to use for requests + * @param shardId + * the shardId to retrieve records for + * @param consumerArn + */ + public ExperimentalFanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) { + super(kinesis, shardId, consumerArn); + } + + @Override + protected void validateRecords(String shardId, SubscribeToShardEvent event) { + Map mismatchedRecords = recordsNotForShard(shardId, event); + if (mismatchedRecords.size() > 0) { + String mismatchReport = mismatchedRecords.entrySet().stream() + .map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue())).collect(Collectors.joining(", ")); + throw new IllegalArgumentException("Received records destined for different shards: " + mismatchReport); + } + + } + + private Map recordsNotForShard(String shardId, SubscribeToShardEvent event) { + return event.records().stream().map(r -> { + Optional res = sequenceNumberValidator.shardIdFor(r.sequenceNumber()); + if (!res.isPresent()) { + throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber()); + } + return res.get(); + }).filter(s -> !StringUtils.equalsIgnoreCase(s, shardId)).collect(Collectors.groupingBy(Function.identity())) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size())); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java new file mode 100644 index 00000000..a112ede8 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.retrieval.fanout.experimental; + +import lombok.RequiredArgsConstructor; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.annotations.KinesisClientExperimental; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.RetrievalFactory; + +/** + * Supports validating that records did originate from the shard. + * + *

This is an experimental class and may be removed at any time

+ */ +@RequiredArgsConstructor +@KinesisClientExperimental +public class ExperimentalFanOutRetrievalFactory implements RetrievalFactory { + private final KinesisAsyncClient kinesisClient; + private final String consumerArn; + + @Override + public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) { + return null; + } + + @Override + public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) { + return new ExperimentalFanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java new file mode 100644 index 00000000..73c5be27 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java @@ -0,0 +1,136 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.retrieval.fanout; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher; +import software.amazon.kinesis.retrieval.fanout.experimental.ExperimentalFanOutRecordsPublisher; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.math.BigInteger; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +// +// This has to be in the fanout package as it accesses internal classes of the FanOutRecordsPublisher but tests +// a descendant of that class. +// + +@RunWith(MockitoJUnitRunner.class) +public class ExperimentalFanOutRecordsPublisherTest { + + private static final String SHARD_ID = "shardId-000000000001"; + private static final String CONSUMER_ARN = "arn:consumer"; + + @Mock + private KinesisAsyncClient kinesisClient; + @Mock + private SdkPublisher publisher; + @Mock + private Subscription subscription; + + @Test + public void mismatchedShardIdTest() { + FanOutRecordsPublisher source = new ExperimentalFanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List errorsHandled = new ArrayList<>(); + List inputsReceived = new ArrayList<>(); + + source.subscribe(new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(ProcessRecordsInput input) { + inputsReceived.add(input); + } + + @Override + public void onError(Throwable t) { + errorsHandled.add(t); + + } + + @Override + public void onComplete() { + fail("OnComplete called when not expected"); + } + }); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List records = Stream.of(1, 2, 3).map(seq -> FanOutRecordsPublisherTest.makeRecord(seq, seq)).collect(Collectors.toList()); + + SubscribeToShardEvent batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build(); + + captor.getValue().onNext(batchEvent); + + verify(subscription, times(1)).request(1); + assertThat(inputsReceived.size(), equalTo(0)); + assertThat(errorsHandled.size(), equalTo(1)); + assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class)); + assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards")); + } + + + + +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 6284c6d5..26421add 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -70,8 +70,7 @@ public class FanOutRecordsPublisherTest { @Test public void simpleTest() throws Exception { - FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, - VALIDATE_RECORD_SHARD_MATCHING); + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class); ArgumentCaptor flowCaptor = ArgumentCaptor @@ -138,8 +137,7 @@ public class FanOutRecordsPublisherTest { @Test public void largeRequestTest() throws Exception { - FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, - VALIDATE_RECORD_SHARD_MATCHING); + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class); ArgumentCaptor flowCaptor = ArgumentCaptor @@ -206,8 +204,7 @@ public class FanOutRecordsPublisherTest { @Test public void testResourceNotFoundForShard() { - FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, - VALIDATE_RECORD_SHARD_MATCHING); + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor flowCaptor = ArgumentCaptor .forClass(FanOutRecordsPublisher.RecordFlow.class); @@ -231,8 +228,7 @@ public class FanOutRecordsPublisherTest { @Test public void testContinuesAfterSequence() { - FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, - VALIDATE_RECORD_SHARD_MATCHING); + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor captor = ArgumentCaptor .forClass(FanOutRecordsPublisher.RecordSubscription.class); @@ -305,65 +301,7 @@ public class FanOutRecordsPublisherTest { } - @Test - public void mismatchedShardIdTest() { - FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, true); - ArgumentCaptor captor = ArgumentCaptor - .forClass(FanOutRecordsPublisher.RecordSubscription.class); - ArgumentCaptor flowCaptor = ArgumentCaptor - .forClass(FanOutRecordsPublisher.RecordFlow.class); - - doNothing().when(publisher).subscribe(captor.capture()); - - source.start(ExtendedSequenceNumber.LATEST, - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); - - List errorsHandled = new ArrayList<>(); - List inputsReceived = new ArrayList<>(); - - source.subscribe(new Subscriber() { - Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - subscription.request(1); - } - - @Override - public void onNext(ProcessRecordsInput input) { - inputsReceived.add(input); - } - - @Override - public void onError(Throwable t) { - errorsHandled.add(t); - - } - - @Override - public void onComplete() { - fail("OnComplete called when not expected"); - } - }); - - verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); - flowCaptor.getValue().onEventStream(publisher); - captor.getValue().onSubscribe(subscription); - - List records = Stream.of(1, 2, 3).map(seq -> makeRecord(seq, seq)).collect(Collectors.toList()); - - batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build(); - - captor.getValue().onNext(batchEvent); - - verify(subscription, times(1)).request(1); - assertThat(inputsReceived.size(), equalTo(0)); - assertThat(errorsHandled.size(), equalTo(1)); - assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class)); - assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards")); - } private void verifyRecords(List clientRecordsList, List matchers) { assertThat(clientRecordsList.size(), equalTo(matchers.size())); @@ -404,7 +342,7 @@ public class FanOutRecordsPublisherTest { return makeRecord(sequenceNumber, 1); } - private Record makeRecord(int sequenceNumber, int shardId) { + static Record makeRecord(int sequenceNumber, int shardId) { BigInteger version = BigInteger.valueOf(2).shiftLeft(184); BigInteger shard = BigInteger.valueOf(shardId).shiftLeft(4); BigInteger seq = version.add(shard).add(BigInteger.valueOf(sequenceNumber)); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java deleted file mode 100644 index 4f6a588c..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.config; - -import java.util.Arrays; -import java.util.Date; -import java.util.List; - -/** - * Provide Date property. - */ -class DatePropertyValueDecoder implements IPropertyValueDecoder { - - /** - * Constructor. - */ - DatePropertyValueDecoder() { - } - - /** - * @param value property value as String - * @return corresponding variable in correct type - */ - @Override - public Date decodeValue(String value) { - try { - return new Date(Long.parseLong(value) * 1000L); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Date property value must be numeric."); - } - } - - /** - * @return list of supported types - */ - @Override - public List> getSupportedTypes() { - return Arrays.asList(Date.class); - } - -} diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties deleted file mode 100644 index 73ba669c..00000000 --- a/src/test/resources/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -log4j.rootLogger=INFO, A1 -log4j.appender.A1=org.apache.log4j.ConsoleAppender -log4j.appender.A1.layout=org.apache.log4j.PatternLayout - -# Print the date in ISO 8601 format -log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n - -log4j.logger.org.apache.http=WARN