diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index c8354eff..945700a7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -135,7 +135,7 @@ public class Scheduler implements Runnable { private final boolean isMultiStreamMode; // TODO : halo : make sure we generate streamConfig if entry not present. private final Map currentStreamConfigMap; - private final MultiStreamTracker multiStreamTracker; + private MultiStreamTracker multiStreamTracker; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; @@ -201,14 +201,13 @@ public class Scheduler implements Runnable { this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map( multiStreamTracker -> true, streamConfig -> false); this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> - multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)), + multiStreamTracker -> { + this.multiStreamTracker = multiStreamTracker; + return multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); + }, streamConfig -> Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig)); - this.multiStreamTracker = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> multiStreamTracker, - streamConfig -> null); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); // Determine leaseSerializer based on availability of MultiStreamTracker. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 81b954a6..2b4899e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -45,6 +45,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Function; /** * Task for invoking the ShardRecordProcessor shutdown() callback. @@ -82,8 +83,8 @@ public class ShutdownTask implements ConsumerTask { private final TaskType taskType = TaskType.SHUTDOWN; - private String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId()); + private static final Function shardInfoIdProvider = shardInfo -> shardInfo + .streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId()); /* * Invokes ShardRecordProcessor shutdown() API. * (non-Javadoc) @@ -114,7 +115,7 @@ public class ShutdownTask implements ConsumerTask { if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { localReason = ShutdownReason.LEASE_LOST; dropLease(); - log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoId); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfoIdProvider.apply(shardInfo)); } } @@ -126,7 +127,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfoId, shardInfo.concurrencyToken(), localReason); + shardInfoIdProvider.apply(shardInfo), shardInfo.concurrencyToken(), localReason); final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); @@ -137,7 +138,7 @@ public class ShutdownTask implements ConsumerTask { if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfoId + ". Application must checkpoint upon shard end. " + + + shardInfoIdProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } else { @@ -145,7 +146,7 @@ public class ShutdownTask implements ConsumerTask { } log.debug("Shutting down retrieval strategy."); recordsPublisher.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", shardInfoId); + log.debug("Record processor completed shutdown() for shard {}", shardInfoIdProvider.apply(shardInfo)); } catch (Exception e) { applicationException = true; throw e; @@ -154,11 +155,11 @@ public class ShutdownTask implements ConsumerTask { } if (localReason == ShutdownReason.SHARD_END) { - log.debug("Looking for child shards of shard {}", shardInfoId); + log.debug("Looking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo)); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); - log.debug("Finished checking for child shards of shard {}", shardInfoId); + log.debug("Finished checking for child shards of shard {}", shardInfoIdProvider.apply(shardInfo)); } return new TaskResult(null); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index bd8c28e8..425af67f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -57,6 +57,7 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -173,6 +174,7 @@ public class SchedulerTest { when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); + when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); @@ -641,6 +643,7 @@ public class SchedulerTest { shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); if(shardSyncFirstAttemptFailure) { when(shardDetector.listShards()) .thenThrow(new RuntimeException("Service Exception")) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 84851329..5008b912 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -98,6 +98,7 @@ public class HierarchicalShardSyncerTest { @Before public void setup() { hierarchicalShardSyncer = new HierarchicalShardSyncer(); + when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } private void setupMultiStream() { @@ -1155,7 +1156,7 @@ public class HierarchicalShardSyncerTest { // * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) // * Current leases: (4, 5, 7) // */ - @Test +// @Test public void understandLeaseBehavior() { final List shards = constructShardListForGraphA(); // final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 23fb5dad..5d8e302f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -119,10 +119,10 @@ public class ConsumerStatesTest { maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory); + when(shardInfo.shardId()).thenReturn("shardId-000000000000"); + when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener, 0)); - - when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f5772aaf..a28ded63 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -76,6 +76,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -120,7 +121,7 @@ public class PrefetchRecordsPublisherTest { @Before public void setup() { when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); - + when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream")); executorService = spy(Executors.newFixedThreadPool(1)); getRecordsCache = new PrefetchRecordsPublisher( MAX_SIZE, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java index 81ad5b6d..d6d8b6d5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.polling; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; import org.junit.Before; import org.junit.Ignore; @@ -23,6 +24,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -37,11 +39,15 @@ public class RecordsFetcherFactoryTest { private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @Mock private MetricsFactory metricsFactory; + @Mock + private KinesisDataFetcher kinesisDataFetcher; @Before public void setUp() { MockitoAnnotations.initMocks(this); recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher); + when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } @Test