diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 0fc7d2b2..1a81f606 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -15,46 +15,89 @@ package software.amazon.kinesis.common; -import com.google.common.base.Joiner; import lombok.AccessLevel; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.ToString; +import lombok.NonNull; import lombok.experimental.Accessors; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.utils.Validate; +import software.amazon.kinesis.retrieval.KinesisClientFacade; import java.util.Optional; +import java.util.regex.Matcher; import java.util.regex.Pattern; @Builder(access = AccessLevel.PRIVATE) @EqualsAndHashCode @Getter -@ToString @Accessors(fluent = true) public class StreamIdentifier { + @Builder.Default private final Optional accountIdOptional = Optional.empty(); + @NonNull private final String streamName; @Builder.Default - private final Optional streamCreationEpochOptional = Optional.empty(); + private Optional streamCreationEpochOptional = Optional.empty(); @Builder.Default private final Optional streamARNOptional = Optional.empty(); - private static final String DELIMITER = ":"; - private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*" + ":?([a-z]{2}(-gov)?-[a-z]+-\\d{1})?"); + /** + * Pattern for a serialized {@link StreamIdentifier}. The valid format is + * {@code ::[:]} where + * {@code region} is the id representation of a {@link Region} and is + * optional. + */ + private static final Pattern STREAM_IDENTIFIER_PATTERN = Pattern.compile( + // `?::` has two parts: `?:` starts a non-capturing group, and + // `:` is the first character in the group (i.e., ":") + "(?[0-9]+):(?[^:]+):(?[0-9]+)(?::(?[-a-z0-9]+))?"); + + /** + * Pattern for a stream ARN. The valid format is + * {@code arn:aws:kinesis:::stream:} + * where {@code region} is the id representation of a {@link Region}. + */ + private static final Pattern STREAM_ARN_PATTERN = Pattern.compile( + "arn:aws:kinesis:(?[-a-z0-9]+):(?[0-9]{12}):stream/(?.+)"); /** * Serialize the current StreamIdentifier instance. - * TODO: Consider appending region info for cross-account consumer support - * @return + * + * @return a String of {@code account:stream:creationEpoch[:region]} + * where {@code region} is the id representation of a {@link Region} + * and is optional. */ public String serialize() { - return accountIdOptional.isPresent() ? - Joiner.on(DELIMITER).join(accountIdOptional.get(), streamName, streamCreationEpochOptional.get()) : - streamName; + if (!accountIdOptional.isPresent()) { + return streamName; + } + + if (!streamCreationEpochOptional.isPresent()) { + // FIXME bias-for-action hack to simplify back-porting into KCL 1.x and facilitate the + // backwards-compatible requirement. There's a chicken-and-egg issue if DSS is + // called as the application is being configured (and before the client is rigged). + // Furthermore, if epoch isn't lazy-loaded here, the problem quickly spirals into + // systemic issues of concurrency and consistency (e.g., PeriodicShardSyncManager, + // Scheduler, DDB leases). We should look at leveraging dependency injection. + // (NOTE: not to inject the Kinesis client here, but to ensure the client is + // accessible elsewhere ASAP.) + final DescribeStreamSummaryResponse dss = KinesisClientFacade.describeStreamSummary( + streamARNOptional().get().toString()); + final long creationEpoch = dss.streamDescriptionSummary().streamCreationTimestamp().getEpochSecond(); + streamCreationEpochOptional = Optional.of(creationEpoch); + } + + final char delimiter = ':'; + final StringBuilder sb = new StringBuilder(accountIdOptional.get()).append(delimiter) + .append(streamName).append(delimiter); + streamCreationEpochOptional.ifPresent(sb::append); + streamARNOptional.flatMap(Arn::region).ifPresent(region -> sb.append(delimiter).append(region)); + return sb.toString(); } @Override @@ -64,61 +107,113 @@ public class StreamIdentifier { /** * Create a multi stream instance for StreamIdentifier from serialized stream identifier. - * See the format of a serialized stream identifier at {@link StreamIdentifier#multiStreamInstance(String, Region)} - * @param streamIdentifierSer - * @return StreamIdentifier + * + * @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream + * + * @see #multiStreamInstance(String, Region) + * @see #serialize() */ - public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { - return multiStreamInstance(streamIdentifierSer, null); + public static StreamIdentifier multiStreamInstance(String serializationOrArn) { + return multiStreamInstance(serializationOrArn, null); } /** * Create a multi stream instance for StreamIdentifier from serialized stream identifier. - * @param streamIdentifierSer The serialized stream identifier should be of the format - * account:stream:creationepoch[:region] + * + * @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream * @param kinesisRegion This nullable region is used to construct the optional StreamARN - * @return StreamIdentifier + * + * @see #serialize() */ - public static StreamIdentifier multiStreamInstance(String streamIdentifierSer, Region kinesisRegion) { - if (PATTERN.matcher(streamIdentifierSer).matches()) { - final String[] split = streamIdentifierSer.split(DELIMITER); - final String streamName = split[1]; - final Optional accountId = Optional.ofNullable(split[0]); - StreamIdentifierBuilder builder = StreamIdentifier.builder() - .accountIdOptional(accountId) - .streamName(streamName) - .streamCreationEpochOptional(Optional.of(Long.parseLong(split[2]))); - final Region region = (split.length == 4) ? - Region.of(split[3]) : // Use the region extracted from the serialized string, which matches the regex pattern - kinesisRegion; // Otherwise just use the provided region - final Optional streamARN = StreamARNUtil.getStreamARN(streamName, region, accountId); - return builder.streamARNOptional(streamARN).build(); - } else { - throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); + public static StreamIdentifier multiStreamInstance(String serializationOrArn, Region kinesisRegion) { + final StreamIdentifier fromSerialization = fromSerialization(serializationOrArn, kinesisRegion); + if (fromSerialization != null) { + return fromSerialization; } + final StreamIdentifier fromArn = fromArn(serializationOrArn, kinesisRegion); + if (fromArn != null) { + return fromArn; + } + + throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + serializationOrArn); } /** * Create a single stream instance for StreamIdentifier from stream name. - * @param streamName - * @return StreamIdentifier + * + * @param streamNameOrArn stream name or AWS ARN of a Kinesis stream */ - public static StreamIdentifier singleStreamInstance(String streamName) { - return singleStreamInstance(streamName, null); + public static StreamIdentifier singleStreamInstance(String streamNameOrArn) { + return singleStreamInstance(streamNameOrArn, null); } /** * Create a single stream instance for StreamIdentifier from the provided stream name and kinesisRegion. * This method also constructs the optional StreamARN based on the region info. - * @param streamName - * @param kinesisRegion - * @return StreamIdentifier + * + * @param streamNameOrArn stream name or AWS ARN of a Kinesis stream + * @param kinesisRegion (optional) region used to construct the ARN */ - public static StreamIdentifier singleStreamInstance(String streamName, Region kinesisRegion) { - Validate.notEmpty(streamName, "StreamName should not be empty"); + public static StreamIdentifier singleStreamInstance(String streamNameOrArn, Region kinesisRegion) { + Validate.notEmpty(streamNameOrArn, "StreamName should not be empty"); + + final StreamIdentifier fromArn = fromArn(streamNameOrArn, kinesisRegion); + if (fromArn != null) { + return fromArn; + } + return StreamIdentifier.builder() - .streamName(streamName) - .streamARNOptional(StreamARNUtil.getStreamARN(streamName, kinesisRegion)) + .streamName(streamNameOrArn) + .streamARNOptional(StreamARNUtil.getStreamARN(streamNameOrArn, kinesisRegion)) .build(); } + + /** + * Deserializes a StreamIdentifier from {@link #STREAM_IDENTIFIER_PATTERN}. + * + * @param input input string (e.g., ARN, serialized instance) to convert into an instance + * @param kinesisRegion (optional) region used to construct the ARN + * @return a StreamIdentifier instance if the pattern matched, otherwise null + */ + private static StreamIdentifier fromSerialization(final String input, final Region kinesisRegion) { + final Matcher matcher = STREAM_IDENTIFIER_PATTERN.matcher(input); + return matcher.matches() + ? toStreamIdentifier(matcher, matcher.group("creationEpoch"), kinesisRegion) : null; + } + + /** + * Constructs a StreamIdentifier from {@link #STREAM_ARN_PATTERN}. + * + * @param input input string (e.g., ARN, serialized instance) to convert into an instance + * @param kinesisRegion (optional) region used to construct the ARN + * @return a StreamIdentifier instance if the pattern matched, otherwise null + */ + private static StreamIdentifier fromArn(final String input, final Region kinesisRegion) { + final Matcher matcher = STREAM_ARN_PATTERN.matcher(input); + return matcher.matches() + ? toStreamIdentifier(matcher, "", kinesisRegion) : null; + } + + private static StreamIdentifier toStreamIdentifier(final Matcher matcher, final String matchedEpoch, + final Region kinesisRegion) { + final Optional accountId = Optional.of(matcher.group("accountId")); + final String streamName = matcher.group("streamName"); + final Optional creationEpoch = matchedEpoch.isEmpty() ? Optional.empty() + : Optional.of(Long.valueOf(matchedEpoch)); + final String matchedRegion = matcher.group("region"); + final Region region = (matchedRegion != null) ? Region.of(matchedRegion) : kinesisRegion; + final Optional arn = StreamARNUtil.getStreamARN(streamName, region, accountId); + + if (!creationEpoch.isPresent() && !arn.isPresent()) { + throw new IllegalArgumentException("Cannot create StreamIdentifier if missing both ARN and creation epoch"); + } + + return StreamIdentifier.builder() + .accountIdOptional(accountId) + .streamName(streamName) + .streamCreationEpochOptional(creationEpoch) + .streamARNOptional(arn) + .build(); + } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientFacade.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientFacade.java new file mode 100644 index 00000000..5c9e732b --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientFacade.java @@ -0,0 +1,100 @@ +package software.amazon.kinesis.retrieval; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.KinesisException; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.common.KinesisRequestsBuilder; + +/** + * Facade pattern to simplify interactions with a {@link KinesisAsyncClient}. + */ +@Slf4j +public final class KinesisClientFacade { + + /** + * Reusable {@link AWSExceptionManager}. + *

+ * N.B. This instance is mutable, but thread-safe for read-only use. + *

+ */ + private static final AWSExceptionManager AWS_EXCEPTION_MANAGER; + + // FIXME dependency injection + private static KinesisAsyncClient kinesisClient; + + static { + AWS_EXCEPTION_MANAGER = new AWSExceptionManager(); + AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t); + AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t); + AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t); + AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t); + } + + static void initialize(final KinesisAsyncClient client) { + kinesisClient = client; + } + + public static DescribeStreamSummaryResponse describeStreamSummary(final String streamArn) { + final DescribeStreamSummaryRequest request = KinesisRequestsBuilder + .describeStreamSummaryRequestBuilder().streamARN(streamArn).build(); + final ServiceCallerSupplier dss = + () -> kinesisClient.describeStreamSummary(request).get(); + return retryWhenThrottled(dss, 3, streamArn, "DescribeStreamSummary"); + } + + // FIXME code lifted-and-shifted from FanOutConsumerRegistration; that class + // (and others) should not be responsible for interacting directly with + // the thread-safe Kinesis client (and handling retries, etc.) + private static T retryWhenThrottled( + @NonNull final ServiceCallerSupplier retriever, + final int maxRetries, + final String streamArn, + @NonNull final String apiName) { + LimitExceededException finalException = null; + + int retries = maxRetries; + while (retries > 0) { + try { + try { + return retriever.get(); + } catch (ExecutionException e) { + throw AWS_EXCEPTION_MANAGER.apply(e.getCause()); + } catch (InterruptedException e) { + throw KinesisException.create("Unable to complete " + apiName, e); + } catch (TimeoutException te) { + log.info("Timed out waiting for " + apiName + " for " + streamArn); + } + } catch (LimitExceededException e) { + log.info("{} : Throttled while calling {} API, will backoff.", streamArn, apiName); + try { + Thread.sleep(1000 + (long) (Math.random() * 100)); + } catch (InterruptedException ie) { + log.debug("Sleep interrupted, shutdown invoked."); + } + finalException = e; + } + retries--; + } + + if (finalException == null) { + throw new IllegalStateException(streamArn + " : Exhausted retries while calling " + apiName); + } + + throw finalException; + } + + @FunctionalInterface + private interface ServiceCallerSupplier { + T get() throws ExecutionException, InterruptedException, TimeoutException; + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index d8fcf39e..153faf70 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -33,7 +33,6 @@ import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.SingleStreamTracker; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; -import software.amazon.kinesis.retrieval.polling.PollingConfig; /** * Used by the KCL to configure the retrieval of records from Kinesis. @@ -132,6 +131,8 @@ public class RetrievalConfig { this.applicationName = applicationName; this.appStreamTracker = DeprecationUtils.convert(streamTracker, singleStreamTracker -> singleStreamTracker.streamConfigList().get(0)); + + KinesisClientFacade.initialize(kinesisAsyncClient); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java index 3c4b5e34..115cab03 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/StreamIdentifierTest.java @@ -1,5 +1,6 @@ package software.amazon.kinesis.common; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -8,38 +9,113 @@ import org.powermock.modules.junit4.PowerMockRunner; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.regions.Region; +import java.util.Arrays; import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.verifyStatic; +import static software.amazon.kinesis.common.StreamARNUtil.getStreamARN; @RunWith(PowerMockRunner.class) @PrepareForTest(StreamARNUtil.class) public class StreamIdentifierTest { - private static final String STREAM_NAME = "streamName"; + private static final String STREAM_NAME = "stream-name"; private static final Region KINESIS_REGION = Region.US_WEST_1; - private static final String TEST_ACCOUNT_ID = "111111111111"; - private static final String EPOCH = "1680616058"; + private static final String TEST_ACCOUNT_ID = "123456789012"; + private static final long EPOCH = 1680616058L; - private static final Arn DEFAULT_ARN = Arn.builder().partition("aws").service("kinesis") - .region(KINESIS_REGION.toString()).accountId(TEST_ACCOUNT_ID).resource("stream/" + STREAM_NAME).build(); + private static final Arn DEFAULT_ARN = toArn(KINESIS_REGION); @Before public void setUp() { mockStatic(StreamARNUtil.class); - when(StreamARNUtil.getStreamARN(anyString(), any(Region.class))).thenReturn(Optional.empty()); - when(StreamARNUtil.getStreamARN(STREAM_NAME, KINESIS_REGION)).thenReturn(Optional.of(DEFAULT_ARN)); - when(StreamARNUtil.getStreamARN(STREAM_NAME, KINESIS_REGION, Optional.of(TEST_ACCOUNT_ID))) + when(getStreamARN(anyString(), any(Region.class))).thenReturn(Optional.empty()); + when(getStreamARN(STREAM_NAME, KINESIS_REGION)).thenReturn(Optional.of(DEFAULT_ARN)); + when(getStreamARN(STREAM_NAME, KINESIS_REGION, Optional.of(TEST_ACCOUNT_ID))) .thenReturn(Optional.of(DEFAULT_ARN)); } + /** + * Test patterns that should match a serialization regex. + */ + @Test + public void testMultiStreamDeserializationSuccess() { + for (final String pattern : Arrays.asList( + // arn examples + toArn(KINESIS_REGION).toString(), + // serialization examples + "123456789012:stream-name:123", + "123456789012:stream-name:123:" + Region.US_ISOB_EAST_1 + )) { + final StreamIdentifier si = StreamIdentifier.multiStreamInstance(pattern); + assertNotNull(si); + } + } + + /** + * Test patterns that should not match a serialization regex. + */ + @Test + public void testMultiStreamDeserializationFail() { + for (final String pattern : Arrays.asList( + // arn examples + "arn:aws:kinesis::123456789012:stream/stream-name", // missing region + "arn:aws:kinesis:region::stream/stream-name", // missing account id + "arn:aws:kinesis:region:123456789:stream/stream-name", // account id not 12 digits + "arn:aws:kinesis:region:123456789abc:stream/stream-name", // 12char alphanumeric account id + "arn:aws:kinesis:region:123456789012:stream/", // missing stream-name + // serialization examples + ":stream-name:123", // missing account id + "123456789012:stream-name", // missing delimiter before creation epoch + "accountId:stream-name:123", // non-numeric account id +// "123456789:stream-name:123", // account id not 12 digits + "123456789abc:stream-name:123", // 12char alphanumeric account id + "123456789012::123", // missing stream name + "123456789012:stream-name:", // missing creation epoch + "123456789012:stream-name::", // missing creation epoch; ':' for optional region yet missing region + "123456789012:stream-name::us-east-1", // missing creation epoch + "123456789012:stream-name:abc", // non-numeric creation epoch + "123456789012:stream-name:abc:", // non-numeric creation epoch with ':' yet missing region + "123456789012:stream-name:123:", // ':' for optional region yet missing region + "" + )) { + try { + StreamIdentifier.multiStreamInstance(pattern); + Assert.fail(pattern + " should not have created a StreamIdentifier"); + } catch (final IllegalArgumentException iae) { + // expected; ignore + } + } + } + + @Test + public void testInstanceFromArn() { + final Arn arn = toArn(KINESIS_REGION); + final StreamIdentifier single = StreamIdentifier.singleStreamInstance(arn.toString()); + final StreamIdentifier multi = StreamIdentifier.multiStreamInstance(arn.toString()); + + assertEquals(single, multi); + assertEquals(Optional.of(TEST_ACCOUNT_ID), single.accountIdOptional()); + assertEquals(STREAM_NAME, single.streamName()); + assertEquals(Optional.of(arn), single.streamARNOptional()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInstanceWithoutEpochOrArn() { + when(getStreamARN(STREAM_NAME, KINESIS_REGION, Optional.of(TEST_ACCOUNT_ID))) + .thenReturn(Optional.empty()); + + final Arn arn = toArn(KINESIS_REGION); + StreamIdentifier.singleStreamInstance(arn.toString()); + } + @Test public void testSingleStreamInstanceWithName() { StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME); @@ -54,44 +130,64 @@ public class StreamIdentifierTest { StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME, KINESIS_REGION); assertFalse(actualStreamIdentifier.streamCreationEpochOptional().isPresent()); assertFalse(actualStreamIdentifier.accountIdOptional().isPresent()); - assertTrue(actualStreamIdentifier.streamARNOptional().isPresent()); - assertEquals(DEFAULT_ARN, actualStreamIdentifier.streamARNOptional().get()); + assertEquals(Optional.of(DEFAULT_ARN), actualStreamIdentifier.streamARNOptional()); } @Test public void testMultiStreamInstanceWithIdentifierSerialization() { - StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance( - String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, EPOCH, KINESIS_REGION.toString())); - assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier); + StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(serialize(KINESIS_REGION)); + assertActualStreamIdentifierExpected(actualStreamIdentifier); } @Test public void testMultiStreamInstanceWithRegionSerialized() { Region serializedRegion = Region.US_GOV_EAST_1; - Optional arn = Optional.ofNullable(Arn.builder().partition("aws").service("kinesis") - .accountId(TEST_ACCOUNT_ID).region(serializedRegion.toString()).resource("stream/" + STREAM_NAME).build()); + final Optional arn = Optional.of(toArn(serializedRegion)); - when(StreamARNUtil.getStreamARN(eq(STREAM_NAME), eq(serializedRegion), any(Optional.class))).thenReturn(arn); + when(getStreamARN(STREAM_NAME, serializedRegion, Optional.of(TEST_ACCOUNT_ID))).thenReturn(arn); + + final String expectedSerialization = serialize(serializedRegion); StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance( - String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, EPOCH, serializedRegion.toString()), KINESIS_REGION); + expectedSerialization, KINESIS_REGION); assertActualStreamIdentifierExpected(arn, actualStreamIdentifier); + assertEquals(expectedSerialization, actualStreamIdentifier.serialize()); + verifyStatic(StreamARNUtil.class); + getStreamARN(STREAM_NAME, serializedRegion, Optional.of(TEST_ACCOUNT_ID)); } @Test public void testMultiStreamInstanceWithoutRegionSerialized() { StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance( - String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, EPOCH), KINESIS_REGION); - assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier); + serialize(null), KINESIS_REGION); + assertActualStreamIdentifierExpected(actualStreamIdentifier); } - private void assertActualStreamIdentifierExpected(Optional expectedARN, StreamIdentifier actual) { - assertActualStreamIdentifierExpected(expectedARN.get(), actual); + private void assertActualStreamIdentifierExpected(StreamIdentifier actual) { + assertActualStreamIdentifierExpected(Optional.of(DEFAULT_ARN), actual); } - private void assertActualStreamIdentifierExpected(Arn expectedARN, StreamIdentifier actual) { - assertTrue(actual.streamCreationEpochOptional().isPresent()); - assertTrue(actual.accountIdOptional().isPresent()); - assertTrue(actual.streamARNOptional().isPresent()); - assertEquals(Optional.of(expectedARN), actual.streamARNOptional()); + private void assertActualStreamIdentifierExpected(Optional expectedArn, StreamIdentifier actual) { + assertEquals(STREAM_NAME, actual.streamName()); + assertEquals(Optional.of(EPOCH), actual.streamCreationEpochOptional()); + assertEquals(Optional.of(TEST_ACCOUNT_ID), actual.accountIdOptional()); + assertEquals(expectedArn, actual.streamARNOptional()); + } + + /** + * Creates a pattern that matches {@link StreamIdentifier} serialization. + * + * @param region (optional) region to serialize + */ + private static String serialize(final Region region) { + return String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, Long.toString(EPOCH)) + + ((region == null) ? "" : ':' + region.toString()); + } + + private static Arn toArn(final Region region) { + return Arn.builder().partition("aws").service("kinesis") + .accountId(TEST_ACCOUNT_ID) + .resource("stream/" + STREAM_NAME) + .region(region.toString()) + .build(); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index 9f8b34ac..f7492d8d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -73,7 +73,7 @@ public class PeriodicShardSyncManagerTest { @Before public void setup() { - streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456"); + streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456"); periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index aeb07e66..1be0a9d4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -456,7 +456,7 @@ public class SchedulerTest { .shardId("some_random_shard_id")) .collect(Collectors.toCollection(LinkedList::new)); // Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later - leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId("some_random_shard_id")); + leasesInTable.add(new MultiStreamLease().streamIdentifier("123456789012:stream1:1").shardId("some_random_shard_id")); // Expected StreamConfig after running syncStreamsFromLeaseTableOnAppInit // By default, Stream not present in multiStreamTracker will have initial position of LATEST @@ -489,7 +489,7 @@ public class SchedulerTest { .shardId("some_random_shard_id")) .collect(Collectors.toCollection(LinkedList::new)); // Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later - leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId("some_random_shard_id")); + leasesInTable.add(new MultiStreamLease().streamIdentifier("123456789012:stream1:1").shardId("some_random_shard_id")); // Expected StreamConfig after running syncStreamsFromLeaseTableOnAppInit // Stream not present in multiStreamTracker will have initial position specified by orphanedStreamInitialPositionInStream @@ -1299,13 +1299,13 @@ public class SchedulerTest { @Override public List streamConfigList(){ return new ArrayList() {{ - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( + add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition( + add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), InitialPositionInStreamExtended.newInitialPosition( InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( + add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition( + add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), InitialPositionInStreamExtended.newInitialPosition( InitialPositionInStream.LATEST))); }}; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index d9f36481..9e130c38 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -92,7 +92,7 @@ public class HierarchicalShardSyncerTest { private static final String LEASE_OWNER = "TestOwner"; private static final MetricsScope SCOPE = new NullMetricsScope(); private static final boolean MULTISTREAM_MODE_ON = true; - private static final String STREAM_IDENTIFIER = "acc:stream:1"; + private static final String STREAM_IDENTIFIER = "123456789012:stream:1"; private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs( MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisClientFacadeTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisClientFacadeTest.java new file mode 100644 index 00000000..7f978bdc --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisClientFacadeTest.java @@ -0,0 +1,66 @@ +package software.amazon.kinesis.retrieval; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static software.amazon.kinesis.retrieval.KinesisClientFacade.describeStreamSummary; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; + +@RunWith(MockitoJUnitRunner.class) +public class KinesisClientFacadeTest { + + @Mock + private KinesisAsyncClient mockKinesisClient; + + @Before + public void setUp() { + KinesisClientFacade.initialize(mockKinesisClient); + } + + @Test + public void testDescribeStreamSummary() { + final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build(); + when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class))) + .thenReturn(CompletableFuture.completedFuture(expectedResponse)); + + final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("narf"); + assertEquals(expectedResponse, actualResponse); + + verify(mockKinesisClient).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); + } + + @Test + public void testDescribeStreamSummaryRetries() throws Exception { + final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build(); + final CompletableFuture mockFuture = mock(CompletableFuture.class); + final ExecutionException executionException = new ExecutionException(LimitExceededException.builder().build()); + + when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class))) + .thenReturn(mockFuture); + when(mockFuture.get()) + .thenThrow(executionException) + .thenThrow(executionException) + .thenReturn(expectedResponse); + + final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("retry me plz"); + assertEquals(expectedResponse, actualResponse); + + verify(mockKinesisClient, times(3)).describeStreamSummary(any(DescribeStreamSummaryRequest.class)); + verify(mockFuture, times(3)).get(); + } +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java index 32ca17ce..58454087 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java @@ -94,14 +94,14 @@ public class FanOutConfigTest { public void testRegisterNotCalledWhenConsumerArnSetInMultiStreamMode() throws Exception { when(streamConfig.consumerArn()).thenReturn("consumerArn"); - getRecordsCache("account:stream:12345"); + getRecordsCache("123456789012:stream:12345"); verify(consumerRegistration, never()).getOrCreateStreamConsumerArn(); } @Test public void testRegisterCalledWhenConsumerArnNotSetInMultiStreamMode() throws Exception { - getRecordsCache("account:stream:12345"); + getRecordsCache("123456789012:stream:12345"); verify(consumerRegistration).getOrCreateStreamConsumerArn(); }