diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 5560c719..e08ed98d 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -75,11 +75,6 @@
netty-nio-client${awssdk.version}
-
- software.amazon.awssdk
- sts
- ${awssdk.version}
- software.amazon.glueschema-registry-serde
@@ -139,20 +134,6 @@
test
-
- org.powermock
- powermock-module-junit4
- 1.7.4
- test
-
-
-
- org.powermock
- powermock-api-mockito
- 1.7.4
- test
-
-
org.hamcresthamcrest-all
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java
index 57de9059..02258950 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.experimental.Accessors;
+import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@@ -128,7 +129,7 @@ public class ConfigsBuilder {
}
/**
- * Constructor to initialize ConfigsBuilder for a single stream.
+ * Constructor to initialize ConfigsBuilder for a single stream identified by name.
*
* @param streamName
* @param applicationName
@@ -142,7 +143,31 @@ public class ConfigsBuilder {
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
- this(new SingleStreamTracker(streamName, kinesisClient.serviceClientConfiguration().region()),
+ this(new SingleStreamTracker(streamName),
+ applicationName,
+ kinesisClient,
+ dynamoDBClient,
+ cloudWatchClient,
+ workerIdentifier,
+ shardRecordProcessorFactory);
+ }
+
+ /**
+ * Constructor to initialize ConfigsBuilder for a single stream identified by {@link Arn}.
+ *
+ * @param streamArn
+ * @param applicationName
+ * @param kinesisClient
+ * @param dynamoDBClient
+ * @param cloudWatchClient
+ * @param workerIdentifier
+ * @param shardRecordProcessorFactory
+ */
+ public ConfigsBuilder(@NonNull Arn streamArn, @NonNull String applicationName,
+ @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
+ @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
+ @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
+ this(new SingleStreamTracker(streamArn),
applicationName,
kinesisClient,
dynamoDBClient,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FunctionCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FunctionCache.java
deleted file mode 100644
index 881cf5a9..00000000
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/FunctionCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import java.util.function.Function;
-
-import lombok.RequiredArgsConstructor;
-
-/**
- * Caches the result from a {@link Function}. Caching is especially useful when
- * invoking the function is an expensive call that produces a reusable result.
- * If the input value should be fixed, {@link SupplierCache} may be used.
- *
- * Note that if {@code f(x)=X} is cached, {@code X} will be returned for every
- * successive query of this cache regardless of the input parameter. This is
- * by design under the assumption that {@code X} is a viable response for
- * other invocations.
- *
- * @param input type
- * @param output type
- */
-@RequiredArgsConstructor
-public class FunctionCache extends SynchronizedCache {
-
- private final Function function;
-
- /**
- * Returns the cached result. If the cache is null, the function will be
- * invoked to populate the cache.
- *
- * @param input input argument to the underlying function
- * @return cached result which may be null
- */
- public OUT get(final IN input) {
- return get(() -> function.apply(input));
- }
-
-}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java
deleted file mode 100644
index 89cf6331..00000000
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import software.amazon.awssdk.arns.Arn;
-import software.amazon.awssdk.awscore.exception.AwsServiceException;
-import software.amazon.awssdk.core.exception.SdkClientException;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse;
-
-import java.util.Optional;
-
-@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class StreamARNUtil {
-
- /**
- * Caches an {@link Arn} constructed from a {@link StsClient#getCallerIdentity()} call.
- */
- private static final SupplierCache CALLER_IDENTITY_ARN = new SupplierCache<>(() -> {
- try (final SdkHttpClient httpClient = UrlConnectionHttpClient.builder().build();
- final StsClient stsClient = StsClient.builder().httpClient(httpClient).build()) {
- final GetCallerIdentityResponse response = stsClient.getCallerIdentity();
- final Arn arn = Arn.fromString(response.arn());
-
- // guarantee the cached ARN will never have an empty accountId
- arn.accountId().orElseThrow(() -> new IllegalStateException("AccountId is not present on " + arn));
- return arn;
- } catch (AwsServiceException | SdkClientException e) {
- log.warn("Unable to get sts caller identity to build stream arn", e);
- return null;
- }
- });
-
- /**
- * Retrieves the stream ARN using the stream name, region, and accountId returned by STS.
- * It is designed to fail gracefully, returning Optional.empty() if any errors occur.
- *
- * @param streamName stream name
- * @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
- * processed are located. A null guarantees an empty ARN.
- */
- public static Optional getStreamARN(String streamName, Region kinesisRegion) {
- return getStreamARN(streamName, kinesisRegion, null);
- }
-
- public static Optional getStreamARN(String streamName, Region kinesisRegion, String accountId) {
- if (kinesisRegion == null) {
- return Optional.empty();
- }
-
- final Arn identityArn = CALLER_IDENTITY_ARN.get();
- if (identityArn == null) {
- return Optional.empty();
- }
-
- // the provided accountId takes precedence
- final String chosenAccountId = (accountId != null) ? accountId : identityArn.accountId().get();
- return Optional.of(Arn.builder()
- .partition(identityArn.partition())
- .service("kinesis")
- .region(kinesisRegion.toString())
- .accountId(chosenAccountId)
- .resource("stream/" + streamName)
- .build());
- }
-
-}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
index 60195d4f..8307ed82 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
@@ -23,9 +23,7 @@ import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.utils.Validate;
-import software.amazon.kinesis.retrieval.KinesisClientFacade;
import java.util.Optional;
import java.util.regex.Matcher;
@@ -42,9 +40,10 @@ public class StreamIdentifier {
@NonNull
private final String streamName;
@Builder.Default
- private Optional streamCreationEpochOptional = Optional.empty();
+ private final Optional streamCreationEpochOptional = Optional.empty();
@Builder.Default
- private final Optional streamARNOptional = Optional.empty();
+ @EqualsAndHashCode.Exclude
+ private final Optional streamArnOptional = Optional.empty();
/**
* Pattern for a serialized {@link StreamIdentifier}. The valid format is
@@ -59,39 +58,25 @@ public class StreamIdentifier {
* where {@code region} is the id representation of a {@link Region}.
*/
private static final Pattern STREAM_ARN_PATTERN = Pattern.compile(
- "arn:aws:kinesis:(?[-a-z0-9]+):(?[0-9]{12}):stream/(?.+)");
+ "arn:aws[^:]*:kinesis:(?[-a-z0-9]+):(?[0-9]{12}):stream/(?.+)");
/**
* Serialize the current StreamIdentifier instance.
*
- * @return a String of {@code account:stream:creationEpoch[:region]}
- * where {@code region} is the id representation of a {@link Region}
- * and is optional.
+ * @return a String of {@code account:stream:creationEpoch} in multi-stream mode
+ * or {@link #streamName} in single-stream mode.
*/
public String serialize() {
- if (!accountIdOptional.isPresent()) {
+ if (!streamCreationEpochOptional.isPresent()) {
+ // creation epoch is expected to be empty in single-stream mode
return streamName;
}
- if (!streamCreationEpochOptional.isPresent()) {
- // FIXME bias-for-action hack to simplify back-porting into KCL 1.x and facilitate the
- // backwards-compatible requirement. There's a chicken-and-egg issue if DSS is
- // called as the application is being configured (and before the client is rigged).
- // Furthermore, if epoch isn't lazy-loaded here, the problem quickly spirals into
- // systemic issues of concurrency and consistency (e.g., PeriodicShardSyncManager,
- // Scheduler, DDB leases). We should look at leveraging dependency injection.
- // (NOTE: not to inject the Kinesis client here, but to ensure the client is
- // accessible elsewhere ASAP.)
- final DescribeStreamSummaryResponse dss = KinesisClientFacade.describeStreamSummary(
- streamARNOptional().get().toString());
- final long creationEpoch = dss.streamDescriptionSummary().streamCreationTimestamp().getEpochSecond();
- streamCreationEpochOptional = Optional.of(creationEpoch);
- }
-
final char delimiter = ':';
- final StringBuilder sb = new StringBuilder(accountIdOptional.get()).append(delimiter)
- .append(streamName).append(delimiter);
- streamCreationEpochOptional.ifPresent(sb::append);
+ final StringBuilder sb = new StringBuilder()
+ .append(accountIdOptional.get()).append(delimiter)
+ .append(streamName).append(delimiter)
+ .append(streamCreationEpochOptional.get());
return sb.toString();
}
@@ -101,129 +86,98 @@ public class StreamIdentifier {
}
/**
- * Create a multi stream instance for StreamIdentifier from serialized stream identifier.
+ * Create a multi stream instance for StreamIdentifier from serialized stream identifier
+ * of format {@link #STREAM_IDENTIFIER_PATTERN}
*
- * @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream
- *
- * @see #multiStreamInstance(String, Region)
- * @see #serialize()
+ * @param streamIdentifierSer a String of {@code account:stream:creationEpoch}
+ * @return StreamIdentifier with {@link #accountIdOptional} and {@link #streamCreationEpochOptional} present
*/
- public static StreamIdentifier multiStreamInstance(String serializationOrArn) {
- return multiStreamInstance(serializationOrArn, null);
+ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) {
+ final Matcher matcher = STREAM_IDENTIFIER_PATTERN.matcher(streamIdentifierSer);
+ if (matcher.matches()) {
+ final String accountId = matcher.group("accountId");
+ final String streamName = matcher.group("streamName");
+ final Long creationEpoch = Long.valueOf(matcher.group("creationEpoch"));
+
+ validateCreationEpoch(creationEpoch);
+
+ return StreamIdentifier.builder()
+ .accountIdOptional(Optional.of(accountId))
+ .streamName(streamName)
+ .streamCreationEpochOptional(Optional.of(creationEpoch))
+ .build();
+ }
+
+ throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer);
}
/**
- * Create a multi stream instance for StreamIdentifier from serialized stream identifier.
+ * Create a multi stream instance for StreamIdentifier from stream {@link Arn}.
*
- * @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream
- * @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
- * processed are located. A null will default to the caller's region.
- *
- * @see #multiStreamInstance(String)
- * @see #serialize()
+ * @param streamArn an {@link Arn} of format {@link #STREAM_ARN_PATTERN}
+ * @param creationEpoch Creation epoch of the stream. This value will
+ * reflect in the lease key and is assumed to be correct. (KCL could
+ * verify, but that creates issues for both bootstrapping and, with large
+ * KCL applications, API throttling against DescribeStreamSummary.)
+ * If this epoch is reused for two identically-named streams in the same
+ * account -- such as deleting and recreating a stream -- then KCL will
+ * be unable to differentiate leases between the old and new stream
+ * since the lease keys collide on this creation epoch.
+ * @return StreamIdentifier with {@link #accountIdOptional}, {@link #streamCreationEpochOptional},
+ * and {@link #streamArnOptional} present
*/
- public static StreamIdentifier multiStreamInstance(String serializationOrArn, Region kinesisRegion) {
- final StreamIdentifier fromSerialization = fromSerialization(serializationOrArn, kinesisRegion);
- if (fromSerialization != null) {
- return fromSerialization;
- }
- final StreamIdentifier fromArn = fromArn(serializationOrArn, kinesisRegion);
- if (fromArn != null) {
- return fromArn;
- }
+ public static StreamIdentifier multiStreamInstance(Arn streamArn, long creationEpoch) {
+ validateArn(streamArn);
+ validateCreationEpoch(creationEpoch);
- throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + serializationOrArn);
+ return StreamIdentifier.builder()
+ .accountIdOptional(streamArn.accountId())
+ .streamName(streamArn.resource().resource())
+ .streamCreationEpochOptional(Optional.of(creationEpoch))
+ .streamArnOptional(Optional.of(streamArn))
+ .build();
}
/**
* Create a single stream instance for StreamIdentifier from stream name.
*
- * @param streamNameOrArn stream name or AWS ARN of a Kinesis stream
- *
- * @see #singleStreamInstance(String, Region)
+ * @param streamName stream name of a Kinesis stream
*/
- public static StreamIdentifier singleStreamInstance(String streamNameOrArn) {
- return singleStreamInstance(streamNameOrArn, null);
- }
-
- /**
- * Create a single stream instance for StreamIdentifier from the provided stream name and kinesisRegion.
- * This method also constructs the optional StreamARN based on the region info.
- *
- * @param streamNameOrArn stream name or AWS ARN of a Kinesis stream
- * @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
- * processed are located. A null will default to the caller's region.
- *
- * @see #singleStreamInstance(String)
- */
- public static StreamIdentifier singleStreamInstance(String streamNameOrArn, Region kinesisRegion) {
- Validate.notEmpty(streamNameOrArn, "StreamName should not be empty");
-
- final StreamIdentifier fromArn = fromArn(streamNameOrArn, kinesisRegion);
- if (fromArn != null) {
- return fromArn;
- }
+ public static StreamIdentifier singleStreamInstance(String streamName) {
+ Validate.notEmpty(streamName, "StreamName should not be empty");
return StreamIdentifier.builder()
- .streamName(streamNameOrArn)
- .streamARNOptional(StreamARNUtil.getStreamARN(streamNameOrArn, kinesisRegion))
- .build();
- }
-
- /**
- * Deserializes a StreamIdentifier from {@link #STREAM_IDENTIFIER_PATTERN}.
- *
- * @param input input string (e.g., ARN, serialized instance) to convert into an instance
- * @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
- * processed are located. A null will default to the caller's region.
- * @return a StreamIdentifier instance if the pattern matched, otherwise null
- */
- private static StreamIdentifier fromSerialization(final String input, final Region kinesisRegion) {
- final Matcher matcher = STREAM_IDENTIFIER_PATTERN.matcher(input);
- return matcher.matches()
- ? toStreamIdentifier(matcher, matcher.group("creationEpoch"), kinesisRegion) : null;
- }
-
- /**
- * Constructs a StreamIdentifier from {@link #STREAM_ARN_PATTERN}.
- *
- * @param input input string (e.g., ARN, serialized instance) to convert into an instance
- * @param kinesisRegion Kinesis client endpoint, and also where the stream(s) to be
- * processed are located. A null will default to the caller's region.
- * @return a StreamIdentifier instance if the pattern matched, otherwise null
- */
- private static StreamIdentifier fromArn(final String input, final Region kinesisRegion) {
- final Matcher matcher = STREAM_ARN_PATTERN.matcher(input);
- if (matcher.matches()) {
- final String arnRegion = matcher.group("region");
- final Region region = (arnRegion != null) ? Region.of(arnRegion) : kinesisRegion;
- if ((kinesisRegion != null) && (region != kinesisRegion)) {
- throw new IllegalArgumentException(String.format(
- "Cannot create StreamIdentifier for a region other than %s: %s", kinesisRegion, input));
- }
- return toStreamIdentifier(matcher, "", region);
- }
- return null;
- }
-
- private static StreamIdentifier toStreamIdentifier(final Matcher matcher, final String matchedEpoch,
- final Region kinesisRegion) {
- final String accountId = matcher.group("accountId");
- final String streamName = matcher.group("streamName");
- final Optional creationEpoch = matchedEpoch.isEmpty() ? Optional.empty()
- : Optional.of(Long.valueOf(matchedEpoch));
- final Optional arn = StreamARNUtil.getStreamARN(streamName, kinesisRegion, accountId);
-
- if (!creationEpoch.isPresent() && !arn.isPresent()) {
- throw new IllegalArgumentException("Cannot create StreamIdentifier if missing both ARN and creation epoch");
- }
-
- return StreamIdentifier.builder()
- .accountIdOptional(Optional.of(accountId))
.streamName(streamName)
- .streamCreationEpochOptional(creationEpoch)
- .streamARNOptional(arn)
.build();
}
+ /**
+ * Create a single stream instance for StreamIdentifier from AWS Kinesis stream {@link Arn}.
+ *
+ * @param streamArn AWS ARN of a Kinesis stream
+ * @return StreamIdentifier with {@link #accountIdOptional} and {@link #streamArnOptional} present
+ */
+ public static StreamIdentifier singleStreamInstance(Arn streamArn) {
+ validateArn(streamArn);
+
+ return StreamIdentifier.builder()
+ .accountIdOptional(streamArn.accountId())
+ .streamName(streamArn.resource().resource())
+ .streamArnOptional(Optional.of(streamArn))
+ .build();
+ }
+
+ private static void validateArn(Arn streamArn) {
+ if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches() || !streamArn.region().isPresent()) {
+ throw new IllegalArgumentException("Unable to create a StreamIdentifier from " + streamArn);
+ }
+ }
+
+ private static void validateCreationEpoch(long creationEpoch) {
+ if (creationEpoch <= 0) {
+ throw new IllegalArgumentException(
+ "Creation epoch must be > 0; received " + creationEpoch);
+ }
+ }
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/SupplierCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/SupplierCache.java
deleted file mode 100644
index abe2822d..00000000
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/SupplierCache.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import java.util.function.Supplier;
-
-import lombok.RequiredArgsConstructor;
-
-/**
- * Caches results from a {@link Supplier}. Caching is especially useful when
- * {@link Supplier#get()} is an expensive call that produces static results.
- *
- * @param result type
- */
-@RequiredArgsConstructor
-public class SupplierCache extends SynchronizedCache {
-
- private final Supplier supplier;
-
- /**
- * Returns the cached result. If the cache is null, the supplier will be
- * invoked to populate the cache.
- *
- * @return cached result which may be null
- */
- public T get() {
- return get(supplier);
- }
-
-}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/SynchronizedCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/SynchronizedCache.java
deleted file mode 100644
index 3df241d3..00000000
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/SynchronizedCache.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import java.util.function.Supplier;
-
-/**
- * A synchronized, "no frills" cache that preserves the first non-null value
- * returned from a {@link Supplier}.
- *
- * @param result type
- */
-public class SynchronizedCache {
-
- private volatile R result;
-
- /**
- * Returns the cached result. If the cache is null, the supplier will be
- * invoked to populate the cache.
- *
- * @param supplier supplier to invoke if the cache is null
- * @return cached result which may be null
- */
- protected R get(final Supplier supplier) {
- if (result == null) {
- synchronized (this) {
- // double-check lock
- if (result == null) {
- result = supplier.get();
- }
- }
- }
- return result;
- }
-
-}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
index 04b7c795..f22c631a 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
@@ -223,8 +223,7 @@ public class KinesisShardDetector implements ShardDetector {
ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
- streamIdentifier.streamARNOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
-
+ streamIdentifier.streamArnOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
} else {
builder.nextToken(nextToken);
}
@@ -313,7 +312,7 @@ public class KinesisShardDetector implements ShardDetector {
.streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardId);
- streamIdentifier.streamARNOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString()));
+ streamIdentifier.streamArnOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString()));
final GetShardIteratorRequest getShardIteratorRequest = requestBuilder.build();
final GetShardIteratorResponse getShardIteratorResponse =
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java
index ee2850ed..9b5f85c3 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java
@@ -21,7 +21,7 @@ import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.ToString;
-import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.arns.Arn;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@@ -49,8 +49,8 @@ public class SingleStreamTracker implements StreamTracker {
this(StreamIdentifier.singleStreamInstance(streamName));
}
- public SingleStreamTracker(String streamName, Region region) {
- this(StreamIdentifier.singleStreamInstance(streamName, region));
+ public SingleStreamTracker(Arn streamArn) {
+ this(StreamIdentifier.singleStreamInstance(streamArn));
}
public SingleStreamTracker(StreamIdentifier streamIdentifier) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientFacade.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientFacade.java
deleted file mode 100644
index d2f90d3a..00000000
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientFacade.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.retrieval;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
-import software.amazon.awssdk.services.kinesis.model.KinesisException;
-import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
-import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
-import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
-import software.amazon.kinesis.common.KinesisRequestsBuilder;
-
-/**
- * Facade pattern to simplify interactions with a {@link KinesisAsyncClient}.
- */
-@Slf4j
-public final class KinesisClientFacade {
-
- /**
- * Reusable {@link AWSExceptionManager}.
- *
- * N.B. This instance is mutable, but thread-safe for read-only use.
- *
- */
- private static final AWSExceptionManager AWS_EXCEPTION_MANAGER;
-
- // FIXME dependency injection
- private static KinesisAsyncClient kinesisClient;
-
- static {
- AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
- AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
- AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
- AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
- AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
- }
-
- static void initialize(final KinesisAsyncClient client) {
- kinesisClient = client;
- }
-
- public static DescribeStreamSummaryResponse describeStreamSummary(final String streamArn) {
- final DescribeStreamSummaryRequest request = KinesisRequestsBuilder
- .describeStreamSummaryRequestBuilder().streamARN(streamArn).build();
- final ServiceCallerSupplier dss =
- () -> kinesisClient.describeStreamSummary(request).get();
- return retryWhenThrottled(dss, 3, streamArn, "DescribeStreamSummary");
- }
-
- // FIXME code lifted-and-shifted from FanOutConsumerRegistration; that class
- // (and others) should not be responsible for interacting directly with
- // the thread-safe Kinesis client (and handling retries, etc.)
- private static T retryWhenThrottled(
- @NonNull final ServiceCallerSupplier retriever,
- final int maxRetries,
- final String streamArn,
- @NonNull final String apiName) {
- LimitExceededException finalException = null;
-
- int retries = maxRetries;
- while (retries > 0) {
- try {
- try {
- return retriever.get();
- } catch (ExecutionException e) {
- throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
- } catch (InterruptedException e) {
- throw KinesisException.create("Unable to complete " + apiName, e);
- } catch (TimeoutException te) {
- log.info("Timed out waiting for " + apiName + " for " + streamArn);
- }
- } catch (LimitExceededException e) {
- log.info("{} : Throttled while calling {} API, will backoff.", streamArn, apiName);
- try {
- Thread.sleep(1000 + (long) (Math.random() * 100));
- } catch (InterruptedException ie) {
- log.debug("Sleep interrupted, shutdown invoked.");
- }
- finalException = e;
- }
- retries--;
- }
-
- if (finalException == null) {
- throw new IllegalStateException(streamArn + " : Exhausted retries while calling " + apiName);
- }
-
- throw finalException;
- }
-
- @FunctionalInterface
- private interface ServiceCallerSupplier {
- T get() throws ExecutionException, InterruptedException, TimeoutException;
- }
-
-}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
index 153faf70..082dd565 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
@@ -22,6 +22,7 @@ import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
+import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.DeprecationUtils;
@@ -119,9 +120,12 @@ public class RetrievalConfig {
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
@NonNull String applicationName) {
- this(kinesisAsyncClient,
- new SingleStreamTracker(streamName, kinesisAsyncClient.serviceClientConfiguration().region()),
- applicationName);
+ this(kinesisAsyncClient, new SingleStreamTracker(streamName), applicationName);
+ }
+
+ public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull Arn streamArn,
+ @NonNull String applicationName) {
+ this(kinesisAsyncClient, new SingleStreamTracker(streamArn), applicationName);
}
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamTracker streamTracker,
@@ -131,8 +135,6 @@ public class RetrievalConfig {
this.applicationName = applicationName;
this.appStreamTracker = DeprecationUtils.convert(streamTracker,
singleStreamTracker -> singleStreamTracker.streamConfigList().get(0));
-
- KinesisClientFacade.initialize(kinesisAsyncClient);
}
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
index 96f8f851..65da2b32 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
@@ -238,7 +238,7 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
- streamIdentifier.streamARNOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
+ streamIdentifier.streamArnOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
GetShardIteratorRequest request;
if (isIteratorRestart) {
@@ -321,7 +321,7 @@ public class KinesisDataFetcher implements DataFetcher {
public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(nextIterator).limit(maxRecords);
- streamIdentifier.streamARNOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
+ streamIdentifier.streamArnOptional().ifPresent(arn -> builder.streamARN(arn.toString()));
return builder.build();
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java
index e1de0981..d84b90f7 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java
@@ -26,6 +26,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@@ -33,12 +35,12 @@ import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
-import software.amazon.kinesis.utils.MockObjectHelper;
@RunWith(MockitoJUnitRunner.class)
public class ConfigsBuilderTest {
- private final KinesisAsyncClient mockKinesisClient = MockObjectHelper.createKinesisClient();
+ @Mock
+ private KinesisAsyncClient mockKinesisClient;
@Mock
private DynamoDbAsyncClient mockDynamoClient;
@@ -53,18 +55,24 @@ public class ConfigsBuilderTest {
private static final String WORKER_IDENTIFIER = "worker-id";
@Test
- public void testTrackerConstruction() {
+ public void testSingleStreamTrackerConstruction() {
final String streamName = "single-stream";
- final ConfigsBuilder configByName = createConfig(streamName);
- final ConfigsBuilder configBySingleTracker = createConfig(new SingleStreamTracker(streamName));
+ final Arn streamArn = createArn(streamName);
- for (final ConfigsBuilder cb : Arrays.asList(configByName, configBySingleTracker)) {
+ for (final ConfigsBuilder cb : Arrays.asList(
+ createConfig(streamName),
+ createConfig(new SingleStreamTracker(streamName)),
+ createConfig(streamArn),
+ createConfig(new SingleStreamTracker(streamArn)))) {
assertEquals(Optional.empty(), cb.appStreamTracker().left());
assertEquals(streamName, cb.appStreamTracker().right().get());
assertEquals(streamName, cb.streamTracker().streamConfigList().get(0).streamIdentifier().streamName());
assertFalse(cb.streamTracker().isMultiStream());
}
+ }
+ @Test
+ public void testMultiStreamTrackerConstruction() {
final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class);
final ConfigsBuilder configByMultiTracker = createConfig(mockMultiStreamTracker);
assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right());
@@ -78,9 +86,25 @@ public class ConfigsBuilderTest {
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
}
+ private ConfigsBuilder createConfig(Arn streamArn) {
+ // intentional invocation of constructor where streamArn is an Arn
+ return new ConfigsBuilder(streamArn, APPLICATION_NAME, mockKinesisClient, mockDynamoClient,
+ mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
+ }
+
private ConfigsBuilder createConfig(StreamTracker streamTracker) {
return new ConfigsBuilder(streamTracker, APPLICATION_NAME, mockKinesisClient, mockDynamoClient,
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
}
-}
\ No newline at end of file
+ private static Arn createArn(String streamName) {
+ return Arn.builder()
+ .partition("aws")
+ .service("kinesis")
+ .region(Region.US_EAST_1.id())
+ .accountId("123456789012")
+ .resource("stream/" + streamName)
+ .build();
+ }
+
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FunctionCacheTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FunctionCacheTest.java
deleted file mode 100644
index 2f55af4b..00000000
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/FunctionCacheTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class FunctionCacheTest {
-
- @Mock
- private Function mockFunction;
-
- private FunctionCache cache;
-
- @Before
- public void setUp() {
- cache = new FunctionCache<>(mockFunction);
- }
-
- /**
- * Test that the cache stops invoking the encapsulated {@link Function}
- * after it returns a non-null value.
- */
- @Test
- public void testCache() {
- final int expectedValue = 3;
- when(mockFunction.apply(expectedValue)).thenReturn(expectedValue);
-
- assertNull(cache.get(1));
- assertNull(cache.get(2));
- assertEquals(expectedValue, cache.get(3));
- assertEquals(expectedValue, cache.get(4));
- assertEquals(expectedValue, cache.get(5));
- verify(mockFunction, times(expectedValue)).apply(anyInt());
- }
-}
\ No newline at end of file
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamARNUtilTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamARNUtilTest.java
deleted file mode 100644
index b6009b43..00000000
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamARNUtilTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-import software.amazon.awssdk.arns.Arn;
-import software.amazon.awssdk.awscore.exception.AwsServiceException;
-import software.amazon.awssdk.core.exception.SdkClientException;
-import software.amazon.awssdk.http.SdkHttpClient;
-import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.StsClientBuilder;
-import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse;
-
-import java.lang.reflect.Field;
-import java.util.Optional;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ StreamARNUtil.class, StsClient.class, UrlConnectionHttpClient.class })
-public class StreamARNUtilTest {
- private static final String STS_RESPONSE_ARN_FORMAT = "arn:aws:sts::%s:assumed-role/Admin/alias";
- private static final String KINESIS_STREAM_ARN_FORMAT = "arn:aws:kinesis:us-east-1:%s:stream/%s";
-
- /**
- * Original {@link SupplierCache} that is constructed on class load.
- */
- private static final SupplierCache ORIGINAL_CACHE = Whitebox.getInternalState(
- StreamARNUtil.class, "CALLER_IDENTITY_ARN");
-
- private static final String ACCOUNT_ID = "12345";
-
- private static final String STREAM_NAME = StreamARNUtilTest.class.getSimpleName();
-
- @Mock
- private StsClientBuilder mockStsClientBuilder;
-
- @Mock
- private StsClient mockStsClient;
-
- private SupplierCache spySupplierCache;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
-
- spySupplierCache = spy(ORIGINAL_CACHE);
- setUpSupplierCache(spySupplierCache);
-
- final Arn defaultArn = toArn(STS_RESPONSE_ARN_FORMAT, ACCOUNT_ID);
- doReturn(defaultArn).when(spySupplierCache).get();
- }
-
- private void setUpSts() {
- PowerMockito.mockStatic(StsClient.class);
- PowerMockito.mockStatic(UrlConnectionHttpClient.class);
-
- when(UrlConnectionHttpClient.builder()).thenReturn(mock(UrlConnectionHttpClient.Builder.class));
- when(StsClient.builder()).thenReturn(mockStsClientBuilder);
- when(mockStsClientBuilder.httpClient(any(SdkHttpClient.class))).thenReturn(mockStsClientBuilder);
- when(mockStsClientBuilder.build()).thenReturn(mockStsClient);
-
- // bypass the spy so the Sts clients are called
- when(spySupplierCache.get()).thenCallRealMethod();
- }
-
- /**
- * Wrap and embed the original {@link SupplierCache} with a spy to avoid
- * one-and-done cache behavior, provide each test precise control over
- * return values, and enable the ability to verify interactions via Mockito.
- */
- static void setUpSupplierCache(final SupplierCache cache) throws Exception {
- final Field f = StreamARNUtil.class.getDeclaredField("CALLER_IDENTITY_ARN");
- f.setAccessible(true);
- f.set(null, cache);
- f.setAccessible(false);
- }
-
- @Test
- public void testGetStreamARNHappyCase() {
- getStreamArn();
-
- verify(spySupplierCache).get();
- }
-
- @Test
- public void testGetStreamARNFromCache() {
- final Optional actualStreamARNOptional1 = getStreamArn();
- final Optional actualStreamARNOptional2 = getStreamArn();
-
- verify(spySupplierCache, times(2)).get();
- assertEquals(actualStreamARNOptional1, actualStreamARNOptional2);
- }
-
- @Test
- public void testGetStreamARNReturnsEmptyOnSTSError() {
- setUpSts();
-
- // Optional.empty() is expected when there is an error with the STS call and STS returns empty Arn
- when(mockStsClient.getCallerIdentity())
- .thenThrow(AwsServiceException.builder().message("testAwsServiceException").build())
- .thenThrow(SdkClientException.builder().message("testSdkClientException").build());
-
- assertEquals(Optional.empty(), StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1));
- assertEquals(Optional.empty(), StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1));
- verify(mockStsClient, times(2)).getCallerIdentity();
- verify(spySupplierCache, times(2)).get();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testStsResponseWithoutAccountId() {
- setUpSts();
-
- final Arn arnWithoutAccountId = toArn(STS_RESPONSE_ARN_FORMAT, "");
- assertEquals(Optional.empty(), arnWithoutAccountId.accountId());
-
- final GetCallerIdentityResponse identityResponse = GetCallerIdentityResponse.builder()
- .arn(arnWithoutAccountId.toString()).build();
- when(mockStsClient.getCallerIdentity()).thenReturn(identityResponse);
-
- try {
- StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1);
- } finally {
- verify(mockStsClient).getCallerIdentity();
- }
- }
-
- @Test
- public void testGetStreamARNReturnsEmptyOnInvalidKinesisRegion() {
- // Optional.empty() is expected when kinesis region is not set correctly
- Optional actualStreamARNOptional = StreamARNUtil.getStreamARN(STREAM_NAME, null);
- assertEquals(Optional.empty(), actualStreamARNOptional);
- verifyZeroInteractions(mockStsClient);
- verifyZeroInteractions(spySupplierCache);
- }
-
- @Test
- public void testGetStreamARNWithProvidedAccountIDAndIgnoredSTSResult() {
- // If the account id is provided in the StreamIdentifier, it will override the result (account id) returned by sts
- final String cachedAccountId = "111111111111";
- final String providedAccountId = "222222222222";
-
- final Arn cachedArn = toArn(STS_RESPONSE_ARN_FORMAT, cachedAccountId);
- when(spySupplierCache.get()).thenReturn(cachedArn);
-
- final Optional actualStreamARNOptional = StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1,
- providedAccountId);
- final Arn expectedStreamARN = toArn(KINESIS_STREAM_ARN_FORMAT, providedAccountId, STREAM_NAME);
-
- verify(spySupplierCache).get();
- verifyZeroInteractions(mockStsClient);
- assertTrue(actualStreamARNOptional.isPresent());
- assertEquals(expectedStreamARN, actualStreamARNOptional.get());
- }
-
- private static Optional getStreamArn() {
- final Optional actualArn = StreamARNUtil.getStreamARN(STREAM_NAME, Region.US_EAST_1);
- final Arn expectedArn = toArn(KINESIS_STREAM_ARN_FORMAT, ACCOUNT_ID, STREAM_NAME);
-
- assertTrue(actualArn.isPresent());
- assertEquals(expectedArn, actualArn.get());
-
- return actualArn;
- }
-
- private static Arn toArn(final String format, final Object... params) {
- return Arn.fromString(String.format(format, params));
- }
-
-}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java
index b3f4991b..d511864f 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java
@@ -1,11 +1,7 @@
package software.amazon.kinesis.common;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
@@ -13,27 +9,16 @@ import java.util.Arrays;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.verifyStatic;
-import static software.amazon.kinesis.common.StreamARNUtil.getStreamARN;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StreamARNUtil.class)
public class StreamIdentifierTest {
private static final String STREAM_NAME = "stream-name";
+ private static final String PARTITION = "aws";
+ private static final String SERVICE = "kinesis";
private static final Region KINESIS_REGION = Region.US_WEST_1;
private static final String TEST_ACCOUNT_ID = "123456789012";
+ private static final String RESOURCE = "stream/" + STREAM_NAME;
private static final long EPOCH = 1680616058L;
-
- private static final Arn DEFAULT_ARN = toArn(KINESIS_REGION);
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- StreamARNUtilTest.setUpSupplierCache(new SupplierCache<>(() -> DEFAULT_ARN));
- }
+ private static final Arn DEFAULT_ARN = createArn();
/**
* Test patterns that should match a serialization regex.
@@ -51,74 +36,86 @@ public class StreamIdentifierTest {
@Test
public void testMultiStreamDeserializationFail() {
for (final String pattern : Arrays.asList(
- // arn examples
- "arn:aws:kinesis::123456789012:stream/stream-name", // missing region
- "arn:aws:kinesis:region::stream/stream-name", // missing account id
- "arn:aws:kinesis:region:123456789:stream/stream-name", // account id not 12 digits
- "arn:aws:kinesis:region:123456789abc:stream/stream-name", // 12char alphanumeric account id
- "arn:aws:kinesis:region:123456789012:stream/", // missing stream-name
- // serialization examples
":stream-name:123", // missing account id
// "123456789:stream-name:123", // account id not 12 digits
"123456789abc:stream-name:123", // 12char alphanumeric account id
"123456789012::123", // missing stream name
"123456789012:stream-name", // missing delimiter and creation epoch
"123456789012:stream-name:", // missing creation epoch
+ "123456789012:stream-name:-123", // negative creation epoch
"123456789012:stream-name:abc", // non-numeric creation epoch
""
)) {
try {
StreamIdentifier.multiStreamInstance(pattern);
- Assert.fail(pattern + " should not have created a StreamIdentifier");
+ Assert.fail("Serialization " + pattern + " should not have created a StreamIdentifier");
} catch (final IllegalArgumentException iae) {
// expected; ignore
}
}
}
+ /**
+ * Test ARNs that should not match a valid AWS Kinesis stream ARN.
+ */
@Test
- public void testInstanceFromArn() {
- final Arn arn = toArn(KINESIS_REGION);
- final StreamIdentifier single = StreamIdentifier.singleStreamInstance(arn.toString());
- final StreamIdentifier multi = StreamIdentifier.multiStreamInstance(arn.toString());
-
- assertEquals(single, multi);
- assertEquals(Optional.empty(), single.streamCreationEpochOptional());
- assertActualStreamIdentifierExpected(arn, single);
+ public void testMultiStreamByArnWithInvalidStreamArnFail() {
+ for (final Arn invalidStreamArn : Arrays.asList(
+ createArn("abc", SERVICE, KINESIS_REGION, TEST_ACCOUNT_ID, RESOURCE), // invalid partition
+ createArn(PARTITION, "dynamodb", KINESIS_REGION, TEST_ACCOUNT_ID, RESOURCE), // incorrect service
+ createArn(PARTITION, SERVICE, null, TEST_ACCOUNT_ID, RESOURCE), // missing region
+ createArn(PARTITION, SERVICE, KINESIS_REGION, null, RESOURCE), // missing account id
+ createArn(PARTITION, SERVICE, KINESIS_REGION, "123456789", RESOURCE), // account id not 12 digits
+ createArn(PARTITION, SERVICE, KINESIS_REGION, "123456789abc", RESOURCE), // 12char alphanumeric account id
+ createArn(PARTITION, SERVICE, KINESIS_REGION, TEST_ACCOUNT_ID, "table/name"), // incorrect resource type
+ Arn.fromString("arn:aws:dynamodb:us-east-2:123456789012:table/myDynamoDBTable") // valid ARN for incorrect resource
+ )) {
+ try {
+ StreamIdentifier.multiStreamInstance(invalidStreamArn, EPOCH);
+ Assert.fail("Arn " + invalidStreamArn + " should not have created a StreamIdentifier");
+ } catch (final IllegalArgumentException iae) {
+ // expected; ignore
+ }
+ }
}
@Test(expected = IllegalArgumentException.class)
- public void testInstanceWithoutEpochOrArn() {
- mockStatic(StreamARNUtil.class);
- when(getStreamARN(STREAM_NAME, KINESIS_REGION, TEST_ACCOUNT_ID))
- .thenReturn(Optional.empty());
+ public void testNegativeCreationEpoch() {
+ StreamIdentifier.multiStreamInstance(DEFAULT_ARN, -123);
+ }
- try {
- StreamIdentifier.singleStreamInstance(DEFAULT_ARN.toString());
- } finally {
- verifyStatic(StreamARNUtil.class);
- getStreamARN(STREAM_NAME, KINESIS_REGION, TEST_ACCOUNT_ID);
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testZeroCreationEpoch() {
+ StreamIdentifier.multiStreamInstance(DEFAULT_ARN, 0);
+ }
+
+ @Test
+ public void testSingleStreamInstanceFromArn() {
+ final StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(DEFAULT_ARN);
+
+ assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier);
+ assertEquals(Optional.empty(), actualStreamIdentifier.streamCreationEpochOptional());
+ assertEquals(actualStreamIdentifier.streamName(), actualStreamIdentifier.serialize());
+ }
+
+ @Test
+ public void testMultiStreamInstanceFromArn() {
+ final StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(DEFAULT_ARN, EPOCH);
+
+ assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier);
+ assertEquals(Optional.of(EPOCH), actualStreamIdentifier.streamCreationEpochOptional());
+ assertEquals(serialize(), actualStreamIdentifier.serialize());
}
@Test
public void testSingleStreamInstanceWithName() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME);
- assertFalse(actualStreamIdentifier.streamCreationEpochOptional().isPresent());
- assertFalse(actualStreamIdentifier.accountIdOptional().isPresent());
- assertFalse(actualStreamIdentifier.streamARNOptional().isPresent());
+ assertEquals(Optional.empty(), actualStreamIdentifier.streamCreationEpochOptional());
+ assertEquals(Optional.empty(), actualStreamIdentifier.accountIdOptional());
+ assertEquals(Optional.empty(), actualStreamIdentifier.streamArnOptional());
assertEquals(STREAM_NAME, actualStreamIdentifier.streamName());
}
- @Test
- public void testSingleStreamInstanceWithNameAndRegion() {
- StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME, KINESIS_REGION);
- assertFalse(actualStreamIdentifier.streamCreationEpochOptional().isPresent());
- assertFalse(actualStreamIdentifier.accountIdOptional().isPresent());
- assertEquals(STREAM_NAME, actualStreamIdentifier.streamName());
- assertEquals(Optional.of(DEFAULT_ARN), actualStreamIdentifier.streamARNOptional());
- }
-
@Test
public void testMultiStreamInstanceWithIdentifierSerialization() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(serialize());
@@ -126,34 +123,10 @@ public class StreamIdentifierTest {
assertEquals(Optional.of(EPOCH), actualStreamIdentifier.streamCreationEpochOptional());
}
- /**
- * When KCL's Kinesis endpoint is a region, it lacks visibility to streams
- * in other regions. Therefore, when the endpoint and ARN conflict, an
- * Exception should be thrown.
- */
- @Test(expected = IllegalArgumentException.class)
- public void testConflictOnRegions() {
- final Region arnRegion = Region.US_GOV_EAST_1;
- assertNotEquals(arnRegion, KINESIS_REGION);
-
- StreamIdentifier.multiStreamInstance(toArn(arnRegion).toString(), KINESIS_REGION);
- }
-
- @Test
- public void testMultiStreamInstanceWithoutRegionSerialized() {
- StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(
- serialize(), KINESIS_REGION);
- assertActualStreamIdentifierExpected(actualStreamIdentifier);
- }
-
- private void assertActualStreamIdentifierExpected(StreamIdentifier actual) {
- assertActualStreamIdentifierExpected(DEFAULT_ARN, actual);
- }
-
private void assertActualStreamIdentifierExpected(Arn expectedArn, StreamIdentifier actual) {
assertEquals(STREAM_NAME, actual.streamName());
assertEquals(Optional.of(TEST_ACCOUNT_ID), actual.accountIdOptional());
- assertEquals(Optional.ofNullable(expectedArn), actual.streamARNOptional());
+ assertEquals(Optional.ofNullable(expectedArn), actual.streamArnOptional());
}
/**
@@ -163,11 +136,18 @@ public class StreamIdentifierTest {
return String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, Long.toString(EPOCH));
}
- private static Arn toArn(final Region region) {
- return Arn.builder().partition("aws").service("kinesis")
- .accountId(TEST_ACCOUNT_ID)
- .resource("stream/" + STREAM_NAME)
- .region(region.toString())
+ private static Arn createArn() {
+ return createArn(PARTITION, SERVICE, KINESIS_REGION, TEST_ACCOUNT_ID, RESOURCE);
+ }
+
+ private static Arn createArn(String partition, String service, Region region, String account, String resource) {
+ return Arn.builder()
+ .partition(partition)
+ .service(service)
+ .region(region != null ? region.id() : null)
+ .accountId(account)
+ .resource(resource)
.build();
}
+
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/SupplierCacheTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/SupplierCacheTest.java
deleted file mode 100644
index a0bde098..00000000
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/SupplierCacheTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package software.amazon.kinesis.common;
-
-import java.util.function.Supplier;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class SupplierCacheTest {
-
- private static final Object DUMMY_RESULT = SupplierCacheTest.class;
-
- @Mock
- private Supplier