Ignore dynamically added streams with mismatching region, rather than propagating exception

This commit is contained in:
Furqaan Ali 2024-05-03 01:27:02 -07:00
parent 16e8404dc4
commit a348a35d42
2 changed files with 39 additions and 2 deletions

View file

@ -123,6 +123,7 @@ public class Scheduler implements Runnable {
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count"; private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count"; private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
private static final String IGNORED_STREAMS_COUNT = "IgnoredStreams.Count";
private final SchedulerLog slog = new SchedulerLog(); private final SchedulerLog slog = new SchedulerLog();
@ -504,10 +505,17 @@ public class Scheduler implements Runnable {
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier); final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier);
try {
currentStreamConfigMap.put(streamIdentifier, streamConfig);
} catch (IllegalArgumentException e) {
log.error("Failed to add stream {} to application. This stream will not be processed.",
streamConfig.streamIdentifier(), e);
MetricsUtil.addCount(metricsScope, IGNORED_STREAMS_COUNT, 1, MetricsLevel.DETAILED);
continue;
}
log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig); log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig);
shardSyncTaskManager.submitShardSyncTask(); shardSyncTaskManager.submitShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} else { } else {
log.debug("{} is already being processed - skipping shard sync.", streamIdentifier); log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);

View file

@ -1154,7 +1154,7 @@ public class SchedulerTest {
} }
@Test @Test
public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() { public void testMismatchingArnRegionAndKinesisClientRegionOnSchedulerConstructionThrowsException() {
final Region streamArnRegion = Region.US_WEST_1; final Region streamArnRegion = Region.US_WEST_1;
Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region()); Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region());
@ -1169,6 +1169,35 @@ public class SchedulerTest {
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
} }
@Test
public void testDynamicallyAddedStreamsWithRegionMismatchingKinesisClientRegionAreIgnored() throws Exception {
final Region mismatchingStreamRegion = Region.US_WEST_1;
final Region kinesisClientRegion = kinesisClient.serviceClientConfiguration().region();
Assert.assertNotEquals(mismatchingStreamRegion, kinesisClientRegion);
final StreamIdentifier streamWithMismatchingRegion = StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(mismatchingStreamRegion, TEST_ACCOUNT, "stream-1")), TEST_EPOCH);
final StreamIdentifier streamWithMatchingRegion = StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(kinesisClientRegion, TEST_ACCOUNT, "stream-2")), TEST_EPOCH);
when(multiStreamTracker.streamConfigList()).thenReturn(
Collections.emptyList(), // returned on scheduler construction
Arrays.asList( // returned on stream sync
new StreamConfig(streamWithMismatchingRegion, TEST_INITIAL_POSITION),
new StreamConfig(streamWithMatchingRegion, TEST_INITIAL_POSITION)));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig,
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
final Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
final Set<StreamIdentifier> currentStreamConfigMapKeys = scheduler.currentStreamConfigMap().keySet();
assertFalse(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMismatchingRegion));
assertTrue(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMatchingRegion));
}
private static String constructStreamIdentifierSer(long accountId, String streamName) { private static String constructStreamIdentifierSer(long accountId, String streamName) {
return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH)); return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH));
} }