Internally construct and use stream ARNs for all streams in multi-stream mode (#1318)

This commit is contained in:
furq-aws 2024-04-30 14:19:58 -07:00 committed by GitHub
parent c12cee2a1b
commit ec34ed1def
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 228 additions and 30 deletions

View file

@ -0,0 +1,32 @@
package software.amazon.kinesis.common;
import lombok.NonNull;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import static software.amazon.awssdk.services.kinesis.KinesisAsyncClient.SERVICE_NAME;
@KinesisClientInternalApi
public final class ArnUtil {
private static final String STREAM_RESOURCE_PREFIX = "stream/";
/**
* Construct a Kinesis stream ARN.
*
* @param region The region the stream exists in.
* @param accountId The account the stream belongs to.
* @param streamName The name of the stream.
* @return The {@link Arn} of the Kinesis stream.
*/
public static Arn constructStreamArn(
@NonNull final Region region, @NonNull final String accountId, @NonNull final String streamName) {
return Arn.builder()
.partition(region.metadata().partition().id())
.service(SERVICE_NAME)
.region(region.id())
.accountId(accountId)
.resource(STREAM_RESOURCE_PREFIX + streamName)
.build();
}
}

View file

@ -15,10 +15,14 @@
package software.amazon.kinesis.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
@AllArgsConstructor
@RequiredArgsConstructor
@Data
@Accessors(fluent = true)
public class StreamConfig {

View file

@ -49,8 +49,11 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
@ -97,6 +100,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
import static software.amazon.kinesis.common.ArnUtil.constructStreamArn;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
@ -153,7 +157,7 @@ public class Scheduler implements Runnable {
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final boolean isMultiStreamMode;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap = new ConcurrentHashMap<>();
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap = new StreamConfigMap();
private final StreamTracker streamTracker;
private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
private final long listShardsBackoffTimeMillis;
@ -961,7 +965,7 @@ public class Scheduler implements Runnable {
// to gracefully complete the reading.
StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
if (streamConfig == null) {
streamConfig = streamTracker.createStreamConfig(streamIdentifier);
streamConfig = withStreamArn(streamTracker.createStreamConfig(streamIdentifier), getKinesisRegion());
log.info("Created orphan {}", streamConfig);
}
Validate.notNull(streamConfig, "StreamConfig should not be null");
@ -1051,6 +1055,67 @@ public class Scheduler implements Runnable {
return streamIdentifier;
}
private Region getKinesisRegion() {
return retrievalConfig.kinesisClient().serviceClientConfiguration().region();
}
/**
* Create and return a copy of a {@link StreamConfig} object
* with {@link StreamIdentifier#streamArnOptional()} populated.
* Only to be used in multi-stream mode.
*
* @param streamConfig The {@link StreamConfig} object to return a copy of.
* @param kinesisRegion The {@link Region} the stream exists in, to be used for constructing the {@link Arn}.
* @return A copy of the {@link StreamConfig} with {@link StreamIdentifier#streamArnOptional()} populated.
*/
private static StreamConfig withStreamArn(
@NonNull final StreamConfig streamConfig, @NonNull final Region kinesisRegion) {
Validate.isTrue(streamConfig.streamIdentifier().accountIdOptional().isPresent(),
"accountId should not be empty");
Validate.isTrue(streamConfig.streamIdentifier().streamCreationEpochOptional().isPresent(),
"streamCreationEpoch should not be empty");
log.info("Constructing stream ARN for {} using the Kinesis client's configured region - {}.",
streamConfig.streamIdentifier(), kinesisRegion);
final StreamIdentifier streamIdentifierWithArn = StreamIdentifier.multiStreamInstance(
constructStreamArn(
kinesisRegion,
streamConfig.streamIdentifier().accountIdOptional().get(),
streamConfig.streamIdentifier().streamName()),
streamConfig.streamIdentifier().streamCreationEpochOptional().get());
return new StreamConfig(
streamIdentifierWithArn, streamConfig.initialPositionInStreamExtended(), streamConfig.consumerArn());
}
@RequiredArgsConstructor
private class StreamConfigMap extends ConcurrentHashMap<StreamIdentifier, StreamConfig> {
/**
* If {@link StreamIdentifier#streamArnOptional()} is present for the provided
* {@link StreamConfig#streamIdentifier()}, validates that the region in the stream ARN is consistent with the
* region that the Kinesis client ({@link RetrievalConfig#kinesisClient()}) is configured with.
* <p>
* In multi-stream mode, ensures stream ARN is always present by constructing it using the Kinesis client
* region when {@link StreamIdentifier#streamArnOptional()} is {@link Optional#empty()}.
* <p>
* {@inheritDoc}
*/
@Override
public StreamConfig put(
@NonNull final StreamIdentifier streamIdentifier, @NonNull final StreamConfig streamConfig) {
final Region kinesisRegion = getKinesisRegion();
return super.put(streamIdentifier, streamConfig.streamIdentifier().streamArnOptional()
.map(streamArn -> {
Validate.isTrue(kinesisRegion.id().equals(streamArn.region().get()),
"The provided streamARN " + streamArn
+ " does not match the Kinesis client's configured region - " + kinesisRegion);
return streamConfig;
}).orElse(isMultiStreamMode ? withStreamArn(streamConfig, kinesisRegion) : streamConfig));
}
}
/**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on

View file

@ -20,6 +20,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@ -46,11 +47,13 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
@ -61,14 +64,19 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.OngoingStubbing;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisServiceClientConfiguration;
import software.amazon.awssdk.utils.StringUtils;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
@ -129,6 +137,13 @@ public class SchedulerTest {
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final Region TEST_REGION = Region.US_EAST_2;
private static final int ACCOUNT_ID_LENGTH = 12;
private static final long TEST_ACCOUNT = Long.parseLong(StringUtils.repeat("1", ACCOUNT_ID_LENGTH));
private static final long TEST_EPOCH = 1234567890L;
private static final String TEST_SHARD_ID = "shardId-000000000001";
private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private Scheduler scheduler;
private ShardRecordProcessorFactory shardRecordProcessorFactory;
@ -193,6 +208,8 @@ public class SchedulerTest {
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class),
any(MetricsFactory.class))).thenReturn(recordsPublisher);
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
when(kinesisClient.serviceClientConfiguration())
.thenReturn(KinesisServiceClientConfiguration.builder().region(TEST_REGION).build());
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
@ -404,12 +421,12 @@ public class SchedulerTest {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
@ -428,12 +445,12 @@ public class SchedulerTest {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
@ -444,7 +461,7 @@ public class SchedulerTest {
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(HashSet::new));
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
@ -456,7 +473,7 @@ public class SchedulerTest {
// Streams in lease table but not tracked by multiStreamTracker
List<MultiStreamLease> leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease()
.streamIdentifier(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))
.shardId("some_random_shard_id"))
.collect(Collectors.toCollection(LinkedList::new));
// Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later
@ -466,7 +483,7 @@ public class SchedulerTest {
// By default, Stream not present in multiStreamTracker will have initial position of LATEST
List<StreamConfig> expectedConfig = IntStream.range(1, 3).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
// Include default configs
@ -489,7 +506,7 @@ public class SchedulerTest {
// Streams in lease table but not tracked by multiStreamTracker
List<MultiStreamLease> leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease()
.streamIdentifier(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))
.shardId("some_random_shard_id"))
.collect(Collectors.toCollection(LinkedList::new));
// Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later
@ -499,7 +516,7 @@ public class SchedulerTest {
// Stream not present in multiStreamTracker will have initial position specified by orphanedStreamInitialPositionInStream
List<StreamConfig> expectedConfig = IntStream.range(1, 3).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(testTimeStamp)))
.collect(Collectors.toCollection(LinkedList::new));
// Include default configs
@ -557,7 +574,7 @@ public class SchedulerTest {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(ArrayList::new));
}
@ -573,12 +590,12 @@ public class SchedulerTest {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
@ -590,7 +607,7 @@ public class SchedulerTest {
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(HashSet::new));
Set<StreamIdentifier> expectedSyncedStreams = onlyStreamsDeletionNotLeases ? expectedPendingStreams : Sets.newHashSet();
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
@ -625,7 +642,7 @@ public class SchedulerTest {
});
HashSet<StreamConfig> currentStreamConfigMapOverride = IntStream.range(1, 5).mapToObj(
streamId -> new StreamConfig(StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(HashSet::new));
testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(false, currentStreamConfigMapOverride);
@ -637,7 +654,7 @@ public class SchedulerTest {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(ArrayList::new));
}
@ -653,12 +670,12 @@ public class SchedulerTest {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
@ -671,7 +688,7 @@ public class SchedulerTest {
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(HashSet::new));
Assert.assertEquals(expectSyncedStreams ? expectedSyncedStreams : Sets.newHashSet(), syncedStreams);
Assert.assertEquals(currentStreamConfigMapOverride == null ? Sets.newHashSet(streamConfigList2) : currentStreamConfigMapOverride,
@ -720,7 +737,7 @@ public class SchedulerTest {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return IntStream.range(1, 3)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(ArrayList::new));
}
@ -750,18 +767,18 @@ public class SchedulerTest {
Set<StreamIdentifier> expectedSyncedStreams;
Set<StreamIdentifier> expectedPendingStreams = IntStream.range(1, 3)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
if (onlyStreamsNoLeasesDeletion) {
expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
} else {
expectedSyncedStreams = IntStream.range(5, 7)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
}
@ -770,13 +787,13 @@ public class SchedulerTest {
if (onlyStreamsNoLeasesDeletion) {
expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
} else {
expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
}
@ -852,7 +869,7 @@ public class SchedulerTest {
private StreamConfig createDummyStreamConfig(int streamId){
return new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
}
@ -861,12 +878,12 @@ public class SchedulerTest {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
@ -883,7 +900,7 @@ public class SchedulerTest {
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(HashSet::new));
Assert.assertEquals(expectedSyncedStreams, syncedStreams);
Assert.assertEquals(Sets.newHashSet(streamConfigList2),
@ -1080,6 +1097,86 @@ public class SchedulerTest {
.shardId("some_random_shard_id")).collect(Collectors.toList()));
}
@Test
public void testStreamConfigsArePopulatedWithStreamArnsInMultiStreamMode() {
final String streamArnStr = constructStreamArnStr(TEST_REGION, 111122223333L, "some-stream-name");
when(multiStreamTracker.streamConfigList()).thenReturn(Stream.of(
// Each of scheduler's currentStreamConfigMap entries should have a streamARN in
// multi-stream mode, regardless of whether the streamTracker-provided streamIdentifiers
// were created using serialization or stream ARN.
StreamIdentifier.multiStreamInstance(constructStreamIdentifierSer(TEST_ACCOUNT, streamName)),
StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnStr), TEST_EPOCH)
)
.map(streamIdentifier -> new StreamConfig(streamIdentifier, TEST_INITIAL_POSITION))
.collect(Collectors.toList()));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
final Set<String> expectedStreamArns =
Sets.newHashSet(constructStreamArnStr(TEST_REGION, TEST_ACCOUNT, streamName), streamArnStr);
final Set<String> actualStreamArns = scheduler.currentStreamConfigMap().values().stream()
.map(sc -> sc.streamIdentifier().streamArnOptional().orElseThrow(IllegalStateException::new).toString())
.collect(Collectors.toSet());
assertEquals(expectedStreamArns, actualStreamArns);
}
@Test
public void testOrphanStreamConfigIsPopulatedWithArn() {
final String streamIdentifierSerializationForOrphan = constructStreamIdentifierSer(TEST_ACCOUNT, streamName);
assertFalse(multiStreamTracker.streamConfigList().stream()
.map(sc -> sc.streamIdentifier().serialize())
.collect(Collectors.toSet())
.contains(streamIdentifierSerializationForOrphan));
when(leaseCoordinator.getCurrentAssignments()).thenReturn(Collections.singletonList(
new ShardInfo(TEST_SHARD_ID, null, null, null, streamIdentifierSerializationForOrphan)));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.runProcessLoop();
verify(multiStreamTracker).createStreamConfig(
StreamIdentifier.multiStreamInstance(streamIdentifierSerializationForOrphan));
final ArgumentCaptor<StreamConfig> streamConfigArgumentCaptor = ArgumentCaptor.forClass(StreamConfig.class);
verify(retrievalFactory).createGetRecordsCache(any(), streamConfigArgumentCaptor.capture(), any());
final StreamConfig actualStreamConfigForOrphan = streamConfigArgumentCaptor.getValue();
final Optional<Arn> streamArnForOrphan = actualStreamConfigForOrphan.streamIdentifier().streamArnOptional();
assertTrue(streamArnForOrphan.isPresent());
assertEquals(constructStreamArnStr(TEST_REGION, TEST_ACCOUNT, streamName), streamArnForOrphan.get().toString());
}
@Test
public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
final Region streamArnRegion = Region.US_WEST_1;
Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region());
when(multiStreamTracker.streamConfigList()).thenReturn(Collections.singletonList(new StreamConfig(
StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(streamArnRegion, TEST_ACCOUNT, streamName)), TEST_EPOCH),
TEST_INITIAL_POSITION)));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
assertThrows(IllegalArgumentException.class, () -> new Scheduler(checkpointConfig, coordinatorConfig,
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
}
private static String constructStreamIdentifierSer(long accountId, String streamName) {
return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH));
}
private static String constructStreamArnStr(Region region, long accountId, String streamName) {
return "arn:aws:kinesis:" + region + ":" + accountId + ":stream/" + streamName;
}
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
final int numberOfRecordsPerShard = 10;
final String kinesisShardPrefix = "kinesis-0-";