Add support for stream ARNs (#1109)

Add support for referencing streams by streamARN in single-stream mode, or by the combination of streamARN and creationEpoch in multi-stream mode.
This commit is contained in:
furq-aws 2023-05-19 12:21:20 -07:00 committed by GitHub
parent 7092ffdbd6
commit d7f3a079e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 254 additions and 1115 deletions

View file

@ -75,11 +75,6 @@
<artifactId>netty-nio-client</artifactId> <artifactId>netty-nio-client</artifactId>
<version>${awssdk.version}</version> <version>${awssdk.version}</version>
</dependency> </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency> <dependency>
<groupId>software.amazon.glue</groupId> <groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId> <artifactId>schema-registry-serde</artifactId>
@ -139,20 +134,6 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.7.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<version>1.7.4</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.hamcrest</groupId> <groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId> <artifactId>hamcrest-all</artifactId>

View file

@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -128,7 +129,7 @@ public class ConfigsBuilder {
} }
/** /**
* Constructor to initialize ConfigsBuilder for a single stream. * Constructor to initialize ConfigsBuilder for a single stream identified by name.
* *
* @param streamName * @param streamName
* @param applicationName * @param applicationName
@ -142,7 +143,31 @@ public class ConfigsBuilder {
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this(new SingleStreamTracker(streamName, kinesisClient.serviceClientConfiguration().region()), this(new SingleStreamTracker(streamName),
applicationName,
kinesisClient,
dynamoDBClient,
cloudWatchClient,
workerIdentifier,
shardRecordProcessorFactory);
}
/**
* Constructor to initialize ConfigsBuilder for a single stream identified by {@link Arn}.
*
* @param streamArn
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
* @param cloudWatchClient
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
public ConfigsBuilder(@NonNull Arn streamArn, @NonNull String applicationName,
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this(new SingleStreamTracker(streamArn),
applicationName, applicationName,
kinesisClient, kinesisClient,
dynamoDBClient, dynamoDBClient,

View file

@ -1,50 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
/**
* Caches the result from a {@link Function}. Caching is especially useful when
* invoking the function is an expensive call that produces a reusable result.
* If the input value should be fixed, {@link SupplierCache} may be used.
* <br/><br/>
* Note that if {@code f(x)=X} is cached, {@code X} will be returned for every
* successive query of this cache regardless of the input parameter. This is
* by design under the assumption that {@code X} is a viable response for
* other invocations.
*
* @param <IN> input type
* @param <OUT> output type
*/
@RequiredArgsConstructor
public class FunctionCache<IN, OUT> extends SynchronizedCache<OUT> {
private final Function<IN, OUT> function;
/**
* Returns the cached result. If the cache is null, the function will be
* invoked to populate the cache.
*
* @param input input argument to the underlying function
* @return cached result which may be null
*/
public OUT get(final IN input) {
return get(() -> function.apply(input));
}
}

View file

@ -1,86 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse;
import java.util.Optional;
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class StreamARNUtil {
/**
* Caches an {@link Arn} constructed from a {@link StsClient#getCallerIdentity()} call.
*/
private static final SupplierCache<Arn> CALLER_IDENTITY_ARN = new SupplierCache<>(() -> {
try (final SdkHttpClient httpClient = UrlConnectionHttpClient.builder().build();
final StsClient stsClient = StsClient.builder().httpClient(httpClient).build()) {
final GetCallerIdentityResponse response = stsClient.getCallerIdentity();
final Arn arn = Arn.fromString(response.arn());
// guarantee the cached ARN will never have an empty accountId
arn.accountId().orElseThrow(() -> new IllegalStateException("AccountId is not present on " + arn));
return arn;
} catch (AwsServiceException | SdkClientException e) {
log.warn("Unable to get sts caller identity to build stream arn", e);
return null;
}
});
/**
* Retrieves the stream ARN using the stream name, region, and accountId returned by STS.
* It is designed to fail gracefully, returning Optional.empty() if any errors occur.
*
* @param streamName stream name
* @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
* processed are located. A null guarantees an empty ARN.
*/
public static Optional<Arn> getStreamARN(String streamName, Region kinesisRegion) {
return getStreamARN(streamName, kinesisRegion, null);
}
public static Optional<Arn> getStreamARN(String streamName, Region kinesisRegion, String accountId) {
if (kinesisRegion == null) {
return Optional.empty();
}
final Arn identityArn = CALLER_IDENTITY_ARN.get();
if (identityArn == null) {
return Optional.empty();
}
// the provided accountId takes precedence
final String chosenAccountId = (accountId != null) ? accountId : identityArn.accountId().get();
return Optional.of(Arn.builder()
.partition(identityArn.partition())
.service("kinesis")
.region(kinesisRegion.toString())
.accountId(chosenAccountId)
.resource("stream/" + streamName)
.build());
}
}

View file

@ -23,9 +23,7 @@ import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.retrieval.KinesisClientFacade;
import java.util.Optional; import java.util.Optional;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -42,9 +40,10 @@ public class StreamIdentifier {
@NonNull @NonNull
private final String streamName; private final String streamName;
@Builder.Default @Builder.Default
private Optional<Long> streamCreationEpochOptional = Optional.empty(); private final Optional<Long> streamCreationEpochOptional = Optional.empty();
@Builder.Default @Builder.Default
private final Optional<Arn> streamARNOptional = Optional.empty(); @EqualsAndHashCode.Exclude
private final Optional<Arn> streamArnOptional = Optional.empty();
/** /**
* Pattern for a serialized {@link StreamIdentifier}. The valid format is * Pattern for a serialized {@link StreamIdentifier}. The valid format is
@ -59,39 +58,25 @@ public class StreamIdentifier {
* where {@code region} is the id representation of a {@link Region}. * where {@code region} is the id representation of a {@link Region}.
*/ */
private static final Pattern STREAM_ARN_PATTERN = Pattern.compile( private static final Pattern STREAM_ARN_PATTERN = Pattern.compile(
"arn:aws:kinesis:(?<region>[-a-z0-9]+):(?<accountId>[0-9]{12}):stream/(?<streamName>.+)"); "arn:aws[^:]*:kinesis:(?<region>[-a-z0-9]+):(?<accountId>[0-9]{12}):stream/(?<streamName>.+)");
/** /**
* Serialize the current StreamIdentifier instance. * Serialize the current StreamIdentifier instance.
* *
* @return a String of {@code account:stream:creationEpoch[:region]} * @return a String of {@code account:stream:creationEpoch} in multi-stream mode
* where {@code region} is the id representation of a {@link Region} * or {@link #streamName} in single-stream mode.
* and is optional.
*/ */
public String serialize() { public String serialize() {
if (!accountIdOptional.isPresent()) { if (!streamCreationEpochOptional.isPresent()) {
// creation epoch is expected to be empty in single-stream mode
return streamName; return streamName;
} }
if (!streamCreationEpochOptional.isPresent()) {
// FIXME bias-for-action hack to simplify back-porting into KCL 1.x and facilitate the
// backwards-compatible requirement. There's a chicken-and-egg issue if DSS is
// called as the application is being configured (and before the client is rigged).
// Furthermore, if epoch isn't lazy-loaded here, the problem quickly spirals into
// systemic issues of concurrency and consistency (e.g., PeriodicShardSyncManager,
// Scheduler, DDB leases). We should look at leveraging dependency injection.
// (NOTE: not to inject the Kinesis client here, but to ensure the client is
// accessible elsewhere ASAP.)
final DescribeStreamSummaryResponse dss = KinesisClientFacade.describeStreamSummary(
streamARNOptional().get().toString());
final long creationEpoch = dss.streamDescriptionSummary().streamCreationTimestamp().getEpochSecond();
streamCreationEpochOptional = Optional.of(creationEpoch);
}
final char delimiter = ':'; final char delimiter = ':';
final StringBuilder sb = new StringBuilder(accountIdOptional.get()).append(delimiter) final StringBuilder sb = new StringBuilder()
.append(streamName).append(delimiter); .append(accountIdOptional.get()).append(delimiter)
streamCreationEpochOptional.ifPresent(sb::append); .append(streamName).append(delimiter)
.append(streamCreationEpochOptional.get());
return sb.toString(); return sb.toString();
} }
@ -101,129 +86,98 @@ public class StreamIdentifier {
} }
/** /**
* Create a multi stream instance for StreamIdentifier from serialized stream identifier. * Create a multi stream instance for StreamIdentifier from serialized stream identifier
* of format {@link #STREAM_IDENTIFIER_PATTERN}
* *
* @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream * @param streamIdentifierSer a String of {@code account:stream:creationEpoch}
* * @return StreamIdentifier with {@link #accountIdOptional} and {@link #streamCreationEpochOptional} present
* @see #multiStreamInstance(String, Region)
* @see #serialize()
*/ */
public static StreamIdentifier multiStreamInstance(String serializationOrArn) { public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) {
return multiStreamInstance(serializationOrArn, null); final Matcher matcher = STREAM_IDENTIFIER_PATTERN.matcher(streamIdentifierSer);
if (matcher.matches()) {
final String accountId = matcher.group("accountId");
final String streamName = matcher.group("streamName");
final Long creationEpoch = Long.valueOf(matcher.group("creationEpoch"));
validateCreationEpoch(creationEpoch);
return StreamIdentifier.builder()
.accountIdOptional(Optional.of(accountId))
.streamName(streamName)
.streamCreationEpochOptional(Optional.of(creationEpoch))
.build();
}
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer);
} }
/** /**
* Create a multi stream instance for StreamIdentifier from serialized stream identifier. * Create a multi stream instance for StreamIdentifier from stream {@link Arn}.
* *
* @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream * @param streamArn an {@link Arn} of format {@link #STREAM_ARN_PATTERN}
* @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be * @param creationEpoch Creation epoch of the stream. This value will
* processed are located. A null will default to the caller's region. * reflect in the lease key and is assumed to be correct. (KCL could
* * verify, but that creates issues for both bootstrapping and, with large
* @see #multiStreamInstance(String) * KCL applications, API throttling against DescribeStreamSummary.)
* @see #serialize() * If this epoch is reused for two identically-named streams in the same
* account -- such as deleting and recreating a stream -- then KCL will
* <b>be unable to differentiate leases between the old and new stream</b>
* since the lease keys collide on this creation epoch.
* @return StreamIdentifier with {@link #accountIdOptional}, {@link #streamCreationEpochOptional},
* and {@link #streamArnOptional} present
*/ */
public static StreamIdentifier multiStreamInstance(String serializationOrArn, Region kinesisRegion) { public static StreamIdentifier multiStreamInstance(Arn streamArn, long creationEpoch) {
final StreamIdentifier fromSerialization = fromSerialization(serializationOrArn, kinesisRegion); validateArn(streamArn);
if (fromSerialization != null) { validateCreationEpoch(creationEpoch);
return fromSerialization;
}
final StreamIdentifier fromArn = fromArn(serializationOrArn, kinesisRegion);
if (fromArn != null) {
return fromArn;
}
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + serializationOrArn); return StreamIdentifier.builder()
.accountIdOptional(streamArn.accountId())
.streamName(streamArn.resource().resource())
.streamCreationEpochOptional(Optional.of(creationEpoch))
.streamArnOptional(Optional.of(streamArn))
.build();
} }
/** /**
* Create a single stream instance for StreamIdentifier from stream name. * Create a single stream instance for StreamIdentifier from stream name.
* *
* @param streamNameOrArn stream name or AWS ARN of a Kinesis stream * @param streamName stream name of a Kinesis stream
*
* @see #singleStreamInstance(String, Region)
*/ */
public static StreamIdentifier singleStreamInstance(String streamNameOrArn) { public static StreamIdentifier singleStreamInstance(String streamName) {
return singleStreamInstance(streamNameOrArn, null); Validate.notEmpty(streamName, "StreamName should not be empty");
}
/**
* Create a single stream instance for StreamIdentifier from the provided stream name and kinesisRegion.
* This method also constructs the optional StreamARN based on the region info.
*
* @param streamNameOrArn stream name or AWS ARN of a Kinesis stream
* @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
* processed are located. A null will default to the caller's region.
*
* @see #singleStreamInstance(String)
*/
public static StreamIdentifier singleStreamInstance(String streamNameOrArn, Region kinesisRegion) {
Validate.notEmpty(streamNameOrArn, "StreamName should not be empty");
final StreamIdentifier fromArn = fromArn(streamNameOrArn, kinesisRegion);
if (fromArn != null) {
return fromArn;
}
return StreamIdentifier.builder() return StreamIdentifier.builder()
.streamName(streamNameOrArn)
.streamARNOptional(StreamARNUtil.getStreamARN(streamNameOrArn, kinesisRegion))
.build();
}
/**
* Deserializes a StreamIdentifier from {@link #STREAM_IDENTIFIER_PATTERN}.
*
* @param input input string (e.g., ARN, serialized instance) to convert into an instance
* @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
* processed are located. A null will default to the caller's region.
* @return a StreamIdentifier instance if the pattern matched, otherwise null
*/
private static StreamIdentifier fromSerialization(final String input, final Region kinesisRegion) {
final Matcher matcher = STREAM_IDENTIFIER_PATTERN.matcher(input);
return matcher.matches()
? toStreamIdentifier(matcher, matcher.group("creationEpoch"), kinesisRegion) : null;
}
/**
* Constructs a StreamIdentifier from {@link #STREAM_ARN_PATTERN}.
*
* @param input input string (e.g., ARN, serialized instance) to convert into an instance
* @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
* processed are located. A null will default to the caller's region.
* @return a StreamIdentifier instance if the pattern matched, otherwise null
*/
private static StreamIdentifier fromArn(final String input, final Region kinesisRegion) {
final Matcher matcher = STREAM_ARN_PATTERN.matcher(input);
if (matcher.matches()) {
final String arnRegion = matcher.group("region");
final Region region = (arnRegion != null) ? Region.of(arnRegion) : kinesisRegion;
if ((kinesisRegion != null) && (region != kinesisRegion)) {
throw new IllegalArgumentException(String.format(
"Cannot create StreamIdentifier for a region other than %s: %s", kinesisRegion, input));
}
return toStreamIdentifier(matcher, "", region);
}
return null;
}
private static StreamIdentifier toStreamIdentifier(final Matcher matcher, final String matchedEpoch,
final Region kinesisRegion) {
final String accountId = matcher.group("accountId");
final String streamName = matcher.group("streamName");
final Optional<Long> creationEpoch = matchedEpoch.isEmpty() ? Optional.empty()
: Optional.of(Long.valueOf(matchedEpoch));
final Optional<Arn> arn = StreamARNUtil.getStreamARN(streamName, kinesisRegion, accountId);
if (!creationEpoch.isPresent() && !arn.isPresent()) {
throw new IllegalArgumentException("Cannot create StreamIdentifier if missing both ARN and creation epoch");
}
return StreamIdentifier.builder()
.accountIdOptional(Optional.of(accountId))
.streamName(streamName) .streamName(streamName)
.streamCreationEpochOptional(creationEpoch)
.streamARNOptional(arn)
.build(); .build();
} }
/**
* Create a single stream instance for StreamIdentifier from AWS Kinesis stream {@link Arn}.
*
* @param streamArn AWS ARN of a Kinesis stream
* @return StreamIdentifier with {@link #accountIdOptional} and {@link #streamArnOptional} present
*/
public static StreamIdentifier singleStreamInstance(Arn streamArn) {
validateArn(streamArn);
return StreamIdentifier.builder()
.accountIdOptional(streamArn.accountId())
.streamName(streamArn.resource().resource())
.streamArnOptional(Optional.of(streamArn))
.build();
}
private static void validateArn(Arn streamArn) {
if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches() || !streamArn.region().isPresent()) {
throw new IllegalArgumentException("Unable to create a StreamIdentifier from " + streamArn);
}
}
private static void validateCreationEpoch(long creationEpoch) {
if (creationEpoch <= 0) {
throw new IllegalArgumentException(
"Creation epoch must be > 0; received " + creationEpoch);
}
}
} }

View file

@ -1,42 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
/**
* Caches results from a {@link Supplier}. Caching is especially useful when
* {@link Supplier#get()} is an expensive call that produces static results.
*
* @param <T> result type
*/
@RequiredArgsConstructor
public class SupplierCache<T> extends SynchronizedCache<T> {
private final Supplier<T> supplier;
/**
* Returns the cached result. If the cache is null, the supplier will be
* invoked to populate the cache.
*
* @return cached result which may be null
*/
public T get() {
return get(supplier);
}
}

View file

@ -1,48 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Supplier;
/**
* A synchronized, "no frills" cache that preserves the first non-null value
* returned from a {@link Supplier}.
*
* @param <R> result type
*/
public class SynchronizedCache<R> {
private volatile R result;
/**
* Returns the cached result. If the cache is null, the supplier will be
* invoked to populate the cache.
*
* @param supplier supplier to invoke if the cache is null
* @return cached result which may be null
*/
protected R get(final Supplier<R> supplier) {
if (result == null) {
synchronized (this) {
// double-check lock
if (result == null) {
result = supplier.get();
}
}
}
return result;
}
}

View file

@ -223,8 +223,7 @@ public class KinesisShardDetector implements ShardDetector {
ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder(); ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) { if (StringUtils.isEmpty(nextToken)) {
builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter); builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
streamIdentifier.streamARNOptional().ifPresent(arn -> builder.streamARN(arn.toString())); streamIdentifier.streamArnOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
} else { } else {
builder.nextToken(nextToken); builder.nextToken(nextToken);
} }
@ -313,7 +312,7 @@ public class KinesisShardDetector implements ShardDetector {
.streamName(streamIdentifier.streamName()) .streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST) .shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardId); .shardId(shardId);
streamIdentifier.streamARNOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString())); streamIdentifier.streamArnOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString()));
final GetShardIteratorRequest getShardIteratorRequest = requestBuilder.build(); final GetShardIteratorRequest getShardIteratorRequest = requestBuilder.build();
final GetShardIteratorResponse getShardIteratorResponse = final GetShardIteratorResponse getShardIteratorResponse =

View file

@ -21,7 +21,7 @@ import java.util.List;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.NonNull; import lombok.NonNull;
import lombok.ToString; import lombok.ToString;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.arns.Arn;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -49,8 +49,8 @@ public class SingleStreamTracker implements StreamTracker {
this(StreamIdentifier.singleStreamInstance(streamName)); this(StreamIdentifier.singleStreamInstance(streamName));
} }
public SingleStreamTracker(String streamName, Region region) { public SingleStreamTracker(Arn streamArn) {
this(StreamIdentifier.singleStreamInstance(streamName, region)); this(StreamIdentifier.singleStreamInstance(streamArn));
} }
public SingleStreamTracker(StreamIdentifier streamIdentifier) { public SingleStreamTracker(StreamIdentifier streamIdentifier) {

View file

@ -1,114 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
/**
* Facade pattern to simplify interactions with a {@link KinesisAsyncClient}.
*/
@Slf4j
public final class KinesisClientFacade {
/**
* Reusable {@link AWSExceptionManager}.
* <p>
* N.B. This instance is mutable, but thread-safe for <b>read-only</b> use.
* </p>
*/
private static final AWSExceptionManager AWS_EXCEPTION_MANAGER;
// FIXME dependency injection
private static KinesisAsyncClient kinesisClient;
static {
AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
}
static void initialize(final KinesisAsyncClient client) {
kinesisClient = client;
}
public static DescribeStreamSummaryResponse describeStreamSummary(final String streamArn) {
final DescribeStreamSummaryRequest request = KinesisRequestsBuilder
.describeStreamSummaryRequestBuilder().streamARN(streamArn).build();
final ServiceCallerSupplier<DescribeStreamSummaryResponse> dss =
() -> kinesisClient.describeStreamSummary(request).get();
return retryWhenThrottled(dss, 3, streamArn, "DescribeStreamSummary");
}
// FIXME code lifted-and-shifted from FanOutConsumerRegistration; that class
// (and others) should not be responsible for interacting directly with
// the thread-safe Kinesis client (and handling retries, etc.)
private static <T> T retryWhenThrottled(
@NonNull final ServiceCallerSupplier<T> retriever,
final int maxRetries,
final String streamArn,
@NonNull final String apiName) {
LimitExceededException finalException = null;
int retries = maxRetries;
while (retries > 0) {
try {
try {
return retriever.get();
} catch (ExecutionException e) {
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
throw KinesisException.create("Unable to complete " + apiName, e);
} catch (TimeoutException te) {
log.info("Timed out waiting for " + apiName + " for " + streamArn);
}
} catch (LimitExceededException e) {
log.info("{} : Throttled while calling {} API, will backoff.", streamArn, apiName);
try {
Thread.sleep(1000 + (long) (Math.random() * 100));
} catch (InterruptedException ie) {
log.debug("Sleep interrupted, shutdown invoked.");
}
finalException = e;
}
retries--;
}
if (finalException == null) {
throw new IllegalStateException(streamArn + " : Exhausted retries while calling " + apiName);
}
throw finalException;
}
@FunctionalInterface
private interface ServiceCallerSupplier<T> {
T get() throws ExecutionException, InterruptedException, TimeoutException;
}
}

View file

@ -22,6 +22,7 @@ import lombok.NonNull;
import lombok.Setter; import lombok.Setter;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.DeprecationUtils; import software.amazon.kinesis.common.DeprecationUtils;
@ -119,9 +120,12 @@ public class RetrievalConfig {
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
@NonNull String applicationName) { @NonNull String applicationName) {
this(kinesisAsyncClient, this(kinesisAsyncClient, new SingleStreamTracker(streamName), applicationName);
new SingleStreamTracker(streamName, kinesisAsyncClient.serviceClientConfiguration().region()), }
applicationName);
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull Arn streamArn,
@NonNull String applicationName) {
this(kinesisAsyncClient, new SingleStreamTracker(streamArn), applicationName);
} }
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamTracker streamTracker, public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamTracker streamTracker,
@ -131,8 +135,6 @@ public class RetrievalConfig {
this.applicationName = applicationName; this.applicationName = applicationName;
this.appStreamTracker = DeprecationUtils.convert(streamTracker, this.appStreamTracker = DeprecationUtils.convert(streamTracker,
singleStreamTracker -> singleStreamTracker.streamConfigList().get(0)); singleStreamTracker -> singleStreamTracker.streamConfigList().get(0));
KinesisClientFacade.initialize(kinesisAsyncClient);
} }
/** /**

View file

@ -238,7 +238,7 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId); .streamName(streamIdentifier.streamName()).shardId(shardId);
streamIdentifier.streamARNOptional().ifPresent(arn -> builder.streamARN(arn.toString())); streamIdentifier.streamArnOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
GetShardIteratorRequest request; GetShardIteratorRequest request;
if (isIteratorRestart) { if (isIteratorRestart) {
@ -321,7 +321,7 @@ public class KinesisDataFetcher implements DataFetcher {
public GetRecordsRequest getGetRecordsRequest(String nextIterator) { public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder() GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(nextIterator).limit(maxRecords); .shardIterator(nextIterator).limit(maxRecords);
streamIdentifier.streamARNOptional().ifPresent(arn -> builder.streamARN(arn.toString())); streamIdentifier.streamArnOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
return builder.build(); return builder.build();
} }

View file

@ -26,6 +26,8 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -33,12 +35,12 @@ import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker; import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.utils.MockObjectHelper;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class ConfigsBuilderTest { public class ConfigsBuilderTest {
private final KinesisAsyncClient mockKinesisClient = MockObjectHelper.createKinesisClient(); @Mock
private KinesisAsyncClient mockKinesisClient;
@Mock @Mock
private DynamoDbAsyncClient mockDynamoClient; private DynamoDbAsyncClient mockDynamoClient;
@ -53,18 +55,24 @@ public class ConfigsBuilderTest {
private static final String WORKER_IDENTIFIER = "worker-id"; private static final String WORKER_IDENTIFIER = "worker-id";
@Test @Test
public void testTrackerConstruction() { public void testSingleStreamTrackerConstruction() {
final String streamName = "single-stream"; final String streamName = "single-stream";
final ConfigsBuilder configByName = createConfig(streamName); final Arn streamArn = createArn(streamName);
final ConfigsBuilder configBySingleTracker = createConfig(new SingleStreamTracker(streamName));
for (final ConfigsBuilder cb : Arrays.asList(configByName, configBySingleTracker)) { for (final ConfigsBuilder cb : Arrays.asList(
createConfig(streamName),
createConfig(new SingleStreamTracker(streamName)),
createConfig(streamArn),
createConfig(new SingleStreamTracker(streamArn)))) {
assertEquals(Optional.empty(), cb.appStreamTracker().left()); assertEquals(Optional.empty(), cb.appStreamTracker().left());
assertEquals(streamName, cb.appStreamTracker().right().get()); assertEquals(streamName, cb.appStreamTracker().right().get());
assertEquals(streamName, cb.streamTracker().streamConfigList().get(0).streamIdentifier().streamName()); assertEquals(streamName, cb.streamTracker().streamConfigList().get(0).streamIdentifier().streamName());
assertFalse(cb.streamTracker().isMultiStream()); assertFalse(cb.streamTracker().isMultiStream());
} }
}
@Test
public void testMultiStreamTrackerConstruction() {
final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class); final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class);
final ConfigsBuilder configByMultiTracker = createConfig(mockMultiStreamTracker); final ConfigsBuilder configByMultiTracker = createConfig(mockMultiStreamTracker);
assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right()); assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right());
@ -78,9 +86,25 @@ public class ConfigsBuilderTest {
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory); mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
} }
private ConfigsBuilder createConfig(Arn streamArn) {
// intentional invocation of constructor where streamArn is an Arn
return new ConfigsBuilder(streamArn, APPLICATION_NAME, mockKinesisClient, mockDynamoClient,
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
}
private ConfigsBuilder createConfig(StreamTracker streamTracker) { private ConfigsBuilder createConfig(StreamTracker streamTracker) {
return new ConfigsBuilder(streamTracker, APPLICATION_NAME, mockKinesisClient, mockDynamoClient, return new ConfigsBuilder(streamTracker, APPLICATION_NAME, mockKinesisClient, mockDynamoClient,
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory); mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
} }
private static Arn createArn(String streamName) {
return Arn.builder()
.partition("aws")
.service("kinesis")
.region(Region.US_EAST_1.id())
.accountId("123456789012")
.resource("stream/" + streamName)
.build();
}
} }

View file

@ -1,61 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class FunctionCacheTest {
@Mock
private Function<Integer, Object> mockFunction;
private FunctionCache<Integer, Object> cache;
@Before
public void setUp() {
cache = new FunctionCache<>(mockFunction);
}
/**
* Test that the cache stops invoking the encapsulated {@link Function}
* after it returns a non-null value.
*/
@Test
public void testCache() {
final int expectedValue = 3;
when(mockFunction.apply(expectedValue)).thenReturn(expectedValue);
assertNull(cache.get(1));
assertNull(cache.get(2));
assertEquals(expectedValue, cache.get(3));
assertEquals(expectedValue, cache.get(4));
assertEquals(expectedValue, cache.get(5));
verify(mockFunction, times(expectedValue)).apply(anyInt());
}
}

View file

@ -1,201 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse;
import java.lang.reflect.Field;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ StreamARNUtil.class, StsClient.class, UrlConnectionHttpClient.class })
public class StreamARNUtilTest {
private static final String STS_RESPONSE_ARN_FORMAT = "arn:aws:sts::%s:assumed-role/Admin/alias";
private static final String KINESIS_STREAM_ARN_FORMAT = "arn:aws:kinesis:us-east-1:%s:stream/%s";
/**
* Original {@link SupplierCache} that is constructed on class load.
*/
private static final SupplierCache<Arn> ORIGINAL_CACHE = Whitebox.getInternalState(
StreamARNUtil.class, "CALLER_IDENTITY_ARN");
private static final String ACCOUNT_ID = "12345";
private static final String STREAM_NAME = StreamARNUtilTest.class.getSimpleName();
@Mock
private StsClientBuilder mockStsClientBuilder;
@Mock
private StsClient mockStsClient;
private SupplierCache<Arn> spySupplierCache;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
spySupplierCache = spy(ORIGINAL_CACHE);
setUpSupplierCache(spySupplierCache);
final Arn defaultArn = toArn(STS_RESPONSE_ARN_FORMAT, ACCOUNT_ID);
doReturn(defaultArn).when(spySupplierCache).get();
}
private void setUpSts() {
PowerMockito.mockStatic(StsClient.class);
PowerMockito.mockStatic(UrlConnectionHttpClient.class);
when(UrlConnectionHttpClient.builder()).thenReturn(mock(UrlConnectionHttpClient.Builder.class));
when(StsClient.builder()).thenReturn(mockStsClientBuilder);
when(mockStsClientBuilder.httpClient(any(SdkHttpClient.class))).thenReturn(mockStsClientBuilder);
when(mockStsClientBuilder.build()).thenReturn(mockStsClient);
// bypass the spy so the Sts clients are called
when(spySupplierCache.get()).thenCallRealMethod();
}
/**
* Wrap and embed the original {@link SupplierCache} with a spy to avoid
* one-and-done cache behavior, provide each test precise control over
* return values, and enable the ability to verify interactions via Mockito.
*/
static void setUpSupplierCache(final SupplierCache<Arn> cache) throws Exception {
final Field f = StreamARNUtil.class.getDeclaredField("CALLER_IDENTITY_ARN");
f.setAccessible(true);
f.set(null, cache);
f.setAccessible(false);
}
@Test
public void testGetStreamARNHappyCase() {
getStreamArn();
verify(spySupplierCache).get();
}
@Test
public void testGetStreamARNFromCache() {
final Optional<Arn> actualStreamARNOptional1 = getStreamArn();
final Optional<Arn> actualStreamARNOptional2 = getStreamArn();
verify(spySupplierCache, times(2)).get();
assertEquals(actualStreamARNOptional1, actualStreamARNOptional2);
}
@Test
public void testGetStreamARNReturnsEmptyOnSTSError() {
setUpSts();
// Optional.empty() is expected when there is an error with the STS call and STS returns empty Arn
when(mockStsClient.getCallerIdentity())
.thenThrow(AwsServiceException.builder().message("testAwsServiceException").build())
.thenThrow(SdkClientException.builder().message("testSdkClientException").build());
assertEquals(Optional.empty(), StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1));
assertEquals(Optional.empty(), StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1));
verify(mockStsClient, times(2)).getCallerIdentity();
verify(spySupplierCache, times(2)).get();
}
@Test(expected = IllegalStateException.class)
public void testStsResponseWithoutAccountId() {
setUpSts();
final Arn arnWithoutAccountId = toArn(STS_RESPONSE_ARN_FORMAT, "");
assertEquals(Optional.empty(), arnWithoutAccountId.accountId());
final GetCallerIdentityResponse identityResponse = GetCallerIdentityResponse.builder()
.arn(arnWithoutAccountId.toString()).build();
when(mockStsClient.getCallerIdentity()).thenReturn(identityResponse);
try {
StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1);
} finally {
verify(mockStsClient).getCallerIdentity();
}
}
@Test
public void testGetStreamARNReturnsEmptyOnInvalidKinesisRegion() {
// Optional.empty() is expected when kinesis region is not set correctly
Optional<Arn> actualStreamARNOptional = StreamARNUtil.getStreamARN(STREAM_NAME, null);
assertEquals(Optional.empty(), actualStreamARNOptional);
verifyZeroInteractions(mockStsClient);
verifyZeroInteractions(spySupplierCache);
}
@Test
public void testGetStreamARNWithProvidedAccountIDAndIgnoredSTSResult() {
// If the account id is provided in the StreamIdentifier, it will override the result (account id) returned by sts
final String cachedAccountId = "111111111111";
final String providedAccountId = "222222222222";
final Arn cachedArn = toArn(STS_RESPONSE_ARN_FORMAT, cachedAccountId);
when(spySupplierCache.get()).thenReturn(cachedArn);
final Optional<Arn> actualStreamARNOptional = StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1,
providedAccountId);
final Arn expectedStreamARN = toArn(KINESIS_STREAM_ARN_FORMAT, providedAccountId, STREAM_NAME);
verify(spySupplierCache).get();
verifyZeroInteractions(mockStsClient);
assertTrue(actualStreamARNOptional.isPresent());
assertEquals(expectedStreamARN, actualStreamARNOptional.get());
}
private static Optional<Arn> getStreamArn() {
final Optional<Arn> actualArn = StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1);
final Arn expectedArn = toArn(KINESIS_STREAM_ARN_FORMAT, ACCOUNT_ID, STREAM_NAME);
assertTrue(actualArn.isPresent());
assertEquals(expectedArn, actualArn.get());
return actualArn;
}
private static Arn toArn(final String format, final Object... params) {
return Arn.fromString(String.format(format, params));
}
}

View file

@ -1,11 +1,7 @@
package software.amazon.kinesis.common; package software.amazon.kinesis.common;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
@ -13,27 +9,16 @@ import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
import static software.amazon.kinesis.common.StreamARNUtil.getStreamARN;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StreamARNUtil.class)
public class StreamIdentifierTest { public class StreamIdentifierTest {
private static final String STREAM_NAME = "stream-name"; private static final String STREAM_NAME = "stream-name";
private static final String PARTITION = "aws";
private static final String SERVICE = "kinesis";
private static final Region KINESIS_REGION = Region.US_WEST_1; private static final Region KINESIS_REGION = Region.US_WEST_1;
private static final String TEST_ACCOUNT_ID = "123456789012"; private static final String TEST_ACCOUNT_ID = "123456789012";
private static final String RESOURCE = "stream/" + STREAM_NAME;
private static final long EPOCH = 1680616058L; private static final long EPOCH = 1680616058L;
private static final Arn DEFAULT_ARN = createArn();
private static final Arn DEFAULT_ARN = toArn(KINESIS_REGION);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
StreamARNUtilTest.setUpSupplierCache(new SupplierCache<>(() -> DEFAULT_ARN));
}
/** /**
* Test patterns that should match a serialization regex. * Test patterns that should match a serialization regex.
@ -51,74 +36,86 @@ public class StreamIdentifierTest {
@Test @Test
public void testMultiStreamDeserializationFail() { public void testMultiStreamDeserializationFail() {
for (final String pattern : Arrays.asList( for (final String pattern : Arrays.asList(
// arn examples
"arn:aws:kinesis::123456789012:stream/stream-name", // missing region
"arn:aws:kinesis:region::stream/stream-name", // missing account id
"arn:aws:kinesis:region:123456789:stream/stream-name", // account id not 12 digits
"arn:aws:kinesis:region:123456789abc:stream/stream-name", // 12char alphanumeric account id
"arn:aws:kinesis:region:123456789012:stream/", // missing stream-name
// serialization examples
":stream-name:123", // missing account id ":stream-name:123", // missing account id
// "123456789:stream-name:123", // account id not 12 digits // "123456789:stream-name:123", // account id not 12 digits
"123456789abc:stream-name:123", // 12char alphanumeric account id "123456789abc:stream-name:123", // 12char alphanumeric account id
"123456789012::123", // missing stream name "123456789012::123", // missing stream name
"123456789012:stream-name", // missing delimiter and creation epoch "123456789012:stream-name", // missing delimiter and creation epoch
"123456789012:stream-name:", // missing creation epoch "123456789012:stream-name:", // missing creation epoch
"123456789012:stream-name:-123", // negative creation epoch
"123456789012:stream-name:abc", // non-numeric creation epoch "123456789012:stream-name:abc", // non-numeric creation epoch
"" ""
)) { )) {
try { try {
StreamIdentifier.multiStreamInstance(pattern); StreamIdentifier.multiStreamInstance(pattern);
Assert.fail(pattern + " should not have created a StreamIdentifier"); Assert.fail("Serialization " + pattern + " should not have created a StreamIdentifier");
} catch (final IllegalArgumentException iae) { } catch (final IllegalArgumentException iae) {
// expected; ignore // expected; ignore
} }
} }
} }
/**
* Test ARNs that <b>should not</b> match a valid AWS Kinesis stream ARN.
*/
@Test @Test
public void testInstanceFromArn() { public void testMultiStreamByArnWithInvalidStreamArnFail() {
final Arn arn = toArn(KINESIS_REGION); for (final Arn invalidStreamArn : Arrays.asList(
final StreamIdentifier single = StreamIdentifier.singleStreamInstance(arn.toString()); createArn("abc", SERVICE, KINESIS_REGION, TEST_ACCOUNT_ID, RESOURCE), // invalid partition
final StreamIdentifier multi = StreamIdentifier.multiStreamInstance(arn.toString()); createArn(PARTITION, "dynamodb", KINESIS_REGION, TEST_ACCOUNT_ID, RESOURCE), // incorrect service
createArn(PARTITION, SERVICE, null, TEST_ACCOUNT_ID, RESOURCE), // missing region
assertEquals(single, multi); createArn(PARTITION, SERVICE, KINESIS_REGION, null, RESOURCE), // missing account id
assertEquals(Optional.empty(), single.streamCreationEpochOptional()); createArn(PARTITION, SERVICE, KINESIS_REGION, "123456789", RESOURCE), // account id not 12 digits
assertActualStreamIdentifierExpected(arn, single); createArn(PARTITION, SERVICE, KINESIS_REGION, "123456789abc", RESOURCE), // 12char alphanumeric account id
createArn(PARTITION, SERVICE, KINESIS_REGION, TEST_ACCOUNT_ID, "table/name"), // incorrect resource type
Arn.fromString("arn:aws:dynamodb:us-east-2:123456789012:table/myDynamoDBTable") // valid ARN for incorrect resource
)) {
try {
StreamIdentifier.multiStreamInstance(invalidStreamArn, EPOCH);
Assert.fail("Arn " + invalidStreamArn + " should not have created a StreamIdentifier");
} catch (final IllegalArgumentException iae) {
// expected; ignore
}
}
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testInstanceWithoutEpochOrArn() { public void testNegativeCreationEpoch() {
mockStatic(StreamARNUtil.class); StreamIdentifier.multiStreamInstance(DEFAULT_ARN, -123);
when(getStreamARN(STREAM_NAME, KINESIS_REGION, TEST_ACCOUNT_ID))
.thenReturn(Optional.empty());
try {
StreamIdentifier.singleStreamInstance(DEFAULT_ARN.toString());
} finally {
verifyStatic(StreamARNUtil.class);
getStreamARN(STREAM_NAME, KINESIS_REGION, TEST_ACCOUNT_ID);
} }
@Test(expected = IllegalArgumentException.class)
public void testZeroCreationEpoch() {
StreamIdentifier.multiStreamInstance(DEFAULT_ARN, 0);
}
@Test
public void testSingleStreamInstanceFromArn() {
final StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(DEFAULT_ARN);
assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier);
assertEquals(Optional.empty(), actualStreamIdentifier.streamCreationEpochOptional());
assertEquals(actualStreamIdentifier.streamName(), actualStreamIdentifier.serialize());
}
@Test
public void testMultiStreamInstanceFromArn() {
final StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(DEFAULT_ARN, EPOCH);
assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier);
assertEquals(Optional.of(EPOCH), actualStreamIdentifier.streamCreationEpochOptional());
assertEquals(serialize(), actualStreamIdentifier.serialize());
} }
@Test @Test
public void testSingleStreamInstanceWithName() { public void testSingleStreamInstanceWithName() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME); StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME);
assertFalse(actualStreamIdentifier.streamCreationEpochOptional().isPresent()); assertEquals(Optional.empty(), actualStreamIdentifier.streamCreationEpochOptional());
assertFalse(actualStreamIdentifier.accountIdOptional().isPresent()); assertEquals(Optional.empty(), actualStreamIdentifier.accountIdOptional());
assertFalse(actualStreamIdentifier.streamARNOptional().isPresent()); assertEquals(Optional.empty(), actualStreamIdentifier.streamArnOptional());
assertEquals(STREAM_NAME, actualStreamIdentifier.streamName()); assertEquals(STREAM_NAME, actualStreamIdentifier.streamName());
} }
@Test
public void testSingleStreamInstanceWithNameAndRegion() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME, KINESIS_REGION);
assertFalse(actualStreamIdentifier.streamCreationEpochOptional().isPresent());
assertFalse(actualStreamIdentifier.accountIdOptional().isPresent());
assertEquals(STREAM_NAME, actualStreamIdentifier.streamName());
assertEquals(Optional.of(DEFAULT_ARN), actualStreamIdentifier.streamARNOptional());
}
@Test @Test
public void testMultiStreamInstanceWithIdentifierSerialization() { public void testMultiStreamInstanceWithIdentifierSerialization() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(serialize()); StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(serialize());
@ -126,34 +123,10 @@ public class StreamIdentifierTest {
assertEquals(Optional.of(EPOCH), actualStreamIdentifier.streamCreationEpochOptional()); assertEquals(Optional.of(EPOCH), actualStreamIdentifier.streamCreationEpochOptional());
} }
/**
* When KCL's Kinesis endpoint is a region, it lacks visibility to streams
* in other regions. Therefore, when the endpoint and ARN conflict, an
* Exception should be thrown.
*/
@Test(expected = IllegalArgumentException.class)
public void testConflictOnRegions() {
final Region arnRegion = Region.US_GOV_EAST_1;
assertNotEquals(arnRegion, KINESIS_REGION);
StreamIdentifier.multiStreamInstance(toArn(arnRegion).toString(), KINESIS_REGION);
}
@Test
public void testMultiStreamInstanceWithoutRegionSerialized() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(
serialize(), KINESIS_REGION);
assertActualStreamIdentifierExpected(actualStreamIdentifier);
}
private void assertActualStreamIdentifierExpected(StreamIdentifier actual) {
assertActualStreamIdentifierExpected(DEFAULT_ARN, actual);
}
private void assertActualStreamIdentifierExpected(Arn expectedArn, StreamIdentifier actual) { private void assertActualStreamIdentifierExpected(Arn expectedArn, StreamIdentifier actual) {
assertEquals(STREAM_NAME, actual.streamName()); assertEquals(STREAM_NAME, actual.streamName());
assertEquals(Optional.of(TEST_ACCOUNT_ID), actual.accountIdOptional()); assertEquals(Optional.of(TEST_ACCOUNT_ID), actual.accountIdOptional());
assertEquals(Optional.ofNullable(expectedArn), actual.streamARNOptional()); assertEquals(Optional.ofNullable(expectedArn), actual.streamArnOptional());
} }
/** /**
@ -163,11 +136,18 @@ public class StreamIdentifierTest {
return String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, Long.toString(EPOCH)); return String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, Long.toString(EPOCH));
} }
private static Arn toArn(final Region region) { private static Arn createArn() {
return Arn.builder().partition("aws").service("kinesis") return createArn(PARTITION, SERVICE, KINESIS_REGION, TEST_ACCOUNT_ID, RESOURCE);
.accountId(TEST_ACCOUNT_ID) }
.resource("stream/" + STREAM_NAME)
.region(region.toString()) private static Arn createArn(String partition, String service, Region region, String account, String resource) {
return Arn.builder()
.partition(partition)
.service(service)
.region(region != null ? region.id() : null)
.accountId(account)
.resource(resource)
.build(); .build();
} }
} }

View file

@ -1,70 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class SupplierCacheTest {
private static final Object DUMMY_RESULT = SupplierCacheTest.class;
@Mock
private Supplier<Object> mockSupplier;
private SupplierCache<Object> cache;
@Before
public void setUp() {
cache = new SupplierCache<>(mockSupplier);
}
@Test
public void testCache() {
when(mockSupplier.get()).thenReturn(DUMMY_RESULT);
final Object result1 = cache.get();
final Object result2 = cache.get();
assertEquals(DUMMY_RESULT, result1);
assertSame(result1, result2);
verify(mockSupplier).get();
}
@Test
public void testCacheWithNullResult() {
when(mockSupplier.get()).thenReturn(null).thenReturn(DUMMY_RESULT);
final Object result1 = cache.get();
final Object result2 = cache.get();
assertNull(result1);
assertEquals(DUMMY_RESULT, result2);
verify(mockSupplier, times(2)).get();
}
}

View file

@ -1,62 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class SynchronizedCacheTest {
private static final Object DUMMY_RESULT = SynchronizedCacheTest.class;
@Mock
private Supplier<Object> mockSupplier;
private final SynchronizedCache<Object> cache = new SynchronizedCache<>();
@Test
public void testCache() {
when(mockSupplier.get()).thenReturn(DUMMY_RESULT);
final Object result1 = cache.get(mockSupplier);
final Object result2 = cache.get(mockSupplier);
assertEquals(DUMMY_RESULT, result1);
assertSame(result1, result2);
verify(mockSupplier).get();
}
@Test
public void testCacheWithNullResult() {
when(mockSupplier.get()).thenReturn(null).thenReturn(DUMMY_RESULT);
assertNull(cache.get(mockSupplier));
assertEquals(DUMMY_RESULT, cache.get(mockSupplier));
assertEquals(DUMMY_RESULT, cache.get(mockSupplier));
verify(mockSupplier, times(2)).get();
}
}

View file

@ -112,7 +112,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.utils.MockObjectHelper;
/** /**
* *
@ -138,6 +137,7 @@ public class SchedulerTest {
private ProcessorConfig processorConfig; private ProcessorConfig processorConfig;
private RetrievalConfig retrievalConfig; private RetrievalConfig retrievalConfig;
@Mock
private KinesisAsyncClient kinesisClient; private KinesisAsyncClient kinesisClient;
@Mock @Mock
private DynamoDbAsyncClient dynamoDBClient; private DynamoDbAsyncClient dynamoDBClient;
@ -180,7 +180,6 @@ public class SchedulerTest {
lifecycleConfig = new LifecycleConfig(); lifecycleConfig = new LifecycleConfig();
metricsConfig = new MetricsConfig(cloudWatchClient, namespace); metricsConfig = new MetricsConfig(cloudWatchClient, namespace);
processorConfig = new ProcessorConfig(shardRecordProcessorFactory); processorConfig = new ProcessorConfig(shardRecordProcessorFactory);
kinesisClient = MockObjectHelper.createKinesisClient();
retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);

View file

@ -1,80 +0,0 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.retrieval.KinesisClientFacade.describeStreamSummary;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
@RunWith(MockitoJUnitRunner.class)
public class KinesisClientFacadeTest {
@Mock
private KinesisAsyncClient mockKinesisClient;
@Before
public void setUp() {
KinesisClientFacade.initialize(mockKinesisClient);
}
@Test
public void testDescribeStreamSummary() {
final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build();
when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
.thenReturn(CompletableFuture.completedFuture(expectedResponse));
final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("narf");
assertEquals(expectedResponse, actualResponse);
verify(mockKinesisClient).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
}
@Test
public void testDescribeStreamSummaryRetries() throws Exception {
final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build();
final CompletableFuture<DescribeStreamSummaryResponse> mockFuture = mock(CompletableFuture.class);
final ExecutionException executionException = new ExecutionException(LimitExceededException.builder().build());
when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
.thenReturn(mockFuture);
when(mockFuture.get())
.thenThrow(executionException)
.thenThrow(executionException)
.thenReturn(expectedResponse);
final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("retry me plz");
assertEquals(expectedResponse, actualResponse);
verify(mockKinesisClient, times(3)).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
verify(mockFuture, times(3)).get();
}
}

View file

@ -18,19 +18,21 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker; import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.utils.MockObjectHelper;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class RetrievalConfigTest { public class RetrievalConfigTest {
private static final String APPLICATION_NAME = RetrievalConfigTest.class.getSimpleName(); private static final String APPLICATION_NAME = RetrievalConfigTest.class.getSimpleName();
@Mock
private KinesisAsyncClient mockKinesisClient; private KinesisAsyncClient mockKinesisClient;
@Mock @Mock
@ -38,24 +40,28 @@ public class RetrievalConfigTest {
@Before @Before
public void setUp() { public void setUp() {
mockKinesisClient = MockObjectHelper.createKinesisClient(true);
when(mockMultiStreamTracker.isMultiStream()).thenReturn(true); when(mockMultiStreamTracker.isMultiStream()).thenReturn(true);
} }
@Test @Test
public void testTrackerConstruction() { public void testSingleStreamTrackerConstruction() {
final String streamName = "single-stream"; final String streamName = "single-stream";
final RetrievalConfig configByName = createConfig(streamName); final Arn streamArn = createArn(streamName);
final SingleStreamTracker singleTracker = new SingleStreamTracker(streamName);
final RetrievalConfig configBySingleTracker = createConfig(singleTracker);
for (final RetrievalConfig rc : Arrays.asList(configByName, configBySingleTracker)) { for (final RetrievalConfig rc : Arrays.asList(
createConfig(streamName),
createConfig(new SingleStreamTracker(streamName)),
createConfig(streamArn),
createConfig(new SingleStreamTracker(streamArn)))) {
assertEquals(Optional.empty(), rc.appStreamTracker().left()); assertEquals(Optional.empty(), rc.appStreamTracker().left());
assertEquals(singleTracker, rc.streamTracker()); assertEquals(streamName, rc.streamTracker().streamConfigList().get(0).streamIdentifier().streamName());
assertEquals(1, rc.streamTracker().streamConfigList().size()); assertEquals(1, rc.streamTracker().streamConfigList().size());
assertFalse(rc.streamTracker().isMultiStream()); assertFalse(rc.streamTracker().isMultiStream());
} }
}
@Test
public void testMultiStreamTrackerConstruction() {
final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class); final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class);
final RetrievalConfig configByMultiTracker = createConfig(mockMultiStreamTracker); final RetrievalConfig configByMultiTracker = createConfig(mockMultiStreamTracker);
assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right()); assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right());
@ -110,8 +116,22 @@ public class RetrievalConfigTest {
return new RetrievalConfig(mockKinesisClient, streamName, APPLICATION_NAME); return new RetrievalConfig(mockKinesisClient, streamName, APPLICATION_NAME);
} }
private RetrievalConfig createConfig(Arn streamArn) {
return new RetrievalConfig(mockKinesisClient, streamArn, APPLICATION_NAME);
}
private RetrievalConfig createConfig(StreamTracker streamTracker) { private RetrievalConfig createConfig(StreamTracker streamTracker) {
return new RetrievalConfig(mockKinesisClient, streamTracker, APPLICATION_NAME); return new RetrievalConfig(mockKinesisClient, streamTracker, APPLICATION_NAME);
} }
private static Arn createArn(String streamName) {
return Arn.builder()
.partition("aws")
.service("kinesis")
.region(Region.US_EAST_1.id())
.accountId("123456789012")
.resource("stream/" + streamName)
.build();
}
} }

View file

@ -1,31 +0,0 @@
package software.amazon.kinesis.utils;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisServiceClientConfiguration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class MockObjectHelper {
public static KinesisAsyncClient createKinesisClient() {
return createKinesisClient(Region.US_EAST_1);
}
/**
* @param isRegionDummy a boolean to determine whether to use a null value for the Kinesis client's region.
* @return
*/
public static KinesisAsyncClient createKinesisClient(boolean isRegionDummy) {
return isRegionDummy ? createKinesisClient(null) : createKinesisClient();
}
public static KinesisAsyncClient createKinesisClient(Region region) {
KinesisAsyncClient kinesisClient = mock(KinesisAsyncClient.class);
when(kinesisClient.serviceClientConfiguration()).
thenReturn(KinesisServiceClientConfiguration.builder().region(region).build());
return kinesisClient;
}
}