Refactored MultiStreamTracker to provide and enhance OOP for both (#1028)

single- and multi-stream trackers.

+ converted `Scheduler#currentStreamConfigMap` to `ConcurrentHashMap`
+ eliminated a responsibility from Scheduler (i.e., orphan config generation)
This commit is contained in:
stair 2023-02-07 02:04:34 -05:00 committed by GitHub
parent dd429a2b1c
commit 17b82a0e67
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 605 additions and 107 deletions

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.common;
import java.util.function.Function;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
@ -35,6 +37,8 @@ import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;
/**
@ -46,9 +50,18 @@ public class ConfigsBuilder {
/**
* Either the name of the stream to consume records from
* Or MultiStreamTracker for all the streams to consume records from
*
* @deprecated Both single- and multi-stream support is now provided by {@link StreamTracker}.
* @see #streamTracker
*/
@Deprecated
private Either<MultiStreamTracker, String> appStreamTracker;
/**
* Stream(s) to be consumed by this KCL application.
*/
private StreamTracker streamTracker;
/**
* Application name for the KCL Worker
*/
@ -115,7 +128,8 @@ public class ConfigsBuilder {
}
/**
* Constructor to initialize ConfigsBuilder with StreamName
* Constructor to initialize ConfigsBuilder for a single stream.
*
* @param streamName
* @param applicationName
* @param kinesisClient
@ -128,18 +142,19 @@ public class ConfigsBuilder {
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.right(streamName);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.cloudWatchClient = cloudWatchClient;
this.workerIdentifier = workerIdentifier;
this.shardRecordProcessorFactory = shardRecordProcessorFactory;
this(new SingleStreamTracker(streamName),
applicationName,
kinesisClient,
dynamoDBClient,
cloudWatchClient,
workerIdentifier,
shardRecordProcessorFactory);
}
/**
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
* @param multiStreamTracker
* Constructor to initialize ConfigsBuilder
*
* @param streamTracker tracker for single- or multi-stream processing
* @param applicationName
* @param kinesisClient
* @param dynamoDBClient
@ -147,17 +162,30 @@ public class ConfigsBuilder {
* @param workerIdentifier
* @param shardRecordProcessorFactory
*/
public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
public ConfigsBuilder(@NonNull StreamTracker streamTracker, @NonNull String applicationName,
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName;
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.cloudWatchClient = cloudWatchClient;
this.workerIdentifier = workerIdentifier;
this.shardRecordProcessorFactory = shardRecordProcessorFactory;
// construct both streamTracker and appStreamTracker
streamTracker(streamTracker);
}
public void appStreamTracker(Either<MultiStreamTracker, String> appStreamTracker) {
this.appStreamTracker = appStreamTracker;
streamTracker = appStreamTracker.map(Function.identity(), SingleStreamTracker::new);
}
public void streamTracker(StreamTracker streamTracker) {
this.streamTracker = streamTracker;
this.appStreamTracker = DeprecationUtils.convert(streamTracker,
singleStreamTracker -> singleStreamTracker.streamConfigList().get(0).streamIdentifier().streamName());
}
/**
@ -205,7 +233,6 @@ public class ConfigsBuilder {
return new MetricsConfig(cloudWatchClient(), namespace());
}
/**
* Creates a new instance of ProcessorConfig
*
@ -221,10 +248,6 @@ public class ConfigsBuilder {
* @return RetrievalConfig
*/
public RetrievalConfig retrievalConfig() {
final RetrievalConfig retrievalConfig =
appStreamTracker.map(
multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()),
streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName()));
return retrievalConfig;
return new RetrievalConfig(kinesisClient(), streamTracker(), applicationName());
}
}

View file

@ -0,0 +1,53 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Function;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
/**
* Utility methods to facilitate deprecated code until that deprecated code
* can be safely removed.
*/
public final class DeprecationUtils {
private DeprecationUtils() {
throw new UnsupportedOperationException("utility class");
}
/**
* Converts a {@link StreamTracker} into the deprecated {@code Either<L, R>} convention.
*
* @param streamTracker tracker to convert
*/
@Deprecated
public static <R> Either<MultiStreamTracker, R> convert(
StreamTracker streamTracker,
Function<SingleStreamTracker, R> converter) {
if (streamTracker instanceof MultiStreamTracker) {
return Either.left((MultiStreamTracker) streamTracker);
} else if (streamTracker instanceof SingleStreamTracker) {
return Either.right(converter.apply((SingleStreamTracker) streamTracker));
} else {
throw new IllegalArgumentException("Unhandled StreamTracker: " + streamTracker);
}
}
}

View file

@ -53,7 +53,6 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@ -88,10 +87,10 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
@ -138,7 +137,6 @@ public class Scheduler implements Runnable {
private final ExecutorService executorService;
private final DiagnosticEventFactory diagnosticEventFactory;
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
@ -152,10 +150,9 @@ public class Scheduler implements Runnable {
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final boolean isMultiStreamMode;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private MultiStreamTracker multiStreamTracker;
private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
private InitialPositionInStreamExtended orphanedStreamInitialPositionInStream;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap = new ConcurrentHashMap<>();
private final StreamTracker streamTracker;
private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
@ -222,23 +219,12 @@ public class Scheduler implements Runnable {
this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName();
this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> true, streamConfig -> false);
this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> {
this.multiStreamTracker = multiStreamTracker;
this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy();
this.orphanedStreamInitialPositionInStream = multiStreamTracker.orphanedStreamInitialPositionInStream();
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
streamConfig -> {
// use a concrete, non-singleton map to allow computeIfAbsent(...)
// without forcing behavioral differences for multi-stream support
final Map<StreamIdentifier, StreamConfig> map = new HashMap<>();
map.put(streamConfig.streamIdentifier(), streamConfig);
return map;
});
this.streamTracker = retrievalConfig.streamTracker();
this.isMultiStreamMode = streamTracker.isMultiStream();
this.formerStreamsLeasesDeletionStrategy = streamTracker.formerStreamsLeasesDeletionStrategy();
streamTracker.streamConfigList().forEach(
sc -> currentStreamConfigMap.put(sc.streamIdentifier(), sc));
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
@ -464,12 +450,8 @@ public class Scheduler implements Runnable {
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
try {
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
final Duration waitPeriodToDeleteOldStreams = formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();
// Making an immutable copy
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
List<MultiStreamLease> leases;
@ -549,6 +531,8 @@ public class Scheduler implements Runnable {
}
}
final Duration waitPeriodToDeleteOldStreams =
formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
@ -594,7 +578,7 @@ public class Scheduler implements Runnable {
.collect(Collectors.toSet());
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
currentStreamConfigMap.put(streamIdentifier, getOrphanedStreamConfig(streamIdentifier));
currentStreamConfigMap.put(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier));
}
}
}
@ -652,17 +636,6 @@ public class Scheduler implements Runnable {
return true;
}
/**
* Generates default StreamConfig for an "orphaned" stream that is in the lease table but not tracked.
*
* @param streamIdentifier stream for which an orphan config should be generated
*/
private StreamConfig getOrphanedStreamConfig(StreamIdentifier streamIdentifier) {
final StreamConfig orphanConfig = new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream);
log.info("Identified as orphan: {}", orphanConfig);
return orphanConfig;
}
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
@ -922,7 +895,10 @@ public class Scheduler implements Runnable {
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
final StreamConfig streamConfig = currentStreamConfigMap.computeIfAbsent(streamIdentifier, this::getOrphanedStreamConfig);
StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
if (streamConfig == null) {
streamConfig = streamTracker.createStreamConfig(streamIdentifier);
}
Validate.notNull(streamConfig, "StreamConfig should not be null");
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,

View file

@ -15,43 +15,14 @@
package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import java.util.List;
/**
* Interface for stream trackers. This is useful for KCL Workers that need
* to consume data from multiple streams.
* KCL will periodically probe this interface to learn about the new and old streams.
* Tracker for consuming multiple Kinesis streams.
*/
public interface MultiStreamTracker {
public interface MultiStreamTracker extends StreamTracker {
/**
* Returns the list of stream config, to be processed by the current application.
* <b>Note that the streams list CAN be changed during the application runtime.</b>
* This method will be called periodically by the KCL to learn about the change in streams to process.
*
* @return List of StreamConfig
*/
List<StreamConfig> streamConfigList();
/**
* Strategy to delete leases of old streams in the lease table.
* <b>Note that the strategy CANNOT be changed during the application runtime.</b>
*
* @return StreamsLeasesDeletionStrategy
*/
FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy();
/**
* The position for getting records from an "orphaned" stream that is in the lease table but not tracked
* Default assumes that the stream no longer need to be tracked, so use LATEST for faster shard end.
*
* <p>Default value: {@link InitialPositionInStream#LATEST}</p>
*/
default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() {
return InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
@Override
default boolean isMultiStream() {
return true;
}
}

View file

@ -0,0 +1,81 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.processor;
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.ToString;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
/**
* Tracker for consuming a single Kinesis stream.
*/
@EqualsAndHashCode
@ToString
public class SingleStreamTracker implements StreamTracker {
/**
* By default, single-stream applications should expect the target stream
* to exist for the duration of the application. Therefore, there is no
* expectation for the leases to be deleted mid-execution.
*/
private static final FormerStreamsLeasesDeletionStrategy NO_LEASE_DELETION =
new FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy();
private final StreamIdentifier streamIdentifier;
private final List<StreamConfig> streamConfigs;
public SingleStreamTracker(String streamName) {
this(StreamIdentifier.singleStreamInstance(streamName));
}
public SingleStreamTracker(StreamIdentifier streamIdentifier) {
this(streamIdentifier, DEFAULT_POSITION_IN_STREAM);
}
public SingleStreamTracker(
StreamIdentifier streamIdentifier,
@NonNull InitialPositionInStreamExtended initialPosition) {
this(streamIdentifier, new StreamConfig(streamIdentifier, initialPosition));
}
public SingleStreamTracker(@NonNull StreamIdentifier streamIdentifier, @NonNull StreamConfig streamConfig) {
this.streamIdentifier = streamIdentifier;
this.streamConfigs = Collections.singletonList(streamConfig);
}
@Override
public List<StreamConfig> streamConfigList() {
return streamConfigs;
}
@Override
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
return NO_LEASE_DELETION;
}
@Override
public boolean isMultiStream() {
return false;
}
}

View file

@ -0,0 +1,85 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List;
/**
* Interface for stream trackers.
* KCL will periodically probe this interface to learn about the new and old streams.
*/
public interface StreamTracker {
/**
* Default position to begin consuming records from a Kinesis stream.
*
* @see #orphanedStreamInitialPositionInStream()
*/
InitialPositionInStreamExtended DEFAULT_POSITION_IN_STREAM =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
/**
* Returns the list of stream config, to be processed by the current application.
* <b>Note that the streams list CAN be changed during the application runtime.</b>
* This method will be called periodically by the KCL to learn about the change in streams to process.
*
* @return List of StreamConfig
*/
List<StreamConfig> streamConfigList();
/**
* Strategy to delete leases of old streams in the lease table.
* <b>Note that the strategy CANNOT be changed during the application runtime.</b>
*
* @return StreamsLeasesDeletionStrategy
*/
FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy();
/**
* The position for getting records from an "orphaned" stream that is in the lease table but not tracked
* Default assumes that the stream no longer need to be tracked, so use LATEST for faster shard end.
*
* <p>Default value: {@link InitialPositionInStream#LATEST}</p>
*/
default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() {
return DEFAULT_POSITION_IN_STREAM;
}
/**
* Returns a new {@link StreamConfig} for the provided stream identifier.
*
* @param streamIdentifier stream for which to create a new config
*/
default StreamConfig createStreamConfig(StreamIdentifier streamIdentifier) {
return new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream());
}
/**
* Returns true if this application should accommodate the consumption of
* more than one Kinesis stream.
* <p>
* <b>This method must be consistent.</b> Varying the returned value will
* have indeterminate, and likely problematic, effects on stream processing.
* </p>
*/
boolean isMultiStream();
}

View file

@ -24,12 +24,14 @@ import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.common.DeprecationUtils;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientLibraryPackage;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
@ -67,9 +69,17 @@ public class RetrievalConfig {
/**
* AppStreamTracker either for multi stream tracking or single stream
*
* @deprecated Both single- and multi-stream support is now provided by {@link StreamTracker}.
* @see #streamTracker
*/
private Either<MultiStreamTracker, StreamConfig> appStreamTracker;
/**
* Stream(s) to be consumed by this KCL application.
*/
private StreamTracker streamTracker;
/**
* Backoff time between consecutive ListShards calls.
*
@ -95,7 +105,12 @@ public class RetrievalConfig {
* <p>
* Default value: {@link InitialPositionInStream#LATEST}
* </p>
*
* @deprecated Initial stream position is now handled by {@link StreamTracker}.
* @see StreamTracker#orphanedStreamInitialPositionInStream()
* @see StreamTracker#createConfig(StreamIdentifier)
*/
@Deprecated
private InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.LATEST);
@ -105,27 +120,36 @@ public class RetrievalConfig {
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either
.right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended));
this.applicationName = applicationName;
this(kinesisAsyncClient, new SingleStreamTracker(streamName), applicationName);
}
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker,
public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamTracker streamTracker,
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either.left(multiStreamTracker);
this.streamTracker = streamTracker;
this.applicationName = applicationName;
this.appStreamTracker = DeprecationUtils.convert(streamTracker,
singleStreamTracker -> singleStreamTracker.streamConfigList().get(0));
}
/**
*
* @param initialPositionInStreamExtended
*
* @deprecated Initial stream position is now handled by {@link StreamTracker}.
* @see StreamTracker#orphanedStreamInitialPositionInStream()
* @see StreamTracker#createConfig(StreamIdentifier)
*/
@Deprecated
public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
final StreamConfig[] streamConfig = new StreamConfig[1];
this.appStreamTracker.apply(multiStreamTracker -> {
throw new IllegalArgumentException(
"Cannot set initialPositionInStreamExtended when multiStreamTracker is set");
}, sc -> streamConfig[0] = sc);
this.appStreamTracker = Either
.right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended));
}, sc -> {
final StreamConfig updatedConfig = new StreamConfig(sc.streamIdentifier(), initialPositionInStreamExtended);
streamTracker = new SingleStreamTracker(sc.streamIdentifier(), updatedConfig);
appStreamTracker = Either.right(updatedConfig);
});
return this;
}

View file

@ -0,0 +1,89 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.Arrays;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
public class ConfigsBuilderTest {
@Mock
private KinesisAsyncClient mockKinesisClient;
@Mock
private DynamoDbAsyncClient mockDynamoClient;
@Mock
private CloudWatchAsyncClient mockCloudWatchClient;
@Mock
private ShardRecordProcessorFactory mockShardProcessorFactory;
private static final String APPLICATION_NAME = ConfigsBuilderTest.class.getSimpleName();
private static final String WORKER_IDENTIFIER = "worker-id";
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testTrackerConstruction() {
final String streamName = "single-stream";
final ConfigsBuilder configByName = createConfig(streamName);
final ConfigsBuilder configBySingleTracker = createConfig(new SingleStreamTracker(streamName));
for (final ConfigsBuilder cb : Arrays.asList(configByName, configBySingleTracker)) {
assertEquals(Optional.empty(), cb.appStreamTracker().left());
assertEquals(streamName, cb.appStreamTracker().right().get());
assertEquals(streamName, cb.streamTracker().streamConfigList().get(0).streamIdentifier().streamName());
assertFalse(cb.streamTracker().isMultiStream());
}
final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class);
final ConfigsBuilder configByMultiTracker = createConfig(mockMultiStreamTracker);
assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right());
assertEquals(mockMultiStreamTracker, configByMultiTracker.appStreamTracker().left().get());
assertEquals(mockMultiStreamTracker, configByMultiTracker.streamTracker());
}
private ConfigsBuilder createConfig(String streamName) {
return new ConfigsBuilder(streamName, APPLICATION_NAME, mockKinesisClient, mockDynamoClient,
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
}
private ConfigsBuilder createConfig(StreamTracker streamTracker) {
return new ConfigsBuilder(streamTracker, APPLICATION_NAME, mockKinesisClient, mockDynamoClient,
mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory);
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import org.junit.Test;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
public class DeprecationUtilsTest {
@Test
public void testTrackerConversion() {
final StreamTracker mockMultiTracker = mock(MultiStreamTracker.class);
assertEquals(Either.left(mockMultiTracker), DeprecationUtils.convert(mockMultiTracker, Function.identity()));
final StreamTracker mockSingleTracker = mock(SingleStreamTracker.class);
assertEquals(Either.right(mockSingleTracker), DeprecationUtils.convert(mockSingleTracker, Function.identity()));
}
@Test(expected = IllegalArgumentException.class)
public void testUnsupportedStreamTrackerConversion() {
DeprecationUtils.convert(mock(StreamTracker.class), Function.identity());
}
}

View file

@ -0,0 +1,66 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.processor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import org.hamcrest.Matchers;
import org.junit.Test;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
public class SingleStreamTrackerTest {
private static final String STREAM_NAME = SingleStreamTrackerTest.class.getSimpleName();
@Test
public void testDefaults() {
validate(new SingleStreamTracker(STREAM_NAME));
validate(new SingleStreamTracker(StreamIdentifier.singleStreamInstance(STREAM_NAME)));
}
@Test
public void testInitialPositionConstructor() {
final InitialPositionInStreamExtended expectedPosition =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
assertNotEquals(expectedPosition, StreamTracker.DEFAULT_POSITION_IN_STREAM);
final StreamTracker tracker = new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(STREAM_NAME), expectedPosition);
validate(tracker, expectedPosition);
}
private static void validate(StreamTracker tracker) {
validate(tracker, StreamTracker.DEFAULT_POSITION_IN_STREAM);
}
private static void validate(StreamTracker tracker, InitialPositionInStreamExtended expectedPosition) {
assertEquals(1, tracker.streamConfigList().size());
assertFalse(tracker.isMultiStream());
assertThat(tracker.formerStreamsLeasesDeletionStrategy(),
Matchers.instanceOf(FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy.class));
final StreamConfig config = tracker.streamConfigList().get(0);
assertEquals(STREAM_NAME, config.streamIdentifier().streamName());
assertEquals(expectedPosition, config.initialPositionInStreamExtended());
}
}

View file

@ -0,0 +1,85 @@
package software.amazon.kinesis.retrieval;
import java.util.Arrays;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static software.amazon.kinesis.common.InitialPositionInStream.LATEST;
import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
public class RetrievalConfigTest {
private static final String APPLICATION_NAME = RetrievalConfigTest.class.getSimpleName();
@Mock
private KinesisAsyncClient mockKinesisClient;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testTrackerConstruction() {
final String streamName = "single-stream";
final RetrievalConfig configByName = createConfig(streamName);
final SingleStreamTracker singleTracker = new SingleStreamTracker(streamName);
final RetrievalConfig configBySingleTracker = createConfig(singleTracker);
for (final RetrievalConfig rc : Arrays.asList(configByName, configBySingleTracker)) {
assertEquals(Optional.empty(), rc.appStreamTracker().left());
assertEquals(singleTracker, rc.streamTracker());
assertEquals(1, rc.streamTracker().streamConfigList().size());
assertFalse(rc.streamTracker().isMultiStream());
}
final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class);
final RetrievalConfig configByMultiTracker = createConfig(mockMultiStreamTracker);
assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right());
assertEquals(mockMultiStreamTracker, configByMultiTracker.appStreamTracker().left().get());
assertEquals(mockMultiStreamTracker, configByMultiTracker.streamTracker());
}
@Test
public void testUpdateInitialPositionInSingleStream() {
final RetrievalConfig config = createConfig(new SingleStreamTracker("foo"));
for (final StreamConfig sc : config.streamTracker().streamConfigList()) {
assertEquals(LATEST, sc.initialPositionInStreamExtended().getInitialPositionInStream());
}
config.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON));
for (final StreamConfig sc : config.streamTracker().streamConfigList()) {
assertEquals(TRIM_HORIZON, sc.initialPositionInStreamExtended().getInitialPositionInStream());
}
}
@Test(expected = IllegalArgumentException.class)
public void testUpdateInitialPositionInMultiStream() {
final RetrievalConfig config = createConfig(mock(MultiStreamTracker.class));
config.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON));
}
private RetrievalConfig createConfig(String streamName) {
return new RetrievalConfig(mockKinesisClient, streamName, APPLICATION_NAME);
}
private RetrievalConfig createConfig(StreamTracker streamTracker) {
return new RetrievalConfig(mockKinesisClient, streamTracker, APPLICATION_NAME);
}
}