diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e4e63078..5cbaa9c9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -123,6 +123,7 @@ public class Scheduler implements Runnable { private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count"; private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count"; + private static final String IGNORED_STREAMS_COUNT = "IgnoredStreams.Count"; private final SchedulerLog slog = new SchedulerLog(); @@ -504,10 +505,17 @@ public class Scheduler implements Runnable { for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier); + try { + currentStreamConfigMap.put(streamIdentifier, streamConfig); + } catch (IllegalArgumentException e) { + log.error("Failed to add stream {} to application. This stream will not be processed.", + streamConfig.streamIdentifier(), e); + MetricsUtil.addCount(metricsScope, IGNORED_STREAMS_COUNT, 1, MetricsLevel.DETAILED); + continue; + } log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig); shardSyncTaskManager.submitShardSyncTask(); - currentStreamConfigMap.put(streamIdentifier, streamConfig); streamsSynced.add(streamIdentifier); } else { log.debug("{} is already being processed - skipping shard sync.", streamIdentifier); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index f29c2341..1aa34432 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -1154,7 +1154,7 @@ public class SchedulerTest { } @Test - public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() { + public void testMismatchingArnRegionAndKinesisClientRegionOnSchedulerConstructionThrowsException() { final Region streamArnRegion = Region.US_WEST_1; Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region()); @@ -1169,6 +1169,35 @@ public class SchedulerTest { leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); } + @Test + public void testDynamicallyAddedStreamsWithRegionMismatchingKinesisClientRegionAreIgnored() throws Exception { + final Region mismatchingStreamRegion = Region.US_WEST_1; + final Region kinesisClientRegion = kinesisClient.serviceClientConfiguration().region(); + Assert.assertNotEquals(mismatchingStreamRegion, kinesisClientRegion); + + final StreamIdentifier streamWithMismatchingRegion = StreamIdentifier.multiStreamInstance( + Arn.fromString(constructStreamArnStr(mismatchingStreamRegion, TEST_ACCOUNT, "stream-1")), TEST_EPOCH); + + final StreamIdentifier streamWithMatchingRegion = StreamIdentifier.multiStreamInstance( + Arn.fromString(constructStreamArnStr(kinesisClientRegion, TEST_ACCOUNT, "stream-2")), TEST_EPOCH); + + when(multiStreamTracker.streamConfigList()).thenReturn( + Collections.emptyList(), // returned on scheduler construction + Arrays.asList( // returned on stream sync + new StreamConfig(streamWithMismatchingRegion, TEST_INITIAL_POSITION), + new StreamConfig(streamWithMatchingRegion, TEST_INITIAL_POSITION))); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, + leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + + final Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + final Set currentStreamConfigMapKeys = scheduler.currentStreamConfigMap().keySet(); + assertFalse(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMismatchingRegion)); + assertTrue(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMatchingRegion)); + } + private static String constructStreamIdentifierSer(long accountId, String streamName) { return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH)); }