diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java
new file mode 100644
index 00000000..c5191ab1
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/annotations/KinesisClientExperimental.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.annotations;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Anything marked as experimental may be removed at any time without warning.
+ */
+@Retention(RetentionPolicy.CLASS)
+public @interface KinesisClientExperimental {
+ String reason() default "";
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java
index 173a871e..9b1146ca 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutConfig.java
@@ -26,6 +26,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
+import software.amazon.kinesis.annotations.KinesisClientExperimental;
@Data
@Accessors(fluent = true)
@@ -80,18 +81,13 @@ public class FanOutConfig implements RetrievalSpecificConfig {
*/
private long retryBackoffMillis = 1000;
- /**
- * Controls whether the {@link FanOutRecordsPublisher} will validate that all the records are from the shard it's
- * processing.
- */
- private boolean validateRecordsAreForShard = false;
-
@Override
public RetrievalFactory retrievalFactory() {
- return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()).validateRecordsAreForShard(validateRecordsAreForShard);
+ return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn());
}
- private String getOrCreateConsumerArn() {
+ @KinesisClientExperimental(reason = "Experimentally changed from private to protected")
+ protected String getOrCreateConsumerArn() {
if (consumerArn != null) {
return consumerArn;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
index bb9c0c75..5ed51710 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java
@@ -18,13 +18,10 @@ package software.amazon.kinesis.retrieval.fanout;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -40,13 +37,13 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
-import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
+import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
@@ -56,14 +53,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis;
private final String shardId;
private final String consumerArn;
- private final boolean validateRecordShardMatching;
/**
* Creates a new FanOutRecordsPublisher.
- *
- * This is deprecated and will be removed in a later release. Use
- * {@link #FanOutRecordsPublisher(KinesisAsyncClient, String, String, boolean)} instead
- *
*
* @param kinesis
* the kinesis client to use for requests
@@ -72,17 +64,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
* @param consumerArn
* the consumer to use when retrieving records
*/
- @Deprecated
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
- this(kinesis, shardId, consumerArn, false);
- }
-
- public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn,
- boolean validateRecordShardMatching) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
- this.validateRecordShardMatching = validateRecordShardMatching;
}
private final Object lockObject = new Object();
@@ -383,7 +368,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
subscriber = null;
if (flow != null) {
- log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
+ log.debug(
+ "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
@@ -408,6 +394,20 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
+ /**
+ * Allows validating records received from SubscribeToShard
+ *
+ * @param shardId
+ * the shardId the records should be from
+ * @param event
+ * the SubscribeToShard event that was received.
+ * @throws IllegalArgumentException
+ * if the records are invalid. This will trigger an error response upwards
+ */
+ @KinesisClientExperimental(reason = "Allows providing a validation function with minimal changes")
+ protected void validateRecords(String shardId, SubscribeToShardEvent event) {
+ }
+
private void rejectSubscription(SdkPublisher publisher) {
publisher.subscribe(new Subscriber() {
Subscription localSub;
@@ -588,7 +588,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final RecordFlow flow;
private final Instant connectionStartedAt;
private final String subscribeToShardId;
- private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();
private Subscription subscription;
@@ -657,7 +656,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
recordBatchEvent.accept(new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
- if (parent.validateRecordShardMatching && !areRecordsValid(event)) {
+ try {
+ parent.validateRecords(parent.shardId, event);
+ } catch (IllegalArgumentException iae) {
+ log.debug(
+ "{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
+ parent.shardId, connectionStartedAt, subscribeToShardId,
+ System.identityHashCode(subscription), iae.getMessage());
+ parent.errorOccurred(flow, iae);
return;
}
flow.recordsReceived(event);
@@ -666,45 +672,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
- private boolean areRecordsValid(SubscribeToShardEvent event) {
- try {
- Map mismatchedRecords = recordsNotForShard(event);
- if (mismatchedRecords.size() > 0) {
- String mismatchReport = mismatchedRecords.entrySet().stream()
- .map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue()))
- .collect(Collectors.joining(", "));
- log.debug(
- "{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
- parent.shardId, connectionStartedAt, subscribeToShardId,
- System.identityHashCode(subscription), mismatchReport);
- parent.errorOccurred(flow, new IllegalArgumentException(
- "Received records destined for different shards: " + mismatchReport));
- return false;
- }
- } catch (IllegalArgumentException iae) {
- log.debug(
- "{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- "
- + "A problem occurred while validating sequence numbers: {} on subscription {}",
- parent.shardId, connectionStartedAt, subscribeToShardId, System.identityHashCode(subscription),
- iae.getMessage(), iae);
- parent.errorOccurred(flow, iae);
- return false;
- }
- return true;
- }
-
- private Map recordsNotForShard(SubscribeToShardEvent event) {
- return event.records().stream().map(r -> {
- Optional res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
- if (!res.isPresent()) {
- throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
- }
- return res.get();
- }).filter(s -> !StringUtils.equalsIgnoreCase(s, parent.shardId))
- .collect(Collectors.groupingBy(Function.identity())).entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
- }
-
@Override
public void onError(Throwable t) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java
index 46be077f..02831a27 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java
@@ -15,10 +15,8 @@
package software.amazon.kinesis.retrieval.fanout;
-import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@@ -35,9 +33,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
private final String consumerArn;
- @Getter
- @Setter
- private boolean validateRecordsAreForShard = false;
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
@@ -48,6 +43,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) {
- return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn, validateRecordsAreForShard);
+ return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java
new file mode 100644
index 00000000..f2d7102e
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.retrieval.fanout.experimental;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.annotations.KinesisClientExperimental;
+import software.amazon.kinesis.retrieval.RetrievalFactory;
+import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
+
+/**
+ * Enables validation of sequence number for every received record.
+ *
+ * This is an experimental class and may be removed at any time
+ */
+@KinesisClientExperimental
+public class ExperimentalFanOutConfig extends FanOutConfig {
+
+ public ExperimentalFanOutConfig(KinesisAsyncClient kinesisClient) {
+ super(kinesisClient);
+ }
+
+ @Override
+ public RetrievalFactory retrievalFactory() {
+ return new ExperimentalFanOutRetrievalFactory(kinesisClient(), getOrCreateConsumerArn());
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java
new file mode 100644
index 00000000..1ef645e2
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRecordsPublisher.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.retrieval.fanout.experimental;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.annotations.KinesisClientExperimental;
+import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
+import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;
+
+/**
+ * A variation of {@link FanOutRecordsPublisher} that provides validation of every record received by the publisher.
+ *
+ * This is an experimental class and may be removed at any time
+ */
+@Slf4j
+@KinesisClientExperimental
+public class ExperimentalFanOutRecordsPublisher extends FanOutRecordsPublisher {
+
+ private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();
+
+ /**
+ * Creates a new FanOutRecordsPublisher.
+ *
+ * @param kinesis
+ * the kinesis client to use for requests
+ * @param shardId
+ * the shardId to retrieve records for
+ * @param consumerArn
+ */
+ public ExperimentalFanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
+ super(kinesis, shardId, consumerArn);
+ }
+
+ @Override
+ protected void validateRecords(String shardId, SubscribeToShardEvent event) {
+ Map mismatchedRecords = recordsNotForShard(shardId, event);
+ if (mismatchedRecords.size() > 0) {
+ String mismatchReport = mismatchedRecords.entrySet().stream()
+ .map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue())).collect(Collectors.joining(", "));
+ throw new IllegalArgumentException("Received records destined for different shards: " + mismatchReport);
+ }
+
+ }
+
+ private Map recordsNotForShard(String shardId, SubscribeToShardEvent event) {
+ return event.records().stream().map(r -> {
+ Optional res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
+ if (!res.isPresent()) {
+ throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
+ }
+ return res.get();
+ }).filter(s -> !StringUtils.equalsIgnoreCase(s, shardId)).collect(Collectors.groupingBy(Function.identity()))
+ .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java
new file mode 100644
index 00000000..a112ede8
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/experimental/ExperimentalFanOutRetrievalFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.retrieval.fanout.experimental;
+
+import lombok.RequiredArgsConstructor;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.annotations.KinesisClientExperimental;
+import software.amazon.kinesis.leases.ShardInfo;
+import software.amazon.kinesis.metrics.MetricsFactory;
+import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
+import software.amazon.kinesis.retrieval.RecordsPublisher;
+import software.amazon.kinesis.retrieval.RetrievalFactory;
+
+/**
+ * Supports validating that records did originate from the shard.
+ *
+ * This is an experimental class and may be removed at any time
+ */
+@RequiredArgsConstructor
+@KinesisClientExperimental
+public class ExperimentalFanOutRetrievalFactory implements RetrievalFactory {
+ private final KinesisAsyncClient kinesisClient;
+ private final String consumerArn;
+
+ @Override
+ public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) {
+ return null;
+ }
+
+ @Override
+ public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
+ return new ExperimentalFanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java
new file mode 100644
index 00000000..73c5be27
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/ExperimentalFanOutRecordsPublisherTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.retrieval.fanout;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.async.SdkPublisher;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
+import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;
+import software.amazon.kinesis.retrieval.fanout.experimental.ExperimentalFanOutRecordsPublisher;
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+//
+// This has to be in the fanout package as it accesses internal classes of the FanOutRecordsPublisher but tests
+// a descendant of that class.
+//
+
+@RunWith(MockitoJUnitRunner.class)
+public class ExperimentalFanOutRecordsPublisherTest {
+
+ private static final String SHARD_ID = "shardId-000000000001";
+ private static final String CONSUMER_ARN = "arn:consumer";
+
+ @Mock
+ private KinesisAsyncClient kinesisClient;
+ @Mock
+ private SdkPublisher publisher;
+ @Mock
+ private Subscription subscription;
+
+ @Test
+ public void mismatchedShardIdTest() {
+ FanOutRecordsPublisher source = new ExperimentalFanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
+
+ ArgumentCaptor captor = ArgumentCaptor
+ .forClass(FanOutRecordsPublisher.RecordSubscription.class);
+ ArgumentCaptor flowCaptor = ArgumentCaptor
+ .forClass(FanOutRecordsPublisher.RecordFlow.class);
+
+ doNothing().when(publisher).subscribe(captor.capture());
+
+ source.start(ExtendedSequenceNumber.LATEST,
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
+
+ List errorsHandled = new ArrayList<>();
+ List inputsReceived = new ArrayList<>();
+
+ source.subscribe(new Subscriber() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscription = s;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(ProcessRecordsInput input) {
+ inputsReceived.add(input);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ errorsHandled.add(t);
+
+ }
+
+ @Override
+ public void onComplete() {
+ fail("OnComplete called when not expected");
+ }
+ });
+
+ verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
+ flowCaptor.getValue().onEventStream(publisher);
+ captor.getValue().onSubscribe(subscription);
+
+ List records = Stream.of(1, 2, 3).map(seq -> FanOutRecordsPublisherTest.makeRecord(seq, seq)).collect(Collectors.toList());
+
+ SubscribeToShardEvent batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build();
+
+ captor.getValue().onNext(batchEvent);
+
+ verify(subscription, times(1)).request(1);
+ assertThat(inputsReceived.size(), equalTo(0));
+ assertThat(errorsHandled.size(), equalTo(1));
+ assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class));
+ assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards"));
+ }
+
+
+
+
+}
\ No newline at end of file
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
index 6284c6d5..26421add 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java
@@ -70,8 +70,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void simpleTest() throws Exception {
- FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
- VALIDATE_RECORD_SHARD_MATCHING);
+ FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor flowCaptor = ArgumentCaptor
@@ -138,8 +137,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void largeRequestTest() throws Exception {
- FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
- VALIDATE_RECORD_SHARD_MATCHING);
+ FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor flowCaptor = ArgumentCaptor
@@ -206,8 +204,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void testResourceNotFoundForShard() {
- FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
- VALIDATE_RECORD_SHARD_MATCHING);
+ FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
@@ -231,8 +228,7 @@ public class FanOutRecordsPublisherTest {
@Test
public void testContinuesAfterSequence() {
- FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN,
- VALIDATE_RECORD_SHARD_MATCHING);
+ FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
@@ -305,65 +301,7 @@ public class FanOutRecordsPublisherTest {
}
- @Test
- public void mismatchedShardIdTest() {
- FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN, true);
- ArgumentCaptor captor = ArgumentCaptor
- .forClass(FanOutRecordsPublisher.RecordSubscription.class);
- ArgumentCaptor flowCaptor = ArgumentCaptor
- .forClass(FanOutRecordsPublisher.RecordFlow.class);
-
- doNothing().when(publisher).subscribe(captor.capture());
-
- source.start(ExtendedSequenceNumber.LATEST,
- InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
-
- List errorsHandled = new ArrayList<>();
- List inputsReceived = new ArrayList<>();
-
- source.subscribe(new Subscriber() {
- Subscription subscription;
-
- @Override
- public void onSubscribe(Subscription s) {
- subscription = s;
- subscription.request(1);
- }
-
- @Override
- public void onNext(ProcessRecordsInput input) {
- inputsReceived.add(input);
- }
-
- @Override
- public void onError(Throwable t) {
- errorsHandled.add(t);
-
- }
-
- @Override
- public void onComplete() {
- fail("OnComplete called when not expected");
- }
- });
-
- verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
- flowCaptor.getValue().onEventStream(publisher);
- captor.getValue().onSubscribe(subscription);
-
- List records = Stream.of(1, 2, 3).map(seq -> makeRecord(seq, seq)).collect(Collectors.toList());
-
- batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).build();
-
- captor.getValue().onNext(batchEvent);
-
- verify(subscription, times(1)).request(1);
- assertThat(inputsReceived.size(), equalTo(0));
- assertThat(errorsHandled.size(), equalTo(1));
- assertThat(errorsHandled.get(0), instanceOf(IllegalArgumentException.class));
- assertThat(errorsHandled.get(0).getMessage(), containsString("Received records destined for different shards"));
- }
private void verifyRecords(List clientRecordsList, List matchers) {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
@@ -404,7 +342,7 @@ public class FanOutRecordsPublisherTest {
return makeRecord(sequenceNumber, 1);
}
- private Record makeRecord(int sequenceNumber, int shardId) {
+ static Record makeRecord(int sequenceNumber, int shardId) {
BigInteger version = BigInteger.valueOf(2).shiftLeft(184);
BigInteger shard = BigInteger.valueOf(shardId).shiftLeft(4);
BigInteger seq = version.add(shard).add(BigInteger.valueOf(sequenceNumber));
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java
deleted file mode 100644
index 4f6a588c..00000000
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.clientlibrary.config;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-/**
- * Provide Date property.
- */
-class DatePropertyValueDecoder implements IPropertyValueDecoder {
-
- /**
- * Constructor.
- */
- DatePropertyValueDecoder() {
- }
-
- /**
- * @param value property value as String
- * @return corresponding variable in correct type
- */
- @Override
- public Date decodeValue(String value) {
- try {
- return new Date(Long.parseLong(value) * 1000L);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Date property value must be numeric.");
- }
- }
-
- /**
- * @return list of supported types
- */
- @Override
- public List> getSupportedTypes() {
- return Arrays.asList(Date.class);
- }
-
-}
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
deleted file mode 100644
index 73ba669c..00000000
--- a/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-log4j.rootLogger=INFO, A1
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-
-# Print the date in ISO 8601 format
-log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
-
-log4j.logger.org.apache.http=WARN