StreamARN: enhanced StreamIdentifier to spawn instances from stream… (#1098)

* StreamARN: enhanced `StreamIdentifier` to spawn instances from stream ARNs.

* Bugfix: moved instantation of `Future` inside `Supplier`.
This commit is contained in:
stair 2023-04-19 19:38:00 -04:00 committed by GitHub
parent 0fd94acb2b
commit b86fa22e96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 443 additions and 85 deletions

View file

@ -15,46 +15,89 @@
package software.amazon.kinesis.common;
import com.google.common.base.Joiner;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.retrieval.KinesisClientFacade;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Builder(access = AccessLevel.PRIVATE)
@EqualsAndHashCode
@Getter
@ToString
@Accessors(fluent = true)
public class StreamIdentifier {
@Builder.Default
private final Optional<String> accountIdOptional = Optional.empty();
@NonNull
private final String streamName;
@Builder.Default
private final Optional<Long> streamCreationEpochOptional = Optional.empty();
private Optional<Long> streamCreationEpochOptional = Optional.empty();
@Builder.Default
private final Optional<Arn> streamARNOptional = Optional.empty();
private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*" + ":?([a-z]{2}(-gov)?-[a-z]+-\\d{1})?");
/**
* Pattern for a serialized {@link StreamIdentifier}. The valid format is
* {@code <accountId>:<streamName>:<creationEpoch>[:<region>]} where
* {@code region} is the id representation of a {@link Region} and is
* optional.
*/
private static final Pattern STREAM_IDENTIFIER_PATTERN = Pattern.compile(
// `?::` has two parts: `?:` starts a non-capturing group, and
// `:` is the first character in the group (i.e., ":<region>")
"(?<accountId>[0-9]+):(?<streamName>[^:]+):(?<creationEpoch>[0-9]+)(?::(?<region>[-a-z0-9]+))?");
/**
* Pattern for a stream ARN. The valid format is
* {@code arn:aws:kinesis:<region>:<accountId>:stream:<streamName>}
* where {@code region} is the id representation of a {@link Region}.
*/
private static final Pattern STREAM_ARN_PATTERN = Pattern.compile(
"arn:aws:kinesis:(?<region>[-a-z0-9]+):(?<accountId>[0-9]{12}):stream/(?<streamName>.+)");
/**
* Serialize the current StreamIdentifier instance.
* TODO: Consider appending region info for cross-account consumer support
* @return
*
* @return a String of {@code account:stream:creationEpoch[:region]}
* where {@code region} is the id representation of a {@link Region}
* and is optional.
*/
public String serialize() {
return accountIdOptional.isPresent() ?
Joiner.on(DELIMITER).join(accountIdOptional.get(), streamName, streamCreationEpochOptional.get()) :
streamName;
if (!accountIdOptional.isPresent()) {
return streamName;
}
if (!streamCreationEpochOptional.isPresent()) {
// FIXME bias-for-action hack to simplify back-porting into KCL 1.x and facilitate the
// backwards-compatible requirement. There's a chicken-and-egg issue if DSS is
// called as the application is being configured (and before the client is rigged).
// Furthermore, if epoch isn't lazy-loaded here, the problem quickly spirals into
// systemic issues of concurrency and consistency (e.g., PeriodicShardSyncManager,
// Scheduler, DDB leases). We should look at leveraging dependency injection.
// (NOTE: not to inject the Kinesis client here, but to ensure the client is
// accessible elsewhere ASAP.)
final DescribeStreamSummaryResponse dss = KinesisClientFacade.describeStreamSummary(
streamARNOptional().get().toString());
final long creationEpoch = dss.streamDescriptionSummary().streamCreationTimestamp().getEpochSecond();
streamCreationEpochOptional = Optional.of(creationEpoch);
}
final char delimiter = ':';
final StringBuilder sb = new StringBuilder(accountIdOptional.get()).append(delimiter)
.append(streamName).append(delimiter);
streamCreationEpochOptional.ifPresent(sb::append);
streamARNOptional.flatMap(Arn::region).ifPresent(region -> sb.append(delimiter).append(region));
return sb.toString();
}
@Override
@ -64,61 +107,113 @@ public class StreamIdentifier {
/**
* Create a multi stream instance for StreamIdentifier from serialized stream identifier.
* See the format of a serialized stream identifier at {@link StreamIdentifier#multiStreamInstance(String, Region)}
* @param streamIdentifierSer
* @return StreamIdentifier
*
* @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream
*
* @see #multiStreamInstance(String, Region)
* @see #serialize()
*/
public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) {
return multiStreamInstance(streamIdentifierSer, null);
public static StreamIdentifier multiStreamInstance(String serializationOrArn) {
return multiStreamInstance(serializationOrArn, null);
}
/**
* Create a multi stream instance for StreamIdentifier from serialized stream identifier.
* @param streamIdentifierSer The serialized stream identifier should be of the format
* account:stream:creationepoch[:region]
*
* @param serializationOrArn serialized {@link StreamIdentifier} or AWS ARN of a Kinesis stream
* @param kinesisRegion This nullable region is used to construct the optional StreamARN
* @return StreamIdentifier
*
* @see #serialize()
*/
public static StreamIdentifier multiStreamInstance(String streamIdentifierSer, Region kinesisRegion) {
if (PATTERN.matcher(streamIdentifierSer).matches()) {
final String[] split = streamIdentifierSer.split(DELIMITER);
final String streamName = split[1];
final Optional<String> accountId = Optional.ofNullable(split[0]);
StreamIdentifierBuilder builder = StreamIdentifier.builder()
.accountIdOptional(accountId)
.streamName(streamName)
.streamCreationEpochOptional(Optional.of(Long.parseLong(split[2])));
final Region region = (split.length == 4) ?
Region.of(split[3]) : // Use the region extracted from the serialized string, which matches the regex pattern
kinesisRegion; // Otherwise just use the provided region
final Optional<Arn> streamARN = StreamARNUtil.getStreamARN(streamName, region, accountId);
return builder.streamARNOptional(streamARN).build();
} else {
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer);
public static StreamIdentifier multiStreamInstance(String serializationOrArn, Region kinesisRegion) {
final StreamIdentifier fromSerialization = fromSerialization(serializationOrArn, kinesisRegion);
if (fromSerialization != null) {
return fromSerialization;
}
final StreamIdentifier fromArn = fromArn(serializationOrArn, kinesisRegion);
if (fromArn != null) {
return fromArn;
}
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + serializationOrArn);
}
/**
* Create a single stream instance for StreamIdentifier from stream name.
* @param streamName
* @return StreamIdentifier
*
* @param streamNameOrArn stream name or AWS ARN of a Kinesis stream
*/
public static StreamIdentifier singleStreamInstance(String streamName) {
return singleStreamInstance(streamName, null);
public static StreamIdentifier singleStreamInstance(String streamNameOrArn) {
return singleStreamInstance(streamNameOrArn, null);
}
/**
* Create a single stream instance for StreamIdentifier from the provided stream name and kinesisRegion.
* This method also constructs the optional StreamARN based on the region info.
* @param streamName
* @param kinesisRegion
* @return StreamIdentifier
*
* @param streamNameOrArn stream name or AWS ARN of a Kinesis stream
* @param kinesisRegion (optional) region used to construct the ARN
*/
public static StreamIdentifier singleStreamInstance(String streamName, Region kinesisRegion) {
Validate.notEmpty(streamName, "StreamName should not be empty");
public static StreamIdentifier singleStreamInstance(String streamNameOrArn, Region kinesisRegion) {
Validate.notEmpty(streamNameOrArn, "StreamName should not be empty");
final StreamIdentifier fromArn = fromArn(streamNameOrArn, kinesisRegion);
if (fromArn != null) {
return fromArn;
}
return StreamIdentifier.builder()
.streamName(streamName)
.streamARNOptional(StreamARNUtil.getStreamARN(streamName, kinesisRegion))
.streamName(streamNameOrArn)
.streamARNOptional(StreamARNUtil.getStreamARN(streamNameOrArn, kinesisRegion))
.build();
}
/**
* Deserializes a StreamIdentifier from {@link #STREAM_IDENTIFIER_PATTERN}.
*
* @param input input string (e.g., ARN, serialized instance) to convert into an instance
* @param kinesisRegion (optional) region used to construct the ARN
* @return a StreamIdentifier instance if the pattern matched, otherwise null
*/
private static StreamIdentifier fromSerialization(final String input, final Region kinesisRegion) {
final Matcher matcher = STREAM_IDENTIFIER_PATTERN.matcher(input);
return matcher.matches()
? toStreamIdentifier(matcher, matcher.group("creationEpoch"), kinesisRegion) : null;
}
/**
* Constructs a StreamIdentifier from {@link #STREAM_ARN_PATTERN}.
*
* @param input input string (e.g., ARN, serialized instance) to convert into an instance
* @param kinesisRegion (optional) region used to construct the ARN
* @return a StreamIdentifier instance if the pattern matched, otherwise null
*/
private static StreamIdentifier fromArn(final String input, final Region kinesisRegion) {
final Matcher matcher = STREAM_ARN_PATTERN.matcher(input);
return matcher.matches()
? toStreamIdentifier(matcher, "", kinesisRegion) : null;
}
private static StreamIdentifier toStreamIdentifier(final Matcher matcher, final String matchedEpoch,
final Region kinesisRegion) {
final Optional<String> accountId = Optional.of(matcher.group("accountId"));
final String streamName = matcher.group("streamName");
final Optional<Long> creationEpoch = matchedEpoch.isEmpty() ? Optional.empty()
: Optional.of(Long.valueOf(matchedEpoch));
final String matchedRegion = matcher.group("region");
final Region region = (matchedRegion != null) ? Region.of(matchedRegion) : kinesisRegion;
final Optional<Arn> arn = StreamARNUtil.getStreamARN(streamName, region, accountId);
if (!creationEpoch.isPresent() && !arn.isPresent()) {
throw new IllegalArgumentException("Cannot create StreamIdentifier if missing both ARN and creation epoch");
}
return StreamIdentifier.builder()
.accountIdOptional(accountId)
.streamName(streamName)
.streamCreationEpochOptional(creationEpoch)
.streamARNOptional(arn)
.build();
}
}

View file

@ -0,0 +1,100 @@
package software.amazon.kinesis.retrieval;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
/**
* Facade pattern to simplify interactions with a {@link KinesisAsyncClient}.
*/
@Slf4j
public final class KinesisClientFacade {
/**
* Reusable {@link AWSExceptionManager}.
* <p>
* N.B. This instance is mutable, but thread-safe for <b>read-only</b> use.
* </p>
*/
private static final AWSExceptionManager AWS_EXCEPTION_MANAGER;
// FIXME dependency injection
private static KinesisAsyncClient kinesisClient;
static {
AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
}
static void initialize(final KinesisAsyncClient client) {
kinesisClient = client;
}
public static DescribeStreamSummaryResponse describeStreamSummary(final String streamArn) {
final DescribeStreamSummaryRequest request = KinesisRequestsBuilder
.describeStreamSummaryRequestBuilder().streamARN(streamArn).build();
final ServiceCallerSupplier<DescribeStreamSummaryResponse> dss =
() -> kinesisClient.describeStreamSummary(request).get();
return retryWhenThrottled(dss, 3, streamArn, "DescribeStreamSummary");
}
// FIXME code lifted-and-shifted from FanOutConsumerRegistration; that class
// (and others) should not be responsible for interacting directly with
// the thread-safe Kinesis client (and handling retries, etc.)
private static <T> T retryWhenThrottled(
@NonNull final ServiceCallerSupplier<T> retriever,
final int maxRetries,
final String streamArn,
@NonNull final String apiName) {
LimitExceededException finalException = null;
int retries = maxRetries;
while (retries > 0) {
try {
try {
return retriever.get();
} catch (ExecutionException e) {
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
throw KinesisException.create("Unable to complete " + apiName, e);
} catch (TimeoutException te) {
log.info("Timed out waiting for " + apiName + " for " + streamArn);
}
} catch (LimitExceededException e) {
log.info("{} : Throttled while calling {} API, will backoff.", streamArn, apiName);
try {
Thread.sleep(1000 + (long) (Math.random() * 100));
} catch (InterruptedException ie) {
log.debug("Sleep interrupted, shutdown invoked.");
}
finalException = e;
}
retries--;
}
if (finalException == null) {
throw new IllegalStateException(streamArn + " : Exhausted retries while calling " + apiName);
}
throw finalException;
}
@FunctionalInterface
private interface ServiceCallerSupplier<T> {
T get() throws ExecutionException, InterruptedException, TimeoutException;
}
}

View file

@ -33,7 +33,6 @@ import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
/**
* Used by the KCL to configure the retrieval of records from Kinesis.
@ -132,6 +131,8 @@ public class RetrievalConfig {
this.applicationName = applicationName;
this.appStreamTracker = DeprecationUtils.convert(streamTracker,
singleStreamTracker -> singleStreamTracker.streamConfigList().get(0));
KinesisClientFacade.initialize(kinesisAsyncClient);
}
/**

View file

@ -1,5 +1,6 @@
package software.amazon.kinesis.common;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -8,38 +9,113 @@ import org.powermock.modules.junit4.PowerMockRunner;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import java.util.Arrays;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
import static software.amazon.kinesis.common.StreamARNUtil.getStreamARN;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StreamARNUtil.class)
public class StreamIdentifierTest {
private static final String STREAM_NAME = "streamName";
private static final String STREAM_NAME = "stream-name";
private static final Region KINESIS_REGION = Region.US_WEST_1;
private static final String TEST_ACCOUNT_ID = "111111111111";
private static final String EPOCH = "1680616058";
private static final String TEST_ACCOUNT_ID = "123456789012";
private static final long EPOCH = 1680616058L;
private static final Arn DEFAULT_ARN = Arn.builder().partition("aws").service("kinesis")
.region(KINESIS_REGION.toString()).accountId(TEST_ACCOUNT_ID).resource("stream/" + STREAM_NAME).build();
private static final Arn DEFAULT_ARN = toArn(KINESIS_REGION);
@Before
public void setUp() {
mockStatic(StreamARNUtil.class);
when(StreamARNUtil.getStreamARN(anyString(), any(Region.class))).thenReturn(Optional.empty());
when(StreamARNUtil.getStreamARN(STREAM_NAME, KINESIS_REGION)).thenReturn(Optional.of(DEFAULT_ARN));
when(StreamARNUtil.getStreamARN(STREAM_NAME, KINESIS_REGION, Optional.of(TEST_ACCOUNT_ID)))
when(getStreamARN(anyString(), any(Region.class))).thenReturn(Optional.empty());
when(getStreamARN(STREAM_NAME, KINESIS_REGION)).thenReturn(Optional.of(DEFAULT_ARN));
when(getStreamARN(STREAM_NAME, KINESIS_REGION, Optional.of(TEST_ACCOUNT_ID)))
.thenReturn(Optional.of(DEFAULT_ARN));
}
/**
* Test patterns that should match a serialization regex.
*/
@Test
public void testMultiStreamDeserializationSuccess() {
for (final String pattern : Arrays.asList(
// arn examples
toArn(KINESIS_REGION).toString(),
// serialization examples
"123456789012:stream-name:123",
"123456789012:stream-name:123:" + Region.US_ISOB_EAST_1
)) {
final StreamIdentifier si = StreamIdentifier.multiStreamInstance(pattern);
assertNotNull(si);
}
}
/**
* Test patterns that <b>should not</b> match a serialization regex.
*/
@Test
public void testMultiStreamDeserializationFail() {
for (final String pattern : Arrays.asList(
// arn examples
"arn:aws:kinesis::123456789012:stream/stream-name", // missing region
"arn:aws:kinesis:region::stream/stream-name", // missing account id
"arn:aws:kinesis:region:123456789:stream/stream-name", // account id not 12 digits
"arn:aws:kinesis:region:123456789abc:stream/stream-name", // 12char alphanumeric account id
"arn:aws:kinesis:region:123456789012:stream/", // missing stream-name
// serialization examples
":stream-name:123", // missing account id
"123456789012:stream-name", // missing delimiter before creation epoch
"accountId:stream-name:123", // non-numeric account id
// "123456789:stream-name:123", // account id not 12 digits
"123456789abc:stream-name:123", // 12char alphanumeric account id
"123456789012::123", // missing stream name
"123456789012:stream-name:", // missing creation epoch
"123456789012:stream-name::", // missing creation epoch; ':' for optional region yet missing region
"123456789012:stream-name::us-east-1", // missing creation epoch
"123456789012:stream-name:abc", // non-numeric creation epoch
"123456789012:stream-name:abc:", // non-numeric creation epoch with ':' yet missing region
"123456789012:stream-name:123:", // ':' for optional region yet missing region
""
)) {
try {
StreamIdentifier.multiStreamInstance(pattern);
Assert.fail(pattern + " should not have created a StreamIdentifier");
} catch (final IllegalArgumentException iae) {
// expected; ignore
}
}
}
@Test
public void testInstanceFromArn() {
final Arn arn = toArn(KINESIS_REGION);
final StreamIdentifier single = StreamIdentifier.singleStreamInstance(arn.toString());
final StreamIdentifier multi = StreamIdentifier.multiStreamInstance(arn.toString());
assertEquals(single, multi);
assertEquals(Optional.of(TEST_ACCOUNT_ID), single.accountIdOptional());
assertEquals(STREAM_NAME, single.streamName());
assertEquals(Optional.of(arn), single.streamARNOptional());
}
@Test(expected = IllegalArgumentException.class)
public void testInstanceWithoutEpochOrArn() {
when(getStreamARN(STREAM_NAME, KINESIS_REGION, Optional.of(TEST_ACCOUNT_ID)))
.thenReturn(Optional.empty());
final Arn arn = toArn(KINESIS_REGION);
StreamIdentifier.singleStreamInstance(arn.toString());
}
@Test
public void testSingleStreamInstanceWithName() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME);
@ -54,44 +130,64 @@ public class StreamIdentifierTest {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME, KINESIS_REGION);
assertFalse(actualStreamIdentifier.streamCreationEpochOptional().isPresent());
assertFalse(actualStreamIdentifier.accountIdOptional().isPresent());
assertTrue(actualStreamIdentifier.streamARNOptional().isPresent());
assertEquals(DEFAULT_ARN, actualStreamIdentifier.streamARNOptional().get());
assertEquals(Optional.of(DEFAULT_ARN), actualStreamIdentifier.streamARNOptional());
}
@Test
public void testMultiStreamInstanceWithIdentifierSerialization() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(
String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, EPOCH, KINESIS_REGION.toString()));
assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier);
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(serialize(KINESIS_REGION));
assertActualStreamIdentifierExpected(actualStreamIdentifier);
}
@Test
public void testMultiStreamInstanceWithRegionSerialized() {
Region serializedRegion = Region.US_GOV_EAST_1;
Optional<Arn> arn = Optional.ofNullable(Arn.builder().partition("aws").service("kinesis")
.accountId(TEST_ACCOUNT_ID).region(serializedRegion.toString()).resource("stream/" + STREAM_NAME).build());
final Optional<Arn> arn = Optional.of(toArn(serializedRegion));
when(StreamARNUtil.getStreamARN(eq(STREAM_NAME), eq(serializedRegion), any(Optional.class))).thenReturn(arn);
when(getStreamARN(STREAM_NAME, serializedRegion, Optional.of(TEST_ACCOUNT_ID))).thenReturn(arn);
final String expectedSerialization = serialize(serializedRegion);
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(
String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, EPOCH, serializedRegion.toString()), KINESIS_REGION);
expectedSerialization, KINESIS_REGION);
assertActualStreamIdentifierExpected(arn, actualStreamIdentifier);
assertEquals(expectedSerialization, actualStreamIdentifier.serialize());
verifyStatic(StreamARNUtil.class);
getStreamARN(STREAM_NAME, serializedRegion, Optional.of(TEST_ACCOUNT_ID));
}
@Test
public void testMultiStreamInstanceWithoutRegionSerialized() {
StreamIdentifier actualStreamIdentifier = StreamIdentifier.multiStreamInstance(
String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, EPOCH), KINESIS_REGION);
assertActualStreamIdentifierExpected(DEFAULT_ARN, actualStreamIdentifier);
serialize(null), KINESIS_REGION);
assertActualStreamIdentifierExpected(actualStreamIdentifier);
}
private void assertActualStreamIdentifierExpected(Optional<Arn> expectedARN, StreamIdentifier actual) {
assertActualStreamIdentifierExpected(expectedARN.get(), actual);
private void assertActualStreamIdentifierExpected(StreamIdentifier actual) {
assertActualStreamIdentifierExpected(Optional.of(DEFAULT_ARN), actual);
}
private void assertActualStreamIdentifierExpected(Arn expectedARN, StreamIdentifier actual) {
assertTrue(actual.streamCreationEpochOptional().isPresent());
assertTrue(actual.accountIdOptional().isPresent());
assertTrue(actual.streamARNOptional().isPresent());
assertEquals(Optional.of(expectedARN), actual.streamARNOptional());
private void assertActualStreamIdentifierExpected(Optional<Arn> expectedArn, StreamIdentifier actual) {
assertEquals(STREAM_NAME, actual.streamName());
assertEquals(Optional.of(EPOCH), actual.streamCreationEpochOptional());
assertEquals(Optional.of(TEST_ACCOUNT_ID), actual.accountIdOptional());
assertEquals(expectedArn, actual.streamARNOptional());
}
/**
* Creates a pattern that matches {@link StreamIdentifier} serialization.
*
* @param region (optional) region to serialize
*/
private static String serialize(final Region region) {
return String.join(":", TEST_ACCOUNT_ID, STREAM_NAME, Long.toString(EPOCH)) +
((region == null) ? "" : ':' + region.toString());
}
private static Arn toArn(final Region region) {
return Arn.builder().partition("aws").service("kinesis")
.accountId(TEST_ACCOUNT_ID)
.resource("stream/" + STREAM_NAME)
.region(region.toString())
.build();
}
}

View file

@ -73,7 +73,7 @@ public class PeriodicShardSyncManagerTest {
@Before
public void setup() {
streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456");
streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456");
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3);
}

View file

@ -456,7 +456,7 @@ public class SchedulerTest {
.shardId("some_random_shard_id"))
.collect(Collectors.toCollection(LinkedList::new));
// Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later
leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId("some_random_shard_id"));
leasesInTable.add(new MultiStreamLease().streamIdentifier("123456789012:stream1:1").shardId("some_random_shard_id"));
// Expected StreamConfig after running syncStreamsFromLeaseTableOnAppInit
// By default, Stream not present in multiStreamTracker will have initial position of LATEST
@ -489,7 +489,7 @@ public class SchedulerTest {
.shardId("some_random_shard_id"))
.collect(Collectors.toCollection(LinkedList::new));
// Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later
leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId("some_random_shard_id"));
leasesInTable.add(new MultiStreamLease().streamIdentifier("123456789012:stream1:1").shardId("some_random_shard_id"));
// Expected StreamConfig after running syncStreamsFromLeaseTableOnAppInit
// Stream not present in multiStreamTracker will have initial position specified by orphanedStreamInitialPositionInStream
@ -1299,13 +1299,13 @@ public class SchedulerTest {
@Override
public List<StreamConfig> streamConfigList(){
return new ArrayList<StreamConfig>() {{
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
}};
}

View file

@ -92,7 +92,7 @@ public class HierarchicalShardSyncerTest {
private static final String LEASE_OWNER = "TestOwner";
private static final MetricsScope SCOPE = new NullMetricsScope();
private static final boolean MULTISTREAM_MODE_ON = true;
private static final String STREAM_IDENTIFIER = "acc:stream:1";
private static final String STREAM_IDENTIFIER = "123456789012:stream:1";
private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs(
MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));

View file

@ -0,0 +1,66 @@
package software.amazon.kinesis.retrieval;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.retrieval.KinesisClientFacade.describeStreamSummary;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
@RunWith(MockitoJUnitRunner.class)
public class KinesisClientFacadeTest {
@Mock
private KinesisAsyncClient mockKinesisClient;
@Before
public void setUp() {
KinesisClientFacade.initialize(mockKinesisClient);
}
@Test
public void testDescribeStreamSummary() {
final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build();
when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
.thenReturn(CompletableFuture.completedFuture(expectedResponse));
final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("narf");
assertEquals(expectedResponse, actualResponse);
verify(mockKinesisClient).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
}
@Test
public void testDescribeStreamSummaryRetries() throws Exception {
final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build();
final CompletableFuture<DescribeStreamSummaryResponse> mockFuture = mock(CompletableFuture.class);
final ExecutionException executionException = new ExecutionException(LimitExceededException.builder().build());
when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
.thenReturn(mockFuture);
when(mockFuture.get())
.thenThrow(executionException)
.thenThrow(executionException)
.thenReturn(expectedResponse);
final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("retry me plz");
assertEquals(expectedResponse, actualResponse);
verify(mockKinesisClient, times(3)).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
verify(mockFuture, times(3)).get();
}
}

View file

@ -94,14 +94,14 @@ public class FanOutConfigTest {
public void testRegisterNotCalledWhenConsumerArnSetInMultiStreamMode() throws Exception {
when(streamConfig.consumerArn()).thenReturn("consumerArn");
getRecordsCache("account:stream:12345");
getRecordsCache("123456789012:stream:12345");
verify(consumerRegistration, never()).getOrCreateStreamConsumerArn();
}
@Test
public void testRegisterCalledWhenConsumerArnNotSetInMultiStreamMode() throws Exception {
getRecordsCache("account:stream:12345");
getRecordsCache("123456789012:stream:12345");
verify(consumerRegistration).getOrCreateStreamConsumerArn();
}