diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 5101243b..07f8082b 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -171,6 +171,14 @@ public class MultiLangDaemonConfigurationTest { utilsBean.setProperty(configuration, "retrievalMode", "invalid"); } + // @Test + // TODO : Enable this test once https://github.com/awslabs/amazon-kinesis-client/issues/692 is resolved + public void testmetricsEnabledDimensions() { + MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setMetricsEnabledDimensions(new String[]{"Operation"}); + configuration.resolvedConfiguration(shardRecordProcessorFactory); + } + @Test public void testFanoutConfigSetConsumerName() { String consumerArn = "test-consumer"; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 55451709..667f1f1c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -9,6 +9,8 @@ import lombok.experimental.FieldDefaults; @Accessors(fluent = true) @FieldDefaults(makeFinal=true, level= AccessLevel.PRIVATE) public class StreamConfig { - String streamName; + StreamIdentifier streamIdentifier; InitialPositionInStreamExtended initialPositionInStreamExtended; } + + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java new file mode 100644 index 00000000..e54c97e6 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -0,0 +1,33 @@ +package software.amazon.kinesis.common; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import software.amazon.awssdk.utils.Validate; + +@RequiredArgsConstructor +@EqualsAndHashCode +@Getter +public class StreamIdentifier { + private final String accountName; + private final String streamName; + private final String streamCreationEpoch; + + private static final String DEFAULT = "default"; + + @Override + public String toString(){ + return accountName + ":" + streamName + ":" + streamCreationEpoch; + } + + public static StreamIdentifier fromString(String streamIdentifier) { + final String[] idTokens = streamIdentifier.split(":"); + Validate.isTrue(idTokens.length == 3, "Unable to deserialize StreamIdentifier from " + streamIdentifier); + return new StreamIdentifier(idTokens[0], idTokens[1], idTokens[2]); + } + + public static StreamIdentifier fromStreamName(String streamName) { + Validate.notEmpty(streamName, "StreamName should not be empty"); + return new StreamIdentifier(DEFAULT, streamName, DEFAULT); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 547feebb..c94981b9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -45,6 +46,7 @@ import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -110,8 +112,8 @@ public class Scheduler implements Runnable { private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; - private final Function shardSyncTaskManagerProvider; - private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); + private final Function shardSyncTaskManagerProvider; + private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); private final ShardPrioritization shardPrioritization; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -121,11 +123,11 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final Either appStreamTracker; - private final Map currentStreamConfigMap; + private final Map currentStreamConfigMap; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; - private final Function shardDetectorProvider; + private final Function shardDetectorProvider; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; private final HierarchicalShardSyncer hierarchicalShardSyncer; @@ -183,15 +185,16 @@ public class Scheduler implements Runnable { this.applicationName = this.coordinatorConfig.applicationName(); final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); if(multiStreamTracker == null) { - final StreamConfig streamConfig = new StreamConfig(this.retrievalConfig.streamName(), + final StreamConfig streamConfig = new StreamConfig(StreamIdentifier.fromStreamName(this.retrievalConfig.streamName()), this.retrievalConfig.initialPositionInStreamExtended()); this.appStreamTracker = Either.right(streamConfig); - this.currentStreamConfigMap = new HashMap() {{ - put(streamConfig.streamName(), streamConfig); + this.currentStreamConfigMap = new HashMap() {{ + put(streamConfig.streamIdentifier(), streamConfig); }}; } else { this.appStreamTracker = Either.left(multiStreamTracker); - this.currentStreamConfigMap = multiStreamTracker.streamConfigMap(); + this.currentStreamConfigMap = multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); } this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); @@ -218,11 +221,11 @@ public class Scheduler implements Runnable { this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); - // TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName. - // TODO : Pass the immutable map here instead of using mst.streamConfigMap() - this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig + // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName. + // TODO : Pass the immutable map here instead of using mst.streamConfigList() + this.shardSyncTaskManagerProvider = streamIdentifier -> this.leaseManagementConfig .leaseManagementFactory(leaseSerializer, this.appStreamTracker.map(mst -> true, sc -> false)) - .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName)); + .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamIdentifier)); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -245,7 +248,7 @@ public class Scheduler implements Runnable { // this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); - this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector(); + this.shardDetectorProvider = streamIdentifier -> createOrGetShardSyncTaskManager(streamIdentifier).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); // TODO : Halo : Check if this needs to be per stream. @@ -298,17 +301,17 @@ public class Scheduler implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - for(String streamName : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) { + for(StreamIdentifier streamIdentifier : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) { log.info("Syncing Kinesis shard info"); - final StreamConfig streamConfig = currentStreamConfigMap.get(streamName); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName), + final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier), leaseRefresher, streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, metricsFactory); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); // Throwing the exception, to prevent further syncs for other stream. if (result.getException() != null) { - log.error("Caught exception when sync'ing info for " + streamName, result.getException()); + log.error("Caught exception when sync'ing info for " + streamIdentifier, result.getException()); throw result.getException(); } } @@ -366,10 +369,8 @@ public class Scheduler implements Runnable { } for (ShardInfo completedShard : completedShards) { - final String streamName = completedShard.streamName() - .orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName())); - Validate.notEmpty(streamName, "Stream name should not be empty"); - if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) { + final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifier()); + if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) { log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); } } @@ -629,8 +630,8 @@ public class Scheduler implements Runnable { return consumer; } - private ShardSyncTaskManager createOrGetShardSyncTaskManager(String streamName) { - return streamToShardSyncTaskManagerMap.computeIfAbsent(streamName, s -> shardSyncTaskManagerProvider.apply(s)); + private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamIdentifier streamIdentifier) { + return streamToShardSyncTaskManagerMap.computeIfAbsent(streamIdentifier, s -> shardSyncTaskManagerProvider.apply(s)); } protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, @@ -640,18 +641,17 @@ public class Scheduler implements Runnable { checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. In this case, // get the default stream name for the single stream application. - final String streamName = shardInfo.streamName().orElseGet(() -> appStreamTracker.map(mst -> "", sc -> sc.streamName())); - Validate.notEmpty(streamName, "StreamName should not be empty"); + final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier()); // Irrespective of single stream app or multi stream app, streamConfig should always be available. // TODO: Halo : if not available, construct a default config ? - final StreamConfig streamConfig = currentStreamConfigMap.get(streamName); + final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); Validate.notNull(streamConfig, "StreamConfig should not be empty"); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, - streamConfig.streamName(), + streamConfig.streamIdentifier(), leaseCoordinator, executorService, cache, - shardRecordProcessorFactory.shardRecordProcessor(streamName), + shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier), checkpoint, checkpointer, parentShardPollIntervalMillis, @@ -664,7 +664,7 @@ public class Scheduler implements Runnable { streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, - shardDetectorProvider.apply(streamConfig.streamName()), + shardDetectorProvider.apply(streamConfig.streamIdentifier()), aggregatorUtil, hierarchicalShardSyncer, metricsFactory); @@ -716,6 +716,17 @@ public class Scheduler implements Runnable { executorStateEvent.accept(diagnosticEventHandler); } + private StreamIdentifier getStreamIdentifier(Optional streamIdentifierString) { + final StreamIdentifier streamIdentifier; + if(streamIdentifierString.isPresent()) { + streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get()); + } else { + streamIdentifier = appStreamTracker.map(mst -> null, sc -> sc.streamIdentifier()); + } + Validate.notNull(streamIdentifier, "Stream identifier should not be empty"); + return streamIdentifier; + } + /** * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index d3365514..459d730d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -29,7 +29,6 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; @@ -39,7 +38,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; -import software.amazon.awssdk.utils.Pair; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -163,7 +161,7 @@ public class HierarchicalShardSyncer { throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamLeases = new ArrayList<>(); for (Lease lease : leaseRefresher.listLeases()) { - if (streamName.equals(((MultiStreamLease)lease).streamName())) { + if (streamName.equals(((MultiStreamLease)lease).streamIdentifier())) { streamLeases.add(lease); } } @@ -772,7 +770,7 @@ public class HierarchicalShardSyncer { } newLease.parentShardIds(parentShardIds); newLease.ownerSwitchesSinceCheckpoint(0L); - newLease.streamName(streamName); + newLease.streamIdentifier(streamName); newLease.shardId(shard.shardId()); return newLease; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java index 9878e32c..8b29168d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java @@ -17,13 +17,13 @@ import static com.google.common.base.Verify.verifyNotNull; @Accessors(fluent = true) public class MultiStreamLease extends Lease { - @NonNull private String streamName; + @NonNull private String streamIdentifier; @NonNull private String shardId; public MultiStreamLease(Lease other) { super(other); MultiStreamLease casted = validateAndCast(other); - streamName(casted.streamName); + streamIdentifier(casted.streamIdentifier); shardId(casted.shardId); } @@ -31,7 +31,7 @@ public class MultiStreamLease extends Lease { public void update(Lease other) { MultiStreamLease casted = validateAndCast(other); super.update(casted); - streamName(casted.streamName); + streamIdentifier(casted.streamIdentifier); shardId(casted.shardId); } @@ -43,7 +43,7 @@ public class MultiStreamLease extends Lease { @Override public int hashCode() { - return Objects.hash(super.hashCode(), streamName); + return Objects.hash(super.hashCode(), streamIdentifier); } @Override @@ -58,11 +58,11 @@ public class MultiStreamLease extends Lease { return false; } MultiStreamLease other = (MultiStreamLease) obj; - if (streamName == null) { - if (other.streamName != null) { + if (streamIdentifier == null) { + if (other.streamIdentifier != null) { return false; } - } else if (!streamName.equals(other.streamName)) { + } else if (!streamIdentifier.equals(other.streamIdentifier)) { return false; } return true; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index 0f86efb2..36bc5dd1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -37,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @ToString public class ShardInfo { - private final Optional streamName; + private final Optional streamIdentifier; private final String shardId; private final String concurrencyToken; // Sorted list of parent shardIds. @@ -67,7 +67,7 @@ public class ShardInfo { final String concurrencyToken, final Collection parentShardIds, final ExtendedSequenceNumber checkpoint, - final String streamName) { + final String streamIdentifier) { this.shardId = shardId; this.concurrencyToken = concurrencyToken; this.parentShardIds = new LinkedList<>(); @@ -78,7 +78,7 @@ public class ShardInfo { // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); this.checkpoint = checkpoint; - this.streamName = Optional.ofNullable(streamName); + this.streamIdentifier = Optional.ofNullable(streamIdentifier); } /** @@ -105,7 +105,7 @@ public class ShardInfo { @Override public int hashCode() { return new HashCodeBuilder() - .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName.orElse("")).toHashCode(); + .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifier.orElse("")).toHashCode(); } /** @@ -130,7 +130,7 @@ public class ShardInfo { ShardInfo other = (ShardInfo) obj; return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) - .append(streamName.orElse(""), other.streamName.orElse("")).isEquals(); + .append(streamIdentifier.orElse(""), other.streamIdentifier.orElse("")).isEquals(); } @@ -140,8 +140,8 @@ public class ShardInfo { * @return */ public static String getLeaseKey(ShardInfo shardInfo) { - return shardInfo.streamName().isPresent() ? - MultiStreamLease.getLeaseKey(shardInfo.streamName().get(), shardInfo.shardId()) : + return shardInfo.streamIdentifier().isPresent() ? + MultiStreamLease.getLeaseKey(shardInfo.streamIdentifier().get(), shardInfo.shardId()) : shardInfo.shardId(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index c0d3913b..f8a11c0a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -382,7 +382,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { public static ShardInfo convertLeaseToAssignment(final Lease lease) { if (lease instanceof MultiStreamLease) { return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(), - lease.checkpoint(), ((MultiStreamLease) lease).streamName()); + lease.checkpoint(), ((MultiStreamLease) lease).streamIdentifier()); } else { return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java index 7525273f..b8637bdb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java @@ -21,7 +21,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer public Map toDynamoRecord(Lease lease) { final MultiStreamLease multiStreamLease = validateAndCast(lease); final Map result = super.toDynamoRecord(multiStreamLease); - result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamName())); + result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())); result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId())); return result; } @@ -30,7 +30,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer public MultiStreamLease fromDynamoRecord(Map dynamoRecord) { final MultiStreamLease multiStreamLease = (MultiStreamLease) super .fromDynamoRecord(dynamoRecord, new MultiStreamLease()); - multiStreamLease.streamName(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY)); + multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY)); multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY)); return multiStreamLease; } @@ -40,7 +40,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer public Map getDynamoUpdateLeaseUpdate(Lease lease) { final MultiStreamLease multiStreamLease = validateAndCast(lease); final Map result = super.getDynamoUpdateLeaseUpdate(multiStreamLease); - result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamName()))); + result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()))); result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId()))); return result; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 4f1db733..03ddc6ee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -21,8 +21,8 @@ import lombok.experimental.Accessors; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.LeaseCoordinator; -import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -41,7 +41,7 @@ public class ShardConsumerArgument { @NonNull private final ShardInfo shardInfo; @NonNull - private final String streamName; + private final StreamIdentifier streamIdentifier; @NonNull private final LeaseCoordinator leaseCoordinator; @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index fae7e4cb..e41f7e08 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -2,6 +2,7 @@ package software.amazon.kinesis.processor; import software.amazon.kinesis.common.StreamConfig; +import java.util.List; import java.util.Map; /** @@ -15,5 +16,5 @@ public interface MultiStreamTracker { * * @return List of stream names */ - Map streamConfigMap(); + List streamConfigList(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java index 4b691401..e16695b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.processor; +import software.amazon.kinesis.common.StreamIdentifier; + /** * */ @@ -28,10 +30,10 @@ public interface ShardRecordProcessorFactory { /** * Returns a new instance of the ShardRecordProcessor for a stream - * @param streamName + * @param streamIdentifier * @return ShardRecordProcessor */ - default ShardRecordProcessor shardRecordProcessor(String streamName) { + default ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) { return shardRecordProcessor(); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index 5c0f5e9a..d4654323 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -47,7 +47,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory { @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, final MetricsFactory metricsFactory) { - final String streamName = shardInfo.streamName().orElse(defaultStreamName); + final String streamName = shardInfo.streamIdentifier().orElse(defaultStreamName); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply)); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 7405730e..4a0dc9d5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -63,7 +63,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); + new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 8b669893..6e2e2b05 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -68,7 +68,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @NonNull final MetricsFactory metricsFactory) { return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, shardInfo.streamName().orElse(streamName), shardInfo.shardId(), + new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, maxFutureWait)); }