diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index f5f21d96..ea3acfe1 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -104,6 +104,12 @@ + + org.junit.jupiter + junit-jupiter-api + 5.11.3 + test + junit junit @@ -122,6 +128,13 @@ 1.3 test + + + org.mockito + mockito-junit-jupiter + 3.12.4 + test + diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java index 4588b246..94bdfd10 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java @@ -141,7 +141,7 @@ public class MultiLangDaemon { } } - String propertiesFile(final MultiLangDaemonArguments arguments) { + String validateAndGetPropertiesFileName(final MultiLangDaemonArguments arguments) { String propertiesFile = ""; if (CollectionUtils.isNotEmpty(arguments.parameters)) { @@ -216,9 +216,9 @@ public class MultiLangDaemon { MultiLangDaemonArguments arguments = new MultiLangDaemonArguments(); JCommander jCommander = daemon.buildJCommanderAndParseArgs(arguments, args); try { - String propertiesFile = daemon.propertiesFile(arguments); + String propertiesFileName = daemon.validateAndGetPropertiesFileName(arguments); daemon.configureLogging(arguments.logConfiguration); - MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFile); + MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFileName); Scheduler scheduler = daemon.buildScheduler(config); MultiLangRunner runner = new MultiLangRunner(scheduler); diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java new file mode 100644 index 00000000..eaf93f49 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/CoordinatorStateConfigBean.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.multilang.config; + +import lombok.Getter; +import lombok.Setter; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.kinesis.coordinator.CoordinatorConfig.CoordinatorStateTableConfig; + +@Getter +@Setter +public class CoordinatorStateConfigBean { + + interface CoordinatorStateConfigBeanDelegate { + String getCoordinatorStateTableName(); + + void setCoordinatorStateTableName(String value); + + BillingMode getCoordinatorStateBillingMode(); + + void setCoordinatorStateBillingMode(BillingMode value); + + long getCoordinatorStateReadCapacity(); + + void setCoordinatorStateReadCapacity(long value); + + long getCoordinatorStateWriteCapacity(); + + void setCoordinatorStateWriteCapacity(long value); + } + + @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "tableName") + private String coordinatorStateTableName; + + @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "billingMode") + private BillingMode coordinatorStateBillingMode; + + @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "readCapacity") + private long coordinatorStateReadCapacity; + + @ConfigurationSettable(configurationClass = CoordinatorStateTableConfig.class, methodName = "writeCapacity") + private long coordinatorStateWriteCapacity; +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java new file mode 100644 index 00000000..97327962 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/GracefulLeaseHandoffConfigBean.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.multilang.config; + +import lombok.Getter; +import lombok.Setter; +import software.amazon.kinesis.leases.LeaseManagementConfig; + +@Getter +@Setter +public class GracefulLeaseHandoffConfigBean { + + interface GracefulLeaseHandoffConfigBeanDelegate { + Long getGracefulLeaseHandoffTimeoutMillis(); + + void setGracefulLeaseHandoffTimeoutMillis(Long value); + + Boolean getIsGracefulLeaseHandoffEnabled(); + + void setIsGracefulLeaseHandoffEnabled(Boolean value); + } + + @ConfigurationSettable(configurationClass = LeaseManagementConfig.GracefulLeaseHandoffConfig.class) + private Long gracefulLeaseHandoffTimeoutMillis; + + @ConfigurationSettable(configurationClass = LeaseManagementConfig.GracefulLeaseHandoffConfig.class) + private Boolean isGracefulLeaseHandoffEnabled; +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index 3336be88..5c7a11b2 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.multilang.config; import java.lang.reflect.InvocationTargetException; import java.net.URI; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -41,6 +42,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; import software.amazon.kinesis.checkpoint.CheckpointConfig; @@ -55,6 +57,7 @@ import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.multilang.config.converter.DurationConverter; import software.amazon.kinesis.multilang.config.credentials.V2CredentialWrapper; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -156,6 +159,9 @@ public class MultiLangDaemonConfiguration { @ConfigurationSettable(configurationClass = CoordinatorConfig.class) private long schedulerInitializationBackoffTimeMillis; + @ConfigurationSettable(configurationClass = CoordinatorConfig.class) + private CoordinatorConfig.ClientVersionConfig clientVersionConfig; + @ConfigurationSettable(configurationClass = LifecycleConfig.class) private long taskBackoffTimeMillis; @@ -189,6 +195,20 @@ public class MultiLangDaemonConfiguration { @Delegate(types = PollingConfigBean.PollingConfigBeanDelegate.class) private final PollingConfigBean pollingConfig = new PollingConfigBean(); + @Delegate(types = GracefulLeaseHandoffConfigBean.GracefulLeaseHandoffConfigBeanDelegate.class) + private final GracefulLeaseHandoffConfigBean gracefulLeaseHandoffConfigBean = new GracefulLeaseHandoffConfigBean(); + + @Delegate( + types = WorkerUtilizationAwareAssignmentConfigBean.WorkerUtilizationAwareAssignmentConfigBeanDelegate.class) + private final WorkerUtilizationAwareAssignmentConfigBean workerUtilizationAwareAssignmentConfigBean = + new WorkerUtilizationAwareAssignmentConfigBean(); + + @Delegate(types = WorkerMetricsTableConfigBean.WorkerMetricsTableConfigBeanDelegate.class) + private final WorkerMetricsTableConfigBean workerMetricsTableConfigBean = new WorkerMetricsTableConfigBean(); + + @Delegate(types = CoordinatorStateConfigBean.CoordinatorStateConfigBeanDelegate.class) + private final CoordinatorStateConfigBean coordinatorStateConfigBean = new CoordinatorStateConfigBean(); + private boolean validateSequenceNumberBeforeCheckpointing; private long shutdownGraceMillis; @@ -252,6 +272,25 @@ public class MultiLangDaemonConfiguration { }, InitialPositionInStream.class); + convertUtilsBean.register( + new Converter() { + @Override + public T convert(Class type, Object value) { + return type.cast(CoordinatorConfig.ClientVersionConfig.valueOf( + value.toString().toUpperCase())); + } + }, + CoordinatorConfig.ClientVersionConfig.class); + + convertUtilsBean.register( + new Converter() { + @Override + public T convert(Class type, Object value) { + return type.cast(BillingMode.valueOf(value.toString().toUpperCase())); + } + }, + BillingMode.class); + convertUtilsBean.register( new Converter() { @Override @@ -279,6 +318,8 @@ public class MultiLangDaemonConfiguration { }, Region.class); + convertUtilsBean.register(new DurationConverter(), Duration.class); + ArrayConverter arrayConverter = new ArrayConverter(String[].class, new StringConverter()); arrayConverter.setDelimiter(','); convertUtilsBean.register(arrayConverter, String[].class); @@ -370,6 +411,22 @@ public class MultiLangDaemonConfiguration { retrievalMode.builder(this).build(configsBuilder.kinesisClient(), this)); } + private void handleCoordinatorConfig(CoordinatorConfig coordinatorConfig) { + ConfigurationSettableUtils.resolveFields( + this.coordinatorStateConfigBean, coordinatorConfig.coordinatorStateConfig()); + } + + private void handleLeaseManagementConfig(LeaseManagementConfig leaseManagementConfig) { + ConfigurationSettableUtils.resolveFields( + this.gracefulLeaseHandoffConfigBean, leaseManagementConfig.gracefulLeaseHandoffConfig()); + ConfigurationSettableUtils.resolveFields( + this.workerUtilizationAwareAssignmentConfigBean, + leaseManagementConfig.workerUtilizationAwareAssignmentConfig()); + ConfigurationSettableUtils.resolveFields( + this.workerMetricsTableConfigBean, + leaseManagementConfig.workerUtilizationAwareAssignmentConfig().workerMetricsTableConfig()); + } + private Object adjustKinesisHttpConfiguration(Object builderObj) { if (builderObj instanceof KinesisAsyncClientBuilder) { KinesisAsyncClientBuilder builder = (KinesisAsyncClientBuilder) builderObj; @@ -448,6 +505,8 @@ public class MultiLangDaemonConfiguration { processorConfig, retrievalConfig); + handleCoordinatorConfig(coordinatorConfig); + handleLeaseManagementConfig(leaseManagementConfig); handleRetrievalConfig(retrievalConfig, configsBuilder); resolveFields(configObjects, null, new HashSet<>(Arrays.asList(ConfigsBuilder.class, PollingConfig.class))); diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java new file mode 100644 index 00000000..5cb9a6ec --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerMetricsTableConfigBean.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.multilang.config; + +import lombok.Getter; +import lombok.Setter; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig; + +@Getter +@Setter +public class WorkerMetricsTableConfigBean { + + interface WorkerMetricsTableConfigBeanDelegate { + String getWorkerMetricsTableName(); + + void setWorkerMetricsTableName(String value); + + BillingMode getWorkerMetricsBillingMode(); + + void setWorkerMetricsBillingMode(BillingMode value); + + long getWorkerMetricsReadCapacity(); + + void setWorkerMetricsReadCapacity(long value); + + long getWorkerMetricsWriteCapacity(); + + void setWorkerMetricsWriteCapacity(long value); + } + + @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "tableName") + private String workerMetricsTableName; + + @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "billingMode") + private BillingMode workerMetricsBillingMode; + + @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "readCapacity") + private long workerMetricsReadCapacity; + + @ConfigurationSettable(configurationClass = WorkerMetricsTableConfig.class, methodName = "writeCapacity") + private long workerMetricsWriteCapacity; +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java new file mode 100644 index 00000000..fc335283 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBean.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.multilang.config; + +import java.time.Duration; + +import lombok.Getter; +import lombok.Setter; +import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig; + +@Getter +@Setter +public class WorkerUtilizationAwareAssignmentConfigBean { + + interface WorkerUtilizationAwareAssignmentConfigBeanDelegate { + long getInMemoryWorkerMetricsCaptureFrequencyMillis(); + + void setInMemoryWorkerMetricsCaptureFrequencyMillis(long value); + + long getWorkerMetricsReporterFreqInMillis(); + + void setWorkerMetricsReporterFreqInMillis(long value); + + int getNoOfPersistedMetricsPerWorkerMetrics(); + + void setNoOfPersistedMetricsPerWorkerMetrics(int value); + + Boolean getDisableWorkerMetrics(); + + void setDisableWorkerMetrics(Boolean value); + + double getMaxThroughputPerHostKBps(); + + void setMaxThroughputPerHostKBps(double value); + + int getDampeningPercentage(); + + void setDampeningPercentage(int value); + + int getReBalanceThresholdPercentage(); + + void setReBalanceThresholdPercentage(int value); + + Boolean getAllowThroughputOvershoot(); + + void setAllowThroughputOvershoot(Boolean value); + + int getVarianceBalancingFrequency(); + + void setVarianceBalancingFrequency(int value); + + double getWorkerMetricsEMAAlpha(); + + void setWorkerMetricsEMAAlpha(double value); + + void setStaleWorkerMetricsEntryCleanupDuration(Duration value); + + Duration getStaleWorkerMetricsEntryCleanupDuration(); + } + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private long inMemoryWorkerMetricsCaptureFrequencyMillis; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private long workerMetricsReporterFreqInMillis; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private int noOfPersistedMetricsPerWorkerMetrics; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private Boolean disableWorkerMetrics; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private double maxThroughputPerHostKBps; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private int dampeningPercentage; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private int reBalanceThresholdPercentage; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private Boolean allowThroughputOvershoot; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private int varianceBalancingFrequency; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private double workerMetricsEMAAlpha; + + @ConfigurationSettable(configurationClass = WorkerUtilizationAwareAssignmentConfig.class) + private Duration staleWorkerMetricsEntryCleanupDuration; +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java new file mode 100644 index 00000000..3c07f1f2 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/converter/DurationConverter.java @@ -0,0 +1,37 @@ +package software.amazon.kinesis.multilang.config.converter; + +import java.time.Duration; + +import org.apache.commons.beanutils.Converter; + +/** + * Converter that converts Duration text representation to a Duration object. + * Refer to {@code Duration.parse} javadocs for the exact text representation. + */ +public class DurationConverter implements Converter { + + @Override + public T convert(Class type, Object value) { + if (value == null) { + return null; + } + + if (type != Duration.class) { + throw new ConversionException("Can only convert to Duration"); + } + + String durationString = value.toString().trim(); + final Duration duration = Duration.parse(durationString); + if (duration.isNegative()) { + throw new ConversionException("Negative values are not permitted for duration: " + durationString); + } + + return type.cast(duration); + } + + public static class ConversionException extends RuntimeException { + public ConversionException(String message) { + super(message); + } + } +} diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java index 3e689437..453f81aa 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java @@ -157,7 +157,7 @@ public class MultiLangDaemonTest { MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments(); - daemon.propertiesFile(arguments); + daemon.validateAndGetPropertiesFileName(arguments); } @Test @@ -166,7 +166,7 @@ public class MultiLangDaemonTest { MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments(); arguments.parameters = Collections.singletonList(expectedPropertiesFile); - String propertiesFile = daemon.propertiesFile(arguments); + String propertiesFile = daemon.validateAndGetPropertiesFileName(arguments); assertThat(propertiesFile, equalTo(expectedPropertiesFile)); } @@ -180,7 +180,7 @@ public class MultiLangDaemonTest { arguments.parameters = Collections.singletonList(propertiesArgument); arguments.propertiesFile = propertiesOptions; - String propertiesFile = daemon.propertiesFile(arguments); + String propertiesFile = daemon.validateAndGetPropertiesFileName(arguments); assertThat(propertiesFile, equalTo(propertiesOptions)); } @@ -193,7 +193,7 @@ public class MultiLangDaemonTest { MultiLangDaemon.MultiLangDaemonArguments arguments = new MultiLangDaemon.MultiLangDaemonArguments(); arguments.parameters = Arrays.asList("parameter1", "parameter2"); - daemon.propertiesFile(arguments); + daemon.validateAndGetPropertiesFileName(arguments); } @Test diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java index 5e0db340..cee3cad2 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/ConfigurationSettableUtilsTest.java @@ -52,6 +52,16 @@ public class ConfigurationSettableUtilsTest { assertThat(actual, equalTo(expected)); } + @Test + public void testBoolean() { + ConfigResult expected = ConfigResult.builder().bool(false).build(); + + ConfigObject configObject = ConfigObject.builder().bool(expected.bool).build(); + ConfigResult actual = resolve(configObject); + + assertThat(actual, equalTo(expected)); + } + @Test public void testHeapValuesSet() { ConfigResult expected = @@ -147,6 +157,9 @@ public class ConfigurationSettableUtilsTest { private Long boxedLong; private ComplexValue complexValue; + @Builder.Default + private Boolean bool = true; + private Optional optionalString; private Optional optionalInteger; private Optional optionalLong; @@ -175,6 +188,10 @@ public class ConfigurationSettableUtilsTest { @ConfigurationSettable(configurationClass = ConfigResult.class) private int rawInt; + @ConfigurationSettable(configurationClass = ConfigResult.class) + @Builder.Default + private Boolean bool = true; + @ConfigurationSettable(configurationClass = ConfigResult.class) private Integer boxedInt; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index b0e3b870..9b38bc30 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Arrays; import java.util.Date; import java.util.HashSet; +import java.util.NoSuchElementException; import java.util.Set; import com.amazonaws.auth.AWSCredentials; @@ -32,7 +33,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.metrics.MetricsLevel; import static org.hamcrest.CoreMatchers.equalTo; @@ -40,6 +43,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -69,6 +73,8 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getWorkerIdentifier(), "123"); assertThat(config.getMaxGetRecordsThreadPool(), nullValue()); assertThat(config.getRetryGetRecordsInSeconds(), nullValue()); + assertNull(config.getGracefulLeaseHandoffTimeoutMillis()); + assertNull(config.getIsGracefulLeaseHandoffEnabled()); } @Test @@ -147,6 +153,151 @@ public class KinesisClientLibConfiguratorTest { } } + @Test + public void testGracefulLeaseHandoffConfig() { + final Long testGracefulLeaseHandoffTimeoutMillis = 12345L; + final boolean testGracefulLeaseHandoffEnabled = true; + + final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( + new String[] { + "applicationName = dummyApplicationName", + "streamName = dummyStreamName", + "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "gracefulLeaseHandoffTimeoutMillis = " + testGracefulLeaseHandoffTimeoutMillis, + "isGracefulLeaseHandoffEnabled = " + testGracefulLeaseHandoffEnabled + }, + '\n')); + + assertEquals(testGracefulLeaseHandoffTimeoutMillis, config.getGracefulLeaseHandoffTimeoutMillis()); + assertEquals(testGracefulLeaseHandoffEnabled, config.getIsGracefulLeaseHandoffEnabled()); + } + + @Test + public void testClientVersionConfig() { + final CoordinatorConfig.ClientVersionConfig testClientVersionConfig = Arrays.stream( + CoordinatorConfig.ClientVersionConfig.values()) + .findAny() + .orElseThrow(NoSuchElementException::new); + + final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( + new String[] { + "applicationName = dummyApplicationName", + "streamName = dummyStreamName", + "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "clientVersionConfig = " + testClientVersionConfig.name() + }, + '\n')); + + assertEquals(testClientVersionConfig, config.getClientVersionConfig()); + } + + @Test + public void testCoordinatorStateConfig() { + final String testCoordinatorStateTableName = "CoordState"; + final BillingMode testCoordinatorStateBillingMode = BillingMode.PAY_PER_REQUEST; + final long testCoordinatorStateReadCapacity = 123; + final long testCoordinatorStateWriteCapacity = 123; + + final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( + new String[] { + "applicationName = dummyApplicationName", + "streamName = dummyStreamName", + "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "coordinatorStateTableName = " + testCoordinatorStateTableName, + "coordinatorStateBillingMode = " + testCoordinatorStateBillingMode.name(), + "coordinatorStateReadCapacity = " + testCoordinatorStateReadCapacity, + "coordinatorStateWriteCapacity = " + testCoordinatorStateWriteCapacity + }, + '\n')); + + assertEquals(testCoordinatorStateTableName, config.getCoordinatorStateTableName()); + assertEquals(testCoordinatorStateBillingMode, config.getCoordinatorStateBillingMode()); + assertEquals(testCoordinatorStateReadCapacity, config.getCoordinatorStateReadCapacity()); + assertEquals(testCoordinatorStateWriteCapacity, config.getCoordinatorStateWriteCapacity()); + } + + @Test + public void testWorkerUtilizationAwareAssignmentConfig() { + final long testInMemoryWorkerMetricsCaptureFrequencyMillis = 123; + final long testWorkerMetricsReporterFreqInMillis = 123; + final long testNoOfPersistedMetricsPerWorkerMetrics = 123; + final Boolean testDisableWorkerMetrics = true; + final double testMaxThroughputPerHostKBps = 123; + final long testDampeningPercentage = 12; + final long testReBalanceThresholdPercentage = 12; + final Boolean testAllowThroughputOvershoot = false; + final long testVarianceBalancingFrequency = 12; + final double testWorkerMetricsEMAAlpha = .123; + + final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( + new String[] { + "applicationName = dummyApplicationName", + "streamName = dummyStreamName", + "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "inMemoryWorkerMetricsCaptureFrequencyMillis = " + testInMemoryWorkerMetricsCaptureFrequencyMillis, + "workerMetricsReporterFreqInMillis = " + testWorkerMetricsReporterFreqInMillis, + "noOfPersistedMetricsPerWorkerMetrics = " + testNoOfPersistedMetricsPerWorkerMetrics, + "disableWorkerMetrics = " + testDisableWorkerMetrics, + "maxThroughputPerHostKBps = " + testMaxThroughputPerHostKBps, + "dampeningPercentage = " + testDampeningPercentage, + "reBalanceThresholdPercentage = " + testReBalanceThresholdPercentage, + "allowThroughputOvershoot = " + testAllowThroughputOvershoot, + "varianceBalancingFrequency = " + testVarianceBalancingFrequency, + "workerMetricsEMAAlpha = " + testWorkerMetricsEMAAlpha + }, + '\n')); + + assertEquals( + testInMemoryWorkerMetricsCaptureFrequencyMillis, + config.getInMemoryWorkerMetricsCaptureFrequencyMillis()); + assertEquals(testWorkerMetricsReporterFreqInMillis, config.getWorkerMetricsReporterFreqInMillis()); + assertEquals(testNoOfPersistedMetricsPerWorkerMetrics, config.getNoOfPersistedMetricsPerWorkerMetrics()); + assertEquals(testDisableWorkerMetrics, config.getDisableWorkerMetrics()); + assertEquals(testMaxThroughputPerHostKBps, config.getMaxThroughputPerHostKBps(), 0.0001); + assertEquals(testDampeningPercentage, config.getDampeningPercentage()); + assertEquals(testReBalanceThresholdPercentage, config.getReBalanceThresholdPercentage()); + assertEquals(testAllowThroughputOvershoot, config.getAllowThroughputOvershoot()); + assertEquals(testVarianceBalancingFrequency, config.getVarianceBalancingFrequency()); + assertEquals(testWorkerMetricsEMAAlpha, config.getWorkerMetricsEMAAlpha(), 0.0001); + } + + @Test + public void testWorkerMetricsConfig() { + final String testWorkerMetricsTableName = "CoordState"; + final BillingMode testWorkerMetricsBillingMode = BillingMode.PROVISIONED; + final long testWorkerMetricsReadCapacity = 123; + final long testWorkerMetricsWriteCapacity = 123; + + final MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( + new String[] { + "applicationName = dummyApplicationName", + "streamName = dummyStreamName", + "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "workerMetricsTableName = " + testWorkerMetricsTableName, + "workerMetricsBillingMode = " + testWorkerMetricsBillingMode.name(), + "workerMetricsReadCapacity = " + testWorkerMetricsReadCapacity, + "workerMetricsWriteCapacity = " + testWorkerMetricsWriteCapacity + }, + '\n')); + + assertEquals(testWorkerMetricsTableName, config.getWorkerMetricsTableName()); + assertEquals(testWorkerMetricsBillingMode, config.getWorkerMetricsBillingMode()); + assertEquals(testWorkerMetricsReadCapacity, config.getWorkerMetricsReadCapacity()); + assertEquals(testWorkerMetricsWriteCapacity, config.getWorkerMetricsWriteCapacity()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidClientVersionConfig() { + getConfiguration(StringUtils.join( + new String[] { + "applicationName = dummyApplicationName", + "streamName = dummyStreamName", + "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "clientVersionConfig = " + "invalid_client_version_config" + }, + '\n')); + } + @Test public void testWithUnsupportedClientConfigurationVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 1c45eb6e..c451ef81 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -15,6 +15,9 @@ package software.amazon.kinesis.multilang.config; +import java.util.Arrays; +import java.util.NoSuchElementException; + import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean; import org.junit.After; @@ -24,8 +27,16 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.coordinator.CoordinatorConfig; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; @@ -34,6 +45,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -41,6 +53,8 @@ import static org.junit.Assert.assertTrue; public class MultiLangDaemonConfigurationTest { private static final String AWS_REGION_PROPERTY_NAME = "aws.region"; + private static final String DUMMY_APPLICATION_NAME = "dummyApplicationName"; + private static final String DUMMY_STREAM_NAME = "dummyStreamName"; private BeanUtilsBean utilsBean; private ConvertUtilsBean convertUtilsBean; @@ -71,8 +85,8 @@ public class MultiLangDaemonConfigurationTest { public MultiLangDaemonConfiguration baseConfiguration() { MultiLangDaemonConfiguration configuration = new MultiLangDaemonConfiguration(utilsBean, convertUtilsBean); - configuration.setApplicationName("Test"); - configuration.setStreamName("Test"); + configuration.setApplicationName(DUMMY_APPLICATION_NAME); + configuration.setStreamName(DUMMY_STREAM_NAME); configuration.getKinesisCredentialsProvider().set("class", DefaultCredentialsProvider.class.getName()); return configuration; @@ -111,6 +125,197 @@ public class MultiLangDaemonConfigurationTest { assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled()); } + @Test + public void testGracefulLeaseHandoffConfig() { + final LeaseManagementConfig.GracefulLeaseHandoffConfig defaultGracefulLeaseHandoffConfig = + getTestConfigsBuilder().leaseManagementConfig().gracefulLeaseHandoffConfig(); + + final long testGracefulLeaseHandoffTimeoutMillis = + defaultGracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis() + 12345; + final boolean testGracefulLeaseHandoffEnabled = + !defaultGracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled(); + + final MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setGracefulLeaseHandoffTimeoutMillis(testGracefulLeaseHandoffTimeoutMillis); + configuration.setIsGracefulLeaseHandoffEnabled(testGracefulLeaseHandoffEnabled); + + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig = + resolvedConfiguration.leaseManagementConfig.gracefulLeaseHandoffConfig(); + + assertEquals( + testGracefulLeaseHandoffTimeoutMillis, gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis()); + assertEquals(testGracefulLeaseHandoffEnabled, gracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled()); + } + + @Test + public void testGracefulLeaseHandoffUsesDefaults() { + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + baseConfiguration().resolvedConfiguration(shardRecordProcessorFactory); + + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig = + resolvedConfiguration.leaseManagementConfig.gracefulLeaseHandoffConfig(); + + final LeaseManagementConfig.GracefulLeaseHandoffConfig defaultGracefulLeaseHandoffConfig = + getTestConfigsBuilder().leaseManagementConfig().gracefulLeaseHandoffConfig(); + + assertEquals(defaultGracefulLeaseHandoffConfig, gracefulLeaseHandoffConfig); + } + + @Test + public void testWorkerUtilizationAwareAssignmentConfig() { + MultiLangDaemonConfiguration configuration = baseConfiguration(); + + configuration.setInMemoryWorkerMetricsCaptureFrequencyMillis(123); + configuration.setWorkerMetricsReporterFreqInMillis(123); + configuration.setNoOfPersistedMetricsPerWorkerMetrics(123); + configuration.setDisableWorkerMetrics(true); + configuration.setMaxThroughputPerHostKBps(.123); + configuration.setDampeningPercentage(12); + configuration.setReBalanceThresholdPercentage(12); + configuration.setAllowThroughputOvershoot(false); + configuration.setVarianceBalancingFrequency(12); + configuration.setWorkerMetricsEMAAlpha(.123); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + LeaseManagementConfig leaseManagementConfig = resolvedConfiguration.leaseManagementConfig; + LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = + leaseManagementConfig.workerUtilizationAwareAssignmentConfig(); + + assertEquals(config.inMemoryWorkerMetricsCaptureFrequencyMillis(), 123); + assertEquals(config.workerMetricsReporterFreqInMillis(), 123); + assertEquals(config.noOfPersistedMetricsPerWorkerMetrics(), 123); + assertTrue(config.disableWorkerMetrics()); + assertEquals(config.maxThroughputPerHostKBps(), .123, .25); + assertEquals(config.dampeningPercentage(), 12); + assertEquals(config.reBalanceThresholdPercentage(), 12); + assertFalse(config.allowThroughputOvershoot()); + assertEquals(config.varianceBalancingFrequency(), 12); + assertEquals(config.workerMetricsEMAAlpha(), .123, .25); + } + + @Test + public void testWorkerUtilizationAwareAssignmentConfigUsesDefaults() { + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig defaultWorkerUtilAwareAssignmentConfig = + getTestConfigsBuilder().leaseManagementConfig().workerUtilizationAwareAssignmentConfig(); + + final MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setVarianceBalancingFrequency( + defaultWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency() + 12345); + + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig resolvedWorkerUtilAwareAssignmentConfig = + resolvedConfiguration.leaseManagementConfig.workerUtilizationAwareAssignmentConfig(); + + assertNotEquals(defaultWorkerUtilAwareAssignmentConfig, resolvedWorkerUtilAwareAssignmentConfig); + + // apart from the single updated configuration, all other config values should be equal to the default + resolvedWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency( + defaultWorkerUtilAwareAssignmentConfig.varianceBalancingFrequency()); + assertEquals(defaultWorkerUtilAwareAssignmentConfig, resolvedWorkerUtilAwareAssignmentConfig); + } + + @Test + public void testWorkerMetricsTableConfigBean() { + final BillingMode testWorkerMetricsTableBillingMode = BillingMode.PROVISIONED; + + MultiLangDaemonConfiguration configuration = baseConfiguration(); + + configuration.setWorkerMetricsTableName("testTable"); + configuration.setWorkerMetricsBillingMode(testWorkerMetricsTableBillingMode); + configuration.setWorkerMetricsReadCapacity(123); + configuration.setWorkerMetricsWriteCapacity(123); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + LeaseManagementConfig leaseManagementConfig = resolvedConfiguration.leaseManagementConfig; + LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationConfig = + leaseManagementConfig.workerUtilizationAwareAssignmentConfig(); + LeaseManagementConfig.WorkerMetricsTableConfig workerMetricsConfig = + workerUtilizationConfig.workerMetricsTableConfig(); + + assertEquals(workerMetricsConfig.tableName(), "testTable"); + assertEquals(workerMetricsConfig.billingMode(), testWorkerMetricsTableBillingMode); + assertEquals(workerMetricsConfig.readCapacity(), 123); + assertEquals(workerMetricsConfig.writeCapacity(), 123); + } + + @Test + public void testWorkerMetricsTableConfigUsesDefaults() { + final LeaseManagementConfig.WorkerMetricsTableConfig defaultWorkerMetricsTableConfig = getTestConfigsBuilder() + .leaseManagementConfig() + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig(); + + final MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setWorkerMetricsBillingMode(Arrays.stream(BillingMode.values()) + .filter(billingMode -> billingMode != defaultWorkerMetricsTableConfig.billingMode()) + .findFirst() + .orElseThrow(NoSuchElementException::new)); + + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + final LeaseManagementConfig.WorkerMetricsTableConfig resolvedWorkerMetricsTableConfig = resolvedConfiguration + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig(); + + assertNotEquals(defaultWorkerMetricsTableConfig, resolvedWorkerMetricsTableConfig); + + // apart from the single updated configuration, all other config values should be equal to the default + resolvedWorkerMetricsTableConfig.billingMode(defaultWorkerMetricsTableConfig.billingMode()); + assertEquals(defaultWorkerMetricsTableConfig, resolvedWorkerMetricsTableConfig); + } + + @Test + public void testCoordinatorStateTableConfigBean() { + final BillingMode testWorkerMetricsTableBillingMode = BillingMode.PAY_PER_REQUEST; + + MultiLangDaemonConfiguration configuration = baseConfiguration(); + + configuration.setCoordinatorStateTableName("testTable"); + configuration.setCoordinatorStateBillingMode(testWorkerMetricsTableBillingMode); + configuration.setCoordinatorStateReadCapacity(123); + configuration.setCoordinatorStateWriteCapacity(123); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + CoordinatorConfig coordinatorConfig = resolvedConfiguration.getCoordinatorConfig(); + CoordinatorConfig.CoordinatorStateTableConfig coordinatorStateConfig = + coordinatorConfig.coordinatorStateConfig(); + assertEquals(coordinatorStateConfig.tableName(), "testTable"); + assertEquals(coordinatorStateConfig.billingMode(), testWorkerMetricsTableBillingMode); + assertEquals(coordinatorStateConfig.readCapacity(), 123); + assertEquals(coordinatorStateConfig.writeCapacity(), 123); + } + + @Test + public void testCoordinatorStateTableConfigUsesDefaults() { + final CoordinatorConfig.CoordinatorStateTableConfig defaultCoordinatorStateTableConfig = + getTestConfigsBuilder().coordinatorConfig().coordinatorStateConfig(); + + final MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setCoordinatorStateWriteCapacity(defaultCoordinatorStateTableConfig.writeCapacity() + 12345); + + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + final CoordinatorConfig.CoordinatorStateTableConfig resolvedCoordinatorStateTableConfig = + resolvedConfiguration.coordinatorConfig.coordinatorStateConfig(); + + assertNotEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig); + + // apart from the single updated configuration, all other config values should be equal to the default + resolvedCoordinatorStateTableConfig.writeCapacity(defaultCoordinatorStateTableConfig.writeCapacity()); + assertEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig); + } + @Test public void testSetLeaseTablePitrEnabledToTrue() { MultiLangDaemonConfiguration configuration = baseConfiguration(); @@ -266,4 +471,43 @@ public class MultiLangDaemonConfigurationTest { assertThat(fanOutConfig.consumerArn(), equalTo(consumerArn)); } + + @Test + public void testClientVersionConfig() { + final CoordinatorConfig.ClientVersionConfig testClientVersionConfig = + CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X; + + final MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setClientVersionConfig(testClientVersionConfig); + + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + final CoordinatorConfig coordinatorConfig = resolvedConfiguration.coordinatorConfig; + + assertEquals(testClientVersionConfig, coordinatorConfig.clientVersionConfig()); + } + + @Test + public void testClientVersionConfigUsesDefault() { + final MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + baseConfiguration().resolvedConfiguration(shardRecordProcessorFactory); + + final CoordinatorConfig coordinatorConfig = resolvedConfiguration.coordinatorConfig; + + assertEquals( + getTestConfigsBuilder().coordinatorConfig().clientVersionConfig(), + coordinatorConfig.clientVersionConfig()); + } + + private ConfigsBuilder getTestConfigsBuilder() { + return new ConfigsBuilder( + DUMMY_STREAM_NAME, + DUMMY_APPLICATION_NAME, + Mockito.mock(KinesisAsyncClient.class), + Mockito.mock(DynamoDbAsyncClient.class), + Mockito.mock(CloudWatchAsyncClient.class), + "dummyWorkerIdentifier", + shardRecordProcessorFactory); + } } diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java new file mode 100644 index 00000000..cf52a5ff --- /dev/null +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java @@ -0,0 +1,249 @@ +package software.amazon.kinesis.multilang.config; + +import java.io.IOException; +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.kinesis.coordinator.CoordinatorConfig.ClientVersionConfig; +import software.amazon.kinesis.multilang.MultiLangDaemonConfig; +import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration.ResolvedConfiguration; +import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PropertiesMappingE2ETest { + private static final String PROPERTIES_FILE = "multilang.properties"; + private static final String PROPERTIES_FILE_V3 = "multilangv3.properties"; + + @Test + public void testKclV3PropertiesMapping() throws IOException { + final MultiLangDaemonConfig config = new MultiLangDaemonConfig(PROPERTIES_FILE); + + final ResolvedConfiguration kclV3Config = + config.getMultiLangDaemonConfiguration().resolvedConfiguration(new TestRecordProcessorFactory()); + + assertEquals( + ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X, + kclV3Config.coordinatorConfig.clientVersionConfig()); + + assertEquals( + "MultiLangTest-CoordinatorState-CustomName", + kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName()); + assertEquals( + BillingMode.PROVISIONED, + kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode()); + assertEquals( + 1000, kclV3Config.coordinatorConfig.coordinatorStateConfig().readCapacity()); + assertEquals(500, kclV3Config.coordinatorConfig.coordinatorStateConfig().writeCapacity()); + + assertEquals( + 10000L, + kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().gracefulLeaseHandoffTimeoutMillis()); + assertFalse( + kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().isGracefulLeaseHandoffEnabled()); + + assertEquals( + 5000L, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .inMemoryWorkerMetricsCaptureFrequencyMillis()); + assertEquals( + 60000L, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsReporterFreqInMillis()); + assertEquals( + 50, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .noOfPersistedMetricsPerWorkerMetrics()); + assertTrue(kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .disableWorkerMetrics()); + assertEquals( + 10000, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .maxThroughputPerHostKBps()); + assertEquals( + 90, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .dampeningPercentage()); + assertEquals( + 5, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .reBalanceThresholdPercentage()); + assertFalse(kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .allowThroughputOvershoot()); + assertEquals( + Duration.ofHours(12), + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .staleWorkerMetricsEntryCleanupDuration()); + assertEquals( + 5, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .varianceBalancingFrequency()); + assertEquals( + 0.18D, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsEMAAlpha()); + + assertEquals( + "MultiLangTest-WorkerMetrics-CustomName", + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig() + .tableName()); + assertEquals( + BillingMode.PROVISIONED, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig() + .billingMode()); + assertEquals( + 250, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig() + .readCapacity()); + assertEquals( + 90, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig() + .writeCapacity()); + } + + @Test + public void testKclV3PropertiesMappingForDefaultValues() throws IOException { + final MultiLangDaemonConfig config = new MultiLangDaemonConfig(PROPERTIES_FILE_V3); + + final ResolvedConfiguration kclV3Config = + config.getMultiLangDaemonConfiguration().resolvedConfiguration(new TestRecordProcessorFactory()); + + assertEquals(ClientVersionConfig.CLIENT_VERSION_CONFIG_3X, kclV3Config.coordinatorConfig.clientVersionConfig()); + + assertEquals( + "MultiLangTest-CoordinatorState", + kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName()); + assertEquals( + BillingMode.PAY_PER_REQUEST, + kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode()); + + assertEquals( + 30_000L, + kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().gracefulLeaseHandoffTimeoutMillis()); + assertTrue( + kclV3Config.leaseManagementConfig.gracefulLeaseHandoffConfig().isGracefulLeaseHandoffEnabled()); + + assertEquals( + 1000L, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .inMemoryWorkerMetricsCaptureFrequencyMillis()); + assertEquals( + 30000L, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsReporterFreqInMillis()); + assertEquals( + 10, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .noOfPersistedMetricsPerWorkerMetrics()); + assertFalse(kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .disableWorkerMetrics()); + assertEquals( + Double.MAX_VALUE, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .maxThroughputPerHostKBps()); + assertEquals( + 60, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .dampeningPercentage()); + assertEquals( + 10, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .reBalanceThresholdPercentage()); + assertTrue(kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .allowThroughputOvershoot()); + assertEquals( + Duration.ofDays(1), + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .staleWorkerMetricsEntryCleanupDuration()); + assertEquals( + 3, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .varianceBalancingFrequency()); + assertEquals( + 0.5D, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsEMAAlpha()); + + assertEquals( + "MultiLangTest-WorkerMetricStats", + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig() + .tableName()); + assertEquals( + BillingMode.PAY_PER_REQUEST, + kclV3Config + .leaseManagementConfig + .workerUtilizationAwareAssignmentConfig() + .workerMetricsTableConfig() + .billingMode()); + } + + private static class TestRecordProcessorFactory implements ShardRecordProcessorFactory { + @Override + public ShardRecordProcessor shardRecordProcessor() { + return null; + } + } +} diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java new file mode 100644 index 00000000..71ada01f --- /dev/null +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/WorkerUtilizationAwareAssignmentConfigBeanTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.multilang.config; + +import java.util.Optional; + +import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.beanutils.ConvertUtilsBean; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.retrieval.polling.PollingConfig; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class WorkerUtilizationAwareAssignmentConfigBeanTest { + + @Mock + private KinesisAsyncClient kinesisAsyncClient; + + @Test + public void testAllPropertiesTransit() { + PollingConfigBean pollingConfigBean = new PollingConfigBean(); + pollingConfigBean.setIdleTimeBetweenReadsInMillis(1000); + pollingConfigBean.setMaxGetRecordsThreadPool(20); + pollingConfigBean.setMaxRecords(5000); + pollingConfigBean.setRetryGetRecordsInSeconds(30); + + ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean(); + BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean); + + MultiLangDaemonConfiguration multiLangDaemonConfiguration = + new MultiLangDaemonConfiguration(utilsBean, convertUtilsBean); + multiLangDaemonConfiguration.setStreamName("test-stream"); + + PollingConfig pollingConfig = pollingConfigBean.build(kinesisAsyncClient, multiLangDaemonConfiguration); + + assertThat(pollingConfig.kinesisClient(), equalTo(kinesisAsyncClient)); + assertThat(pollingConfig.streamName(), equalTo(multiLangDaemonConfiguration.getStreamName())); + assertThat( + pollingConfig.idleTimeBetweenReadsInMillis(), + equalTo(pollingConfigBean.getIdleTimeBetweenReadsInMillis())); + assertThat( + pollingConfig.maxGetRecordsThreadPool(), + equalTo(Optional.of(pollingConfigBean.getMaxGetRecordsThreadPool()))); + assertThat(pollingConfig.maxRecords(), equalTo(pollingConfigBean.getMaxRecords())); + assertThat( + pollingConfig.retryGetRecordsInSeconds(), + equalTo(Optional.of(pollingConfigBean.getRetryGetRecordsInSeconds()))); + } +} diff --git a/amazon-kinesis-client-multilang/src/test/resources/multilang.properties b/amazon-kinesis-client-multilang/src/test/resources/multilang.properties index 34cb0c1a..3611fa0a 100644 --- a/amazon-kinesis-client-multilang/src/test/resources/multilang.properties +++ b/amazon-kinesis-client-multilang/src/test/resources/multilang.properties @@ -91,3 +91,73 @@ validateSequenceNumberBeforeCheckpointing = true # active threads set to the provided value. If a non-positive integer or no # value is provided a CachedThreadPool is used. maxActiveThreads = -1 + +################### KclV3 configurations ################### +# Coordinator config +# Version the KCL needs to operate in. For more details check the KCLv3 migration +# documentation. Default is CLIENT_VERSION_CONFIG_3X +clientVersionConfig = CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x +# TODO: include table deletion protection and pitr config once its added +# Configurations to control how the CoordinatorState DDB table is created +# Default name is applicationName-CoordinatorState in PAY_PER_REQUEST +coordinatorStateTableName = MultiLangTest-CoordinatorState-CustomName +coordinatorStateBillingMode = PROVISIONED +coordinatorStateReadCapacity = 1000 +coordinatorStateWriteCapacity = 500 + +# Graceful handoff config - tuning of the shutdown behavior during lease transfers +# default values are 30000 and true respectively +gracefulLeaseHandoffTimeoutMillis = 10000 +isGracefulLeaseHandoffEnabled = false + +# WorkerMetricStats table config - control how the DDB table is created +## Default name is applicationName-WorkerMetricStats in PAY_PER_REQUEST +# TODO: include table deletion protection and pitr config once its added +workerMetricsTableName = MultiLangTest-WorkerMetrics-CustomName +workerMetricsBillingMode = PROVISIONED +workerMetricsReadCapacity = 250 +workerMetricsWriteCapacity = 90 + +# WorkerUtilizationAwareAssignment config - tune the new KCLv3 Lease balancing algorithm +# +# frequency of capturing worker metrics in memory. Default is 1s +inMemoryWorkerMetricsCaptureFrequencyMillis = 5000 +# frequency of reporting worker metric stats to storage. Default is 30s +workerMetricsReporterFreqInMillis = 60000 +# No. of metricStats that are persisted in WorkerMetricStats ddb table, default is 10 +noOfPersistedMetricsPerWorkerMetrics = 50 +# Disable use of worker metrics to balance lease, default is false. +# If it is true, the algorithm balances lease based on worker's processing throughput. +disableWorkerMetrics = true +# Max throughput per host 10 MBps, to limit processing to the given value +# Default is unlimited. +maxThroughputPerHostKBps = 10000 +# Dampen the load that is rebalanced during lease re-balancing, default is 60% +dampeningPercentage = 90 +# Configures the allowed variance range for worker utilization. The upper +# limit is calculated as average * (1 + reBalanceThresholdPercentage/100). +# The lower limit is average * (1 - reBalanceThresholdPercentage/100). If +# any worker's utilization falls outside this range, lease re-balancing is +# triggered. The re-balancing algorithm aims to bring variance within the +# specified range. It also avoids thrashing by ensuring the utilization of +# the worker receiving the load after re-balancing doesn't exceed the fleet +# average. This might cause no re-balancing action even the utilization is +# out of the variance range. The default value is 10, representing +/-10% +# variance from the average value. +reBalanceThresholdPercentage = 5 +# Whether at-least one lease must be taken from a high utilization worker +# during re-balancing when there is no lease assigned to that worker which has +# throughput is less than or equal to the minimum throughput that needs to be +# moved away from that worker to bring the worker back into the allowed variance. +# Default is true. +allowThroughputOvershoot = false +# Lease assignment is performed every failoverTimeMillis but re-balance will +# be attempted only once in 5 times based on the below config. Default is 3. +varianceBalancingFrequency = 5 +# Alpha value used for calculating exponential moving average of worker's metricStats. +workerMetricsEMAAlpha = 0.18 +# Duration after which workerMetricStats entry from WorkerMetricStats table will +# be cleaned up. +# Duration format examples: PT15M (15 mins) PT10H (10 hours) P2D (2 days) +# Refer to Duration.parse javadocs for more details +staleWorkerMetricsEntryCleanupDuration = PT12H diff --git a/amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties b/amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties new file mode 100644 index 00000000..d92c8016 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/test/resources/multilangv3.properties @@ -0,0 +1,167 @@ +# The script that abides by the multi-language protocol. This script will +# be executed by the MultiLangDaemon, which will communicate with this script +# over STDIN and STDOUT according to the multi-language protocol. +executableName = sample_kclpy_app.py + +# The Stream arn: arn:aws:kinesis:::stream/ +# Important: streamArn takes precedence over streamName if both are set +streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample + +# The name of an Amazon Kinesis stream to process. +# Important: streamArn takes precedence over streamName if both are set +streamName = kclpysample + +# Used by the KCL as the name of this application. Will be used as the name +# of an Amazon DynamoDB table which will store the lease and checkpoint +# information for workers with this application name +applicationName = MultiLangTest + +# Users can change the credentials provider the KCL will use to retrieve credentials. +# The DefaultAWSCredentialsProviderChain checks several other providers, which is +# described here: +# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + +# Appended to the user agent of the KCL. Does not impact the functionality of the +# KCL in any other way. +processingLanguage = python/3.8 + +# Valid options at TRIM_HORIZON or LATEST. +# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +initialPositionInStream = TRIM_HORIZON + +# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended', +# and uncomment below line with right timestamp value. +# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +#initialPositionInStreamExtended = 1636609142 + +# The following properties are also available for configuring the KCL Worker that is created +# by the MultiLangDaemon. + +# The KCL defaults to us-east-1 +regionName = us-east-1 + +# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval +# will be regarded as having problems and it's shards will be assigned to other workers. +# For applications that have a large number of shards, this msy be set to a higher number to reduce +# the number of DynamoDB IOPS required for tracking leases +failoverTimeMillis = 10000 + +# A worker id that uniquely identifies this worker among all workers using the same applicationName +# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. +workerId = "workerId" + +# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. +shardSyncIntervalMillis = 60000 + +# Max records to fetch from Kinesis in a single GetRecords call. +maxRecords = 10000 + +# Idle time between record reads in milliseconds. +idleTimeBetweenReadsInMillis = 1000 + +# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) +callProcessRecordsEvenForEmptyRecordList = false + +# Interval in milliseconds between polling to check for parent shard completion. +# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on +# completion of parent shards). +parentShardPollIntervalMillis = 10000 + +# Cleanup leases upon shards completion (don't wait until they expire in Kinesis). +# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try +# to delete the ones we don't need any longer. +cleanupLeasesUponShardCompletion = true + +# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). +taskBackoffTimeMillis = 500 + +# Buffer metrics for at most this long before publishing to CloudWatch. +metricsBufferTimeMillis = 10000 + +# Buffer at most this many metrics before publishing to CloudWatch. +metricsMaxQueueSize = 10000 + +# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls +# to RecordProcessorCheckpointer#checkpoint(String) by default. +validateSequenceNumberBeforeCheckpointing = true + +# The maximum number of active threads for the MultiLangDaemon to permit. +# If a value is provided then a FixedThreadPool is used with the maximum +# active threads set to the provided value. If a non-positive integer or no +# value is provided a CachedThreadPool is used. +maxActiveThreads = -1 + +################### KclV3 configurations ################### +# Coordinator config +clientVersionConfig = CLIENT_VERSION_CONFIG_3x + +## Let all other config be defaults +## TODO: include table deletion protection and pitr config once its added +## Configurations to control how the CoordinatorState DDB table is created +## Default name is applicationName-CoordinatorState in PAY_PER_REQUEST +#coordinatorStateTableName = MultiLangTest-CoordinatorState-CustomName +#coordinatorStateBillingMode = PROVISIONED +#coordinatorStateReadCapacity = 1000 +#coordinatorStateWriteCapacity = 500 +# +## Graceful handoff config - tuning of the shutdown behavior during lease transfers +## default values are 30000 and true respectively +#gracefulLeaseHandoffTimeoutMillis = 10000 +#isGracefulLeaseHandoffEnabled = false +# +## WorkerMetricStats table config - control how the DDB table is created +### Default name is applicationName-WorkerMetricStats in PAY_PER_REQUEST +## TODO: include table deletion protection and pitr config once its added +#workerMetricsTableName = MultiLangTest-WorkerMetrics-CustomName +#workerMetricsBillingMode = PROVISIONED +#workerMetricsReadCapacity = 250 +#workerMetricsWriteCapacity = 90 +# +## WorkerUtilizationAwareAssignment config - tune the new KCLv3 Lease balancing algorithm +## +## frequency of capturing worker metrics in memory. Default is 1s +#inMemoryWorkerMetricsCaptureFrequencyMillis = 5000 +## frequency of reporting worker metric stats to storage. Default is 30s +#workerMetricsReporterFreqInMillis = 60000 +## No. of metricStats that are persisted in WorkerMetricStats ddb table, default is 10. +## This provides historic values that are used to compute the workers current +## utilization using an exponential-moving-average. +#noOfPersistedMetricsPerWorkerMetrics = 50 +## Disable use of worker metrics to balance lease, default is false. +## If it is true, the algorithm balances lease based on worker's processing throughput. +#disableWorkerMetrics = true +## Max throughput per host 10 MBps, to limit processing to the given value +## Default is unlimited. +#maxThroughputPerHostKBps = 10000 +## Dampen the load that is rebalanced during lease re-balancing, default is 60% +#dampeningPercentage = 90 +## Configures the allowed variance range for worker utilization. The upper +## limit is calculated as average * (1 + reBalanceThresholdPercentage/100). +## The lower limit is average * (1 - reBalanceThresholdPercentage/100). If +## any worker's utilization falls outside this range, lease re-balancing is +## triggered. The re-balancing algorithm aims to bring variance within the +## specified range. It also avoids thrashing by ensuring the utilization of +## the worker receiving the load after re-balancing doesn't exceed the fleet +## average. This might cause no re-balancing action even the utilization is +## out of the variance range. The default value is 10, representing +/-10% +## variance from the average value. +#reBalanceThresholdPercentage = 5 +## Whether at-least one lease must be taken from a high utilization worker +## during re-balancing when there is no lease assigned to that worker which has +## throughput is less than or equal to the minimum throughput that needs to be +## moved away from that worker to bring the worker back into the allowed variance. +## Default is true. +#allowThroughputOvershoot = false +## Lease assignment is performed every failoverTimeMillis but re-balance will +## be attempted only once in 5 times based on the below config. Default is 3. +#varianceBalancingFrequency = 5 +## Alpha value used for calculating exponential moving average of worker's metricStats. +## Default is 0.5, a higher alpha value will make re-balancing more sensitive +## to recent metricStats. +#workerMetricsEMAAlpha = 0.18 +## Duration after which workerMetricStats entry from WorkerMetricStats table will +## be cleaned up. Default is 1 day. +## Duration format examples: PT15M (15 mins) PT10H (10 hours) P2D (2 days) +## Refer to Duration.parse javadocs for more details +#staleWorkerMetricsEntryCleanupDuration = PT12H diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 238f4f37..d646aab9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -123,7 +123,7 @@ public class CoordinatorConfig { * This version also allows rolling back to the compatible mode from the * auto-toggled 3.x mode. */ - CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x, + CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X, /** * A new application operating with KCLv3.x will use this value. Also, an application * that has successfully upgraded to 3.x version and no longer needs the ability @@ -131,14 +131,14 @@ public class CoordinatorConfig { * KCL will operate with new algorithms introduced in 3.x which is not compatible * with prior versions. And once in this version, rollback to 2.x is not supported. */ - CLIENT_VERSION_CONFIG_3x, + CLIENT_VERSION_CONFIG_3X, } /** * Client version KCL must operate in, by default it operates in 3.x version which is not * compatible with prior versions. */ - private ClientVersionConfig clientVersionConfig = ClientVersionConfig.CLIENT_VERSION_CONFIG_3x; + private ClientVersionConfig clientVersionConfig = ClientVersionConfig.CLIENT_VERSION_CONFIG_3X; public static class CoordinatorStateTableConfig extends DdbTableConfig { private CoordinatorStateTableConfig(final String applicationName) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java index 8d1e78a1..c4aecdda 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java @@ -159,7 +159,7 @@ public final class DynamicMigrationComponentsInitializer { // always collect metrics so that when we flip to start reporting we will have accurate historical data. log.info("Start collection of WorkerMetricStats"); workerMetricsManager.startManager(); - if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3x) { + if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3X) { initializeComponentsFor3x(); } else { initializeComponentsForMigration(migrationStateMachineStartingClientVersion); @@ -187,7 +187,7 @@ public final class DynamicMigrationComponentsInitializer { log.info("Initializing for migration to 3x"); dualMode = true; final LeaderDecider initialLeaderDecider; - if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK) { + if (migrationStateMachineStartingClientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) { currentAssignmentMode = WORKER_UTILIZATION_AWARE_ASSIGNMENT; initialLeaderDecider = ddbLockBasedLeaderDeciderCreator.get(); } else { @@ -292,8 +292,8 @@ public final class DynamicMigrationComponentsInitializer { /** * Initialize KCL with components and configuration to support upgrade from 2x. This can happen - * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x. - * Or Dynamically during roll-forward from ClientVersion.CLIENT_VERSION_2x. + * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X. + * Or Dynamically during roll-forward from ClientVersion.CLIENT_VERSION_2X. */ public synchronized void initializeClientVersionForUpgradeFrom2x(final ClientVersion fromClientVersion) throws DependencyException { @@ -306,8 +306,8 @@ public final class DynamicMigrationComponentsInitializer { /** * Initialize KCL with components and configuration to run vanilla 3x functionality. This can happen - * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3x, or dynamically - * during a new deployment when existing worker are in ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK + * at KCL Worker startup when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3X, or dynamically + * during a new deployment when existing worker are in ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK */ public synchronized void initializeClientVersionFor3x(final ClientVersion fromClientVersion) throws DependencyException { @@ -322,14 +322,14 @@ public final class DynamicMigrationComponentsInitializer { log.info("Starting LAM"); leaseAssignmentManager.start(); } - // nothing to do when transitioning from CLIENT_VERSION_3x_WITH_ROLLBACK. + // nothing to do when transitioning from CLIENT_VERSION_3X_WITH_ROLLBACK. } /** * Initialize KCL with components and configuration to run 2x compatible functionality * while allowing roll-forward. This can happen at KCL Worker startup when MigrationStateMachine - * starts in ClientVersion.CLIENT_VERSION_2x (after a rollback) - * Or Dynamically during rollback from CLIENT_VERSION_UPGRADE_FROM_2x or CLIENT_VERSION_3x_WITH_ROLLBACK. + * starts in ClientVersion.CLIENT_VERSION_2X (after a rollback) + * Or Dynamically during rollback from CLIENT_VERSION_UPGRADE_FROM_2X or CLIENT_VERSION_3X_WITH_ROLLBACK. */ public synchronized void initializeClientVersionFor2x(final ClientVersion fromClientVersion) { log.info("Initializing KCL components for rollback to 2x from {}", fromClientVersion); @@ -341,7 +341,7 @@ public final class DynamicMigrationComponentsInitializer { // and WorkerMetricStats table } - if (fromClientVersion == ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK) { + if (fromClientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) { // we are rolling back after flip currentAssignmentMode = DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT; notifyLeaseAssignmentModeChange(); @@ -361,14 +361,14 @@ public final class DynamicMigrationComponentsInitializer { /** * Initialize KCL with components and configuration to run vanilla 3x functionality * while allowing roll-back to 2x functionality. This can happen at KCL Worker startup - * when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK (after the flip) - * Or Dynamically during flip from CLIENT_VERSION_UPGRADE_FROM_2x. + * when MigrationStateMachine starts in ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK (after the flip) + * Or Dynamically during flip from CLIENT_VERSION_UPGRADE_FROM_2X. */ public synchronized void initializeClientVersionFor3xWithRollback(final ClientVersion fromClientVersion) throws DependencyException { log.info("Initializing KCL components for 3x with rollback from {}", fromClientVersion); - if (fromClientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x) { + if (fromClientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) { // dynamic flip currentAssignmentMode = WORKER_UTILIZATION_AWARE_ASSIGNMENT; notifyLeaseAssignmentModeChange(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java index 2ea07965..ccbd9085 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersion.java @@ -31,28 +31,28 @@ public enum ClientVersion { * KCL workers will emit WorkerMetricStats and run KCLv2.x algorithms for leader election and lease * assignment. KCL will also monitor for upgrade to KCLv3.x readiness of the worker fleet. */ - CLIENT_VERSION_UPGRADE_FROM_2x, + CLIENT_VERSION_UPGRADE_FROM_2X, /** - * This version is used during rollback from CLIENT_VERSION_UPGRADE_FROM_2x or CLIENT_VERSION_3x_WITH_ROLLBACK, + * This version is used during rollback from CLIENT_VERSION_UPGRADE_FROM_2X or CLIENT_VERSION_3X_WITH_ROLLBACK, * which can only be initiated using a KCL migration tool, when customer wants to revert to KCLv2.x functionality. * In this version, KCL will not emit WorkerMetricStats and run KCLv2.x algorithms for leader election * and lease assignment. In this version, KCL will monitor for roll-forward scenario where - * client version is updated to CLIENT_VERSION_UPGRADE_FROM_2x using the migration tool. + * client version is updated to CLIENT_VERSION_UPGRADE_FROM_2X using the migration tool. */ - CLIENT_VERSION_2x, + CLIENT_VERSION_2X, /** - * When workers are operating in CLIENT_VERSION_UPGRADE_FROM_2x and when worker fleet is determined to be + * When workers are operating in CLIENT_VERSION_UPGRADE_FROM_2X and when worker fleet is determined to be * KCLv3.x ready (when lease table GSI is active and worker-metrics are being emitted by all lease owners) * then the leader will initiate the switch to KCLv3.x algorithms for leader election and lease assignment, * by using this version and persisting it in the {@link MigrationState} that allows all worker hosts * to also flip to KCLv3.x functionality. In this KCL will also monitor for rollback to detect when the - * customer updates version to CLIENT_VERSION_2x using migration tool, so that it instantly flips back - * to CLIENT_VERSION_2x. + * customer updates version to CLIENT_VERSION_2X using migration tool, so that it instantly flips back + * to CLIENT_VERSION_2X. */ - CLIENT_VERSION_3x_WITH_ROLLBACK, + CLIENT_VERSION_3X_WITH_ROLLBACK, /** * A new application starting KCLv3.x or an upgraded application from KCLv2.x after upgrade is successful * can use this version to default all KCLv3.x algorithms without any monitor to rollback. */ - CLIENT_VERSION_3x; + CLIENT_VERSION_3X; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java index 9db6c6b8..29777fa3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitor.java @@ -144,11 +144,11 @@ public class ClientVersionChangeMonitor implements Runnable { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, METRICS_OPERATION); try { switch (expectedVersion) { - case CLIENT_VERSION_3x_WITH_ROLLBACK: + case CLIENT_VERSION_3X_WITH_ROLLBACK: scope.addData("CurrentState:3xWorker", 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); break; - case CLIENT_VERSION_2x: - case CLIENT_VERSION_UPGRADE_FROM_2x: + case CLIENT_VERSION_2X: + case CLIENT_VERSION_UPGRADE_FROM_2X: scope.addData("CurrentState:2xCompatibleWorker", 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); break; default: diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java index 7deac9bc..45d29a41 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion2xState.java @@ -32,13 +32,13 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X; import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC; import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION; /** - * State for CLIENT_VERSION_2x. In this state, the only allowed valid transition is + * State for CLIENT_VERSION_2X. In this state, the only allowed valid transition is * the roll-forward scenario which can only be performed using the KCL Migration tool. * So when the state machine enters this state, a monitor is started to detect the * roll-forward scenario. @@ -60,7 +60,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat @Override public ClientVersion clientVersion() { - return CLIENT_VERSION_2x; + return CLIENT_VERSION_2X; } @Override @@ -102,7 +102,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat /** * Callback handler to handle client version changes in MigrationState in DDB. - * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_2x + * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_2X * @throws InvalidStateException during transition to the next state based on the new ClientVersion * or if the new state in DDB is unexpected. */ @@ -115,18 +115,18 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat final MetricsScope scope = MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION); try { - if (newState.getClientVersion() == CLIENT_VERSION_UPGRADE_FROM_2x) { + if (newState.getClientVersion() == CLIENT_VERSION_UPGRADE_FROM_2X) { log.info( "A roll-forward has been initiated for the application. Transition to {}", - CLIENT_VERSION_UPGRADE_FROM_2x); + CLIENT_VERSION_UPGRADE_FROM_2X); // If this succeeds, the monitor will cancel itself. - stateMachine.transitionTo(CLIENT_VERSION_UPGRADE_FROM_2x, newState); + stateMachine.transitionTo(CLIENT_VERSION_UPGRADE_FROM_2X, newState); } else { // This should not happen, so throw an exception that allows the monitor to continue monitoring // changes, this allows KCL to operate in the current state and keep monitoring until a valid // state transition is possible. // However, there could be a split brain here, new workers will use DDB value as source of truth, - // so we could also write back CLIENT_VERSION_2x to DDB to ensure all workers have consistent + // so we could also write back CLIENT_VERSION_2X to DDB to ensure all workers have consistent // behavior. // Ideally we don't expect modifications to DDB table out of the KCL migration tool scope, // so keeping it simple and not writing back to DDB, the error log below would help capture @@ -134,7 +134,7 @@ public class MigrationClientVersion2xState implements MigrationClientVersionStat log.error( "Migration state has invalid client version {}. Transition from {} is not supported", newState, - CLIENT_VERSION_2x); + CLIENT_VERSION_2X); throw new InvalidStateException(String.format("Unexpected new state %s", newState)); } } catch (final InvalidStateException | DependencyException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java index 4eba4165..1e857311 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xState.java @@ -22,7 +22,7 @@ import software.amazon.kinesis.coordinator.DynamicMigrationComponentsInitializer import software.amazon.kinesis.leases.exceptions.DependencyException; /** - * State for CLIENT_VERSION_3x which enables KCL to run 3.x algorithms on new KCLv3.x application + * State for CLIENT_VERSION_3X which enables KCL to run 3.x algorithms on new KCLv3.x application * or successfully upgraded application which upgraded from v2.x. This is a terminal state of the * state machine and no rollbacks are supported in this state. */ @@ -38,7 +38,7 @@ public class MigrationClientVersion3xState implements MigrationClientVersionStat @Override public ClientVersion clientVersion() { - return ClientVersion.CLIENT_VERSION_3x; + return ClientVersion.CLIENT_VERSION_3X; } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java index 512d3c05..6235c5a9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java @@ -31,15 +31,15 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X; import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC; import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION; /** - * State for CLIENT_VERSION_3x_WITH_ROLLBACK which enables KCL to run its 3.x compliant algorithms + * State for CLIENT_VERSION_3X_WITH_ROLLBACK which enables KCL to run its 3.x compliant algorithms * during the upgrade process after all KCL workers in the fleet are 3.x complaint. Since this - * is an instant switch from CLIENT_VERSION_UPGRADE_FROM_2x, it also supports rollback if customers + * is an instant switch from CLIENT_VERSION_UPGRADE_FROM_2X, it also supports rollback if customers * see regression to allow for instant rollbacks as well. This would be achieved by customers * running a KCL migration tool to update MigrationState in DDB. So this state monitors for * rollback triggers and performs state transitions accordingly. @@ -62,7 +62,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien @Override public ClientVersion clientVersion() { - return ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK; + return ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK; } @Override @@ -108,17 +108,17 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION); try { switch (newState.getClientVersion()) { - case CLIENT_VERSION_2x: - log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2x); - stateMachine.transitionTo(ClientVersion.CLIENT_VERSION_2x, newState); + case CLIENT_VERSION_2X: + log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2X); + stateMachine.transitionTo(ClientVersion.CLIENT_VERSION_2X, newState); break; - case CLIENT_VERSION_3x: + case CLIENT_VERSION_3X: log.info("Customer has switched to 3.x after successful upgrade, state machine will move to a" + "terminal state and stop monitoring. Rollbacks will no longer be supported anymore"); - stateMachine.transitionTo(CLIENT_VERSION_3x, newState); + stateMachine.transitionTo(CLIENT_VERSION_3X, newState); // This worker will still be running the migrationAdaptive components in 3.x mode which will // no longer dynamically switch back to 2.x mode, however to directly run 3.x component without - // adaption to migration (i.e. move to CLIENT_VERSION_3x state), it requires this worker to go + // adaption to migration (i.e. move to CLIENT_VERSION_3X state), it requires this worker to go // through the current deployment which initiated the switch to 3.x mode. break; default: @@ -126,7 +126,7 @@ public class MigrationClientVersion3xWithRollbackState implements MigrationClien // changes, this allows KCL to operate in the current state and keep monitoring until a valid // state transition is possible. // However, there could be a split brain here, new workers will use DDB value as source of truth, - // so we could also write back CLIENT_VERSION_3x_WITH_ROLLBACK to DDB to ensure all workers have + // so we could also write back CLIENT_VERSION_3X_WITH_ROLLBACK to DDB to ensure all workers have // consistent behavior. // Ideally we don't expect modifications to DDB table out of the KCL migration tool scope, // so keeping it simple and not writing back to DDB, the error log below would help capture diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java index 02231218..970bd6ed 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionStateInitializer.java @@ -31,10 +31,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X; import static software.amazon.kinesis.coordinator.migration.MigrationState.MIGRATION_HASH_KEY; /** @@ -44,13 +44,13 @@ import static software.amazon.kinesis.coordinator.migration.MigrationState.MIGRA * as follows * ClientVersionConfig | MigrationState (DDB) | initial client version * --------------------+---------------------------------+-------------------------------- - * COMPATIBLE_WITH_2x | Does not exist | CLIENT_VERSION_UPGRADE_FROM_2x - * 3x | Does not exist | CLIENT_VERSION_3x - * COMPATIBLE_WITH_2x | CLIENT_VERSION_3x_WITH_ROLLBACK | CLIENT_VERSION_3x_WITH_ROLLBACK - * 3x | CLIENT_VERSION_3x_WITH_ROLLBACK | CLIENT_VERSION_3x - * any | CLIENT_VERSION_2x | CLIENT_VERSION_2x - * any | CLIENT_VERSION_UPGRADE_FROM_2x | CLIENT_VERSION_UPGRADE_FROM_2x - * any | CLIENT_VERSION_3x | CLIENT_VERSION_3x + * COMPATIBLE_WITH_2X | Does not exist | CLIENT_VERSION_UPGRADE_FROM_2X + * 3X | Does not exist | CLIENT_VERSION_3X + * COMPATIBLE_WITH_2X | CLIENT_VERSION_3X_WITH_ROLLBACK | CLIENT_VERSION_3X_WITH_ROLLBACK + * 3X | CLIENT_VERSION_3X_WITH_ROLLBACK | CLIENT_VERSION_3X + * any | CLIENT_VERSION_2X | CLIENT_VERSION_2X + * any | CLIENT_VERSION_UPGRADE_FROM_2X | CLIENT_VERSION_UPGRADE_FROM_2X + * any | CLIENT_VERSION_3X | CLIENT_VERSION_3X */ @KinesisClientInternalApi @RequiredArgsConstructor @@ -110,26 +110,26 @@ public class MigrationClientVersionStateInitializer { nextClientVersion = getNextClientVersionBasedOnConfigVersion(); log.info("Application is starting in {}", nextClientVersion); break; - case CLIENT_VERSION_3x_WITH_ROLLBACK: - if (clientVersionConfig == ClientVersionConfig.CLIENT_VERSION_CONFIG_3x) { + case CLIENT_VERSION_3X_WITH_ROLLBACK: + if (clientVersionConfig == ClientVersionConfig.CLIENT_VERSION_CONFIG_3X) { // upgrade successful, allow transition to 3x. - log.info("Application has successfully upgraded, transitioning to {}", CLIENT_VERSION_3x); - nextClientVersion = CLIENT_VERSION_3x; + log.info("Application has successfully upgraded, transitioning to {}", CLIENT_VERSION_3X); + nextClientVersion = CLIENT_VERSION_3X; break; } - log.info("Initialize with {}", CLIENT_VERSION_3x_WITH_ROLLBACK); + log.info("Initialize with {}", CLIENT_VERSION_3X_WITH_ROLLBACK); nextClientVersion = migrationState.getClientVersion(); break; - case CLIENT_VERSION_2x: - log.info("Application has rolled-back, initialize with {}", CLIENT_VERSION_2x); + case CLIENT_VERSION_2X: + log.info("Application has rolled-back, initialize with {}", CLIENT_VERSION_2X); nextClientVersion = migrationState.getClientVersion(); break; - case CLIENT_VERSION_UPGRADE_FROM_2x: - log.info("Application is upgrading, initialize with {}", CLIENT_VERSION_UPGRADE_FROM_2x); + case CLIENT_VERSION_UPGRADE_FROM_2X: + log.info("Application is upgrading, initialize with {}", CLIENT_VERSION_UPGRADE_FROM_2X); nextClientVersion = migrationState.getClientVersion(); break; - case CLIENT_VERSION_3x: - log.info("Initialize with {}", CLIENT_VERSION_3x); + case CLIENT_VERSION_3X: + log.info("Initialize with {}", CLIENT_VERSION_3X); nextClientVersion = migrationState.getClientVersion(); break; default: @@ -180,10 +180,10 @@ public class MigrationClientVersionStateInitializer { private ClientVersion getNextClientVersionBasedOnConfigVersion() { switch (clientVersionConfig) { - case CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x: - return CLIENT_VERSION_UPGRADE_FROM_2x; - case CLIENT_VERSION_CONFIG_3x: - return CLIENT_VERSION_3x; + case CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X: + return CLIENT_VERSION_UPGRADE_FROM_2X; + case CLIENT_VERSION_CONFIG_3X: + return CLIENT_VERSION_3X; } throw new IllegalStateException(String.format("Unknown configured Client version %s", clientVersionConfig)); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java index c0a59410..86106a07 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionUpgradeFrom2xState.java @@ -32,23 +32,23 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2x; -import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_2X; +import static software.amazon.kinesis.coordinator.migration.ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK; import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.FAULT_METRIC; import static software.amazon.kinesis.coordinator.migration.MigrationStateMachineImpl.METRICS_OPERATION; /** - * State for CLIENT_VERSION_UPGRADE_FROM_2x. When state machine enters this state, + * State for CLIENT_VERSION_UPGRADE_FROM_2X. When state machine enters this state, * KCL is initialized to operate in dual mode for Lease assignment and Leader decider algorithms * which initially start in 2.x compatible mode and when all the KCL workers are 3.x compliant, * it dynamically switches to the 3.x algorithms. It also monitors for rollback * initiated from customer via the KCL migration tool and instantly switches back to the 2.x * complaint algorithms. - * The allowed state transitions are to CLIENT_VERSION_3x_WITH_ROLLBACK when KCL workers are - * 3.x complaint, and to CLIENT_VERSION_2x when customer has initiated a rollback. + * The allowed state transitions are to CLIENT_VERSION_3X_WITH_ROLLBACK when KCL workers are + * 3.x complaint, and to CLIENT_VERSION_2X when customer has initiated a rollback. * Only the leader KCL worker performs migration ready monitor and notifies all workers (including * itself) via a MigrationState update. When all worker's monitor notice the MigrationState change - * (including itself), it will transition to CLIENT_VERSION_3x_WITH_ROLLBACK. + * (including itself), it will transition to CLIENT_VERSION_3X_WITH_ROLLBACK. */ @KinesisClientInternalApi @RequiredArgsConstructor @@ -71,7 +71,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient @Override public ClientVersion clientVersion() { - return ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x; + return ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X; } @Override @@ -170,7 +170,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient /** * Callback handler to handle client version changes in MigrationState in DDB. - * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_UPGRADE_FROM_2x + * @param newState current MigrationState read from DDB where client version is not CLIENT_VERSION_UPGRADE_FROM_2X * @throws InvalidStateException during transition to the next state based on the new ClientVersion * or if the new state in DDB is unexpected. */ @@ -184,23 +184,23 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient MetricsUtil.createMetricsWithOperation(initializer.metricsFactory(), METRICS_OPERATION); try { switch (newState.getClientVersion()) { - case CLIENT_VERSION_2x: - log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2x); + case CLIENT_VERSION_2X: + log.info("A rollback has been initiated for the application. Transition to {}", CLIENT_VERSION_2X); // cancel monitor asynchronously cancelMigrationReadyMonitor(); - stateMachine.transitionTo(CLIENT_VERSION_2x, newState); + stateMachine.transitionTo(CLIENT_VERSION_2X, newState); break; - case CLIENT_VERSION_3x_WITH_ROLLBACK: - log.info("KCL workers are v3.x compliant, transition to {}", CLIENT_VERSION_3x_WITH_ROLLBACK); + case CLIENT_VERSION_3X_WITH_ROLLBACK: + log.info("KCL workers are v3.x compliant, transition to {}", CLIENT_VERSION_3X_WITH_ROLLBACK); cancelMigrationReadyMonitor(); - stateMachine.transitionTo(CLIENT_VERSION_3x_WITH_ROLLBACK, newState); + stateMachine.transitionTo(CLIENT_VERSION_3X_WITH_ROLLBACK, newState); break; default: // This should not happen, so throw an exception that allows the monitor to continue monitoring // changes, this allows KCL to operate in the current state and keep monitoring until a valid // state transition is possible. // However, there could be a split brain here, new workers will use DDB value as source of truth, - // so we could also write back CLIENT_VERSION_UPGRADE_FROM_2x to DDB to ensure all workers have + // so we could also write back CLIENT_VERSION_UPGRADE_FROM_2X to DDB to ensure all workers have // consistent behavior. // Ideally we don't expect modifications to DDB table out of the KCL migration tool scope, // so keeping it simple and not writing back to DDB, the error log below would help capture @@ -222,7 +222,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient try { final MigrationState newMigrationState = currentMigrationState .copy() - .update(CLIENT_VERSION_3x_WITH_ROLLBACK, initializer.workerIdentifier()); + .update(CLIENT_VERSION_3X_WITH_ROLLBACK, initializer.workerIdentifier()); log.info("Updating Migration State in DDB with {} prev state {}", newMigrationState, currentMigrationState); return coordinatorStateDAO.updateCoordinatorStateWithExpectation( newMigrationState, currentMigrationState.getDynamoClientVersionExpectation()); @@ -230,7 +230,7 @@ public class MigrationClientVersionUpgradeFrom2xState implements MigrationClient log.warn( "Exception occurred when toggling to {}, upgradeReadyMonitor will retry the update" + " if upgrade condition is still true", - CLIENT_VERSION_3x_WITH_ROLLBACK, + CLIENT_VERSION_3X_WITH_ROLLBACK, e); scope.addData(FAULT_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); return false; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java index 247dec13..6bc08189 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java @@ -190,7 +190,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine { currentMigrationClientVersionState = nextMigrationClientVersionState; log.info("Successfully transitioned to {}", nextMigrationClientVersionState.clientVersion()); - if (currentMigrationClientVersionState.clientVersion() == ClientVersion.CLIENT_VERSION_3x) { + if (currentMigrationClientVersionState.clientVersion() == ClientVersion.CLIENT_VERSION_3X) { terminate(); } success = true; @@ -220,10 +220,10 @@ public class MigrationStateMachineImpl implements MigrationStateMachine { private MigrationClientVersionState createMigrationClientVersionState( final ClientVersion clientVersion, final MigrationState migrationState) { switch (clientVersion) { - case CLIENT_VERSION_2x: + case CLIENT_VERSION_2X: return new MigrationClientVersion2xState( this, coordinatorStateDAO, stateMachineThreadPool, initializer, random); - case CLIENT_VERSION_UPGRADE_FROM_2x: + case CLIENT_VERSION_UPGRADE_FROM_2X: return new MigrationClientVersionUpgradeFrom2xState( this, timeProvider, @@ -233,10 +233,10 @@ public class MigrationStateMachineImpl implements MigrationStateMachine { random, migrationState, flipTo3XStabilizerTimeInSeconds); - case CLIENT_VERSION_3x_WITH_ROLLBACK: + case CLIENT_VERSION_3X_WITH_ROLLBACK: return new MigrationClientVersion3xWithRollbackState( this, coordinatorStateDAO, stateMachineThreadPool, initializer, random); - case CLIENT_VERSION_3x: + case CLIENT_VERSION_3X: return new MigrationClientVersion3xState(this, initializer); } throw new IllegalStateException(String.format("Unknown client version %s", clientVersion)); @@ -246,7 +246,7 @@ public class MigrationStateMachineImpl implements MigrationStateMachine { if (currentMigrationClientVersionState != null) { return currentMigrationClientVersionState.clientVersion(); } else if (terminated) { - return ClientVersion.CLIENT_VERSION_3x; + return ClientVersion.CLIENT_VERSION_3X; } throw new UnsupportedOperationException( "No current state when state machine is either not initialized" + " or already terminated"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 77a45a04..bbdf78e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -29,7 +29,6 @@ import java.util.function.Function; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Builder; import lombok.Data; -import lombok.Getter; import lombok.NonNull; import lombok.experimental.Accessors; import org.apache.commons.lang3.Validate; @@ -381,8 +380,8 @@ public class LeaseManagementConfig { * to shut down and an option to enable or disable graceful lease handoff. *

*/ + @Data @Builder - @Getter @Accessors(fluent = true) public static class GracefulLeaseHandoffConfig { /** @@ -550,7 +549,7 @@ public class LeaseManagementConfig { private int dampeningPercentage = 60; /** * Percentage value used to trigger reBalance. If fleet has workers which are have metrics value more or less - * than 20% of fleet level average then reBalance is triggered. + * than 10% of fleet level average then reBalance is triggered. * Leases are taken from workers with metrics value more than fleet level average. The load to take from these * workers is determined by evaluating how far they are with respect to fleet level average. */ @@ -565,7 +564,7 @@ public class LeaseManagementConfig { private boolean allowThroughputOvershoot = true; /** - * Duration after which workerMetrics entry from WorkerMetricStats table will be cleaned up. When an entry's + * Duration after which workerMetricStats entry from WorkerMetricStats table will be cleaned up. When an entry's * lastUpdateTime is older than staleWorkerMetricsEntryCleanupDuration from current time, entry will be removed * from the table. */ @@ -580,15 +579,16 @@ public class LeaseManagementConfig { private WorkerMetricsTableConfig workerMetricsTableConfig; /** - * Frequency to perform worker variance balancing frequency. This value is used with respect to the LAM freq, + * Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency, * that is every third (as default) iteration of LAM the worker variance balancing will be performed. * Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration * and so on. + * NOTE: LAM frequency = failoverTimeMillis */ private int varianceBalancingFrequency = 3; /** - * Alpha value used for calculating exponential moving average of worker's metrics values. Selecting + * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting * higher alpha value gives more weightage to recent value and thus low smoothing effect on computed average * and selecting smaller alpha values gives more weightage to past value and high smoothing effect. */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java index 3874b8a7..2ec51318 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java @@ -189,7 +189,7 @@ public class CoordinatorStateDAOTest { createCoordinatorState("key1"); final MigrationState migrationState = new MigrationState(MIGRATION_HASH_KEY, WORKER_ID) - .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, WORKER_ID); + .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, WORKER_ID); doaUnderTest.createCoordinatorStateIfNotExists(migrationState); final AmazonDynamoDBLockClient dynamoDBLockClient = new AmazonDynamoDBLockClient(doaUnderTest @@ -223,7 +223,7 @@ public class CoordinatorStateDAOTest { // Make sure the record has not changed due to using // ddb lock client Assertions.assertEquals( - ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x.toString(), + ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X.toString(), item.get(CLIENT_VERSION_ATTRIBUTE_NAME).s()); } else if (LEADER_HASH_KEY.equals(key)) { Assertions.assertEquals("TEST_WORKER", item.get("ownerName").s()); @@ -264,7 +264,7 @@ public class CoordinatorStateDAOTest { if ("Migration3.0".equals(keyValue)) { Assertions.assertTrue(state instanceof MigrationState); final MigrationState migrationState = (MigrationState) state; - Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3x, migrationState.getClientVersion()); + Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3X, migrationState.getClientVersion()); return; } Assertions.assertEquals(3, state.getAttributes().size()); @@ -379,7 +379,7 @@ public class CoordinatorStateDAOTest { final MigrationState state = createMigrationState(); /* Test step - update the state with mismatched condition */ - final MigrationState updatedState = state.copy().update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID); + final MigrationState updatedState = state.copy().update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID); boolean updated = doaUnderTest.updateCoordinatorStateWithExpectation( updatedState, updatedState.getDynamoClientVersionExpectation()); @@ -403,7 +403,7 @@ public class CoordinatorStateDAOTest { .build()) .join(); Assertions.assertEquals( - ClientVersion.CLIENT_VERSION_2x.name(), + ClientVersion.CLIENT_VERSION_2X.name(), response.item().get("cv").s()); Assertions.assertEquals(WORKER_ID, response.item().get("mb").s()); Assertions.assertEquals( @@ -433,7 +433,7 @@ public class CoordinatorStateDAOTest { /* Test step - update with new state object */ final MigrationState updatedState = - new MigrationState("Migration3.0", WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID); + new MigrationState("Migration3.0", WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID); boolean updated = doaUnderTest.updateCoordinatorStateWithExpectation(updatedState, null); @@ -491,7 +491,7 @@ public class CoordinatorStateDAOTest { final HashMap item = new HashMap() { { put("key", AttributeValue.fromS("Migration3.0")); - put("cv", AttributeValue.fromS(ClientVersion.CLIENT_VERSION_3x.toString())); + put("cv", AttributeValue.fromS(ClientVersion.CLIENT_VERSION_3X.toString())); put("mb", AttributeValue.fromS("DUMMY_WORKER")); put("mts", AttributeValue.fromN(String.valueOf(System.currentTimeMillis()))); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java index fce69191..6d9cfb98 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializerTest.java @@ -112,7 +112,7 @@ public class DynamicMigrationComponentsInitializerTest { @Test public void testInitialize_ClientVersion3_X() throws DependencyException { // Test initializing to verify correct leader decider is created - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X); verify(mockWorkerMetricsManager).startManager(); verify(mockDdbLockBasedLeaderDeciderCreator).get(); @@ -147,7 +147,7 @@ public class DynamicMigrationComponentsInitializerTest { @Test public void testInitialize_ClientVersion_3_xWithRollback() throws DependencyException { // Test initializing to verify correct leader decider is created - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK); verify(mockWorkerMetricsManager).startManager(); @@ -171,7 +171,7 @@ public class DynamicMigrationComponentsInitializerTest { } @ParameterizedTest - @CsvSource({"CLIENT_VERSION_UPGRADE_FROM_2x", "CLIENT_VERSION_2x"}) + @CsvSource({"CLIENT_VERSION_UPGRADE_FROM_2X", "CLIENT_VERSION_2X"}) public void testInitialize_ClientVersion_All2_X(final ClientVersion clientVersion) throws DependencyException { // Test initializing to verify correct leader decider is created migrationInitializer.initialize(clientVersion); @@ -187,7 +187,7 @@ public class DynamicMigrationComponentsInitializerTest { verify(mockConsumer).initialize(eq(true), eq(LeaseAssignmentMode.DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT)); // test initialization from state machine - if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x) { + if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) { migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT); // start worker stats and create gsi without waiting verify(mockWorkerMetricsDAO).initialize(); @@ -211,7 +211,7 @@ public class DynamicMigrationComponentsInitializerTest { when(mockLamThreadPool.awaitTermination(anyLong(), any())).thenReturn(true); when(mockWorkerMetricsScheduler.awaitTermination(anyLong(), any())).thenReturn(true); - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X); migrationInitializer.shutdown(); verify(mockLamThreadPool).shutdown(); @@ -226,7 +226,7 @@ public class DynamicMigrationComponentsInitializerTest { @Test public void initializationFails_WhenGsiIsNotActiveIn3_X() throws DependencyException { - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X); // test initialization from state machine assertThrows( @@ -236,7 +236,7 @@ public class DynamicMigrationComponentsInitializerTest { @Test public void initializationDoesNotFail_WhenGsiIsNotActiveIn3_XWithRollback() throws DependencyException { - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK); // test initialization from state machine assertDoesNotThrow( @@ -245,11 +245,11 @@ public class DynamicMigrationComponentsInitializerTest { @Test public void testComponentsInitialization_AfterFlip() throws DependencyException { - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X); migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT); // Test flip - migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x); + migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X); // verify verify(mockLam).start(); @@ -266,13 +266,13 @@ public class DynamicMigrationComponentsInitializerTest { .when(mockWorkerMetricsScheduler) .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_2x); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_2X); migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_INIT); // test roll-forward reset(mockWorkerMetricsScheduler); reset(mockLeaseRefresher); - migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_2x); + migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_2X); // verify verify(mockWorkerMetricsScheduler) @@ -288,11 +288,11 @@ public class DynamicMigrationComponentsInitializerTest { .when(mockWorkerMetricsScheduler) .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X); migrationInitializer.initializeClientVersionForUpgradeFrom2x(ClientVersion.CLIENT_VERSION_INIT); // test rollback before flip - migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x); + migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X); // verify verify(mockFuture).cancel(anyBoolean()); @@ -305,11 +305,11 @@ public class DynamicMigrationComponentsInitializerTest { .when(mockWorkerMetricsScheduler) .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK); migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_INIT); // test rollback before flip - migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK); + migrationInitializer.initializeClientVersionFor2x(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK); // verify verify(mockFuture).cancel(anyBoolean()); @@ -337,7 +337,7 @@ public class DynamicMigrationComponentsInitializerTest { } }); - migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK); + migrationInitializer.initialize(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK); migrationInitializer.initializeClientVersionFor3xWithRollback(ClientVersion.CLIENT_VERSION_INIT); // run the worker stats reporting thread diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java index b4ef5315..e83189a4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/ClientVersionChangeMonitorTest.java @@ -60,10 +60,10 @@ public class ClientVersionChangeMonitorTest { @ParameterizedTest @CsvSource({ - "CLIENT_VERSION_2x, CLIENT_VERSION_UPGRADE_FROM_2x", - "CLIENT_VERSION_3x_WITH_ROLLBACK, CLIENT_VERSION_2x", - "CLIENT_VERSION_UPGRADE_FROM_2x, CLIENT_VERSION_3x_WITH_ROLLBACK", - "CLIENT_VERSION_3x_WITH_ROLLBACK, CLIENT_VERSION_3x" + "CLIENT_VERSION_2X, CLIENT_VERSION_UPGRADE_FROM_2X", + "CLIENT_VERSION_3X_WITH_ROLLBACK, CLIENT_VERSION_2X", + "CLIENT_VERSION_UPGRADE_FROM_2X, CLIENT_VERSION_3X_WITH_ROLLBACK", + "CLIENT_VERSION_3X_WITH_ROLLBACK, CLIENT_VERSION_3X" }) public void testMonitor(final ClientVersion currentClientVersion, final ClientVersion changedClientVersion) throws Exception { @@ -104,7 +104,7 @@ public class ClientVersionChangeMonitorTest { mockCoordinatorStateDAO, mockScheduler, mockCallback, - ClientVersion.CLIENT_VERSION_2x, + ClientVersion.CLIENT_VERSION_2X, mockRandom); monitorUnderTest.startMonitor(); @@ -112,7 +112,7 @@ public class ClientVersionChangeMonitorTest { verify(mockScheduler).scheduleWithFixedDelay(argumentCaptor.capture(), anyLong(), anyLong(), anyObject()); final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, "DUMMY_WORKER") - .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, "DUMMY_WORKER"); + .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, "DUMMY_WORKER"); when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state); argumentCaptor.getValue().run(); @@ -130,7 +130,7 @@ public class ClientVersionChangeMonitorTest { mockCoordinatorStateDAO, mockScheduler, mockCallback, - ClientVersion.CLIENT_VERSION_2x, + ClientVersion.CLIENT_VERSION_2X, mockRandom); monitorUnderTest.startMonitor(); @@ -138,7 +138,7 @@ public class ClientVersionChangeMonitorTest { verify(mockScheduler).scheduleWithFixedDelay(argumentCaptor.capture(), anyLong(), anyLong(), anyObject()); final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, "DUMMY_WORKER") - .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, "DUMMY_WORKER"); + .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, "DUMMY_WORKER"); when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state); doThrow(new InvalidStateException("test exception")).when(mockCallback).accept(any()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java index 22436946..88e16380 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineTest.java @@ -107,8 +107,8 @@ public class MigrationStateMachineTest { @ParameterizedTest @CsvSource({ - "CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x, CLIENT_VERSION_UPGRADE_FROM_2x", - "CLIENT_VERSION_CONFIG_3x, CLIENT_VERSION_3x" + "CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X, CLIENT_VERSION_UPGRADE_FROM_2X", + "CLIENT_VERSION_CONFIG_3X, CLIENT_VERSION_3X" }) public void testStateMachineInitialization( final ClientVersionConfig config, final ClientVersion expectedStateMachineState) throws Exception { @@ -118,7 +118,7 @@ public class MigrationStateMachineTest { @Test public void testMigrationReadyFlip() throws Exception { - stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x); + stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); // After initialization, state machine should start to monitor for upgrade readiness final ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -130,7 +130,7 @@ public class MigrationStateMachineTest { @Test public void testRollbackAfterFlip() throws Exception { - stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x); + stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); // After initialization, state machine should start to monitor for upgrade readiness ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -150,7 +150,7 @@ public class MigrationStateMachineTest { @Test public void testRollForward() throws Exception { - stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x); + stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); // After initialization, state machine should start to monitor for upgrade readiness ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -177,7 +177,7 @@ public class MigrationStateMachineTest { @Test public void testRollbackBeforeFlip() throws Exception { - stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x); + stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); // After initialization, state machine should start to monitor for upgrade readiness ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -189,7 +189,7 @@ public class MigrationStateMachineTest { @Test public void successfulUpgradeAfterFlip() throws Exception { - stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x); + stateMachineUnderTest = getStateMachineUnderTest(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); // After initialization, state machine should start to monitor for upgrade readiness ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -244,15 +244,15 @@ public class MigrationStateMachineTest { // Invoke the monitor callbacks so the version flips to 3.x with rollback migrationReadyMonitorRunnable.run(); Assertions.assertEquals( - ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK, + ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK, stateCaptor.getValue().getClientVersion()); versionChangeMonitorRunnable.run(); Assertions.assertEquals( - ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK, stateMachineUnderTest.getCurrentClientVersion()); + ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK, stateMachineUnderTest.getCurrentClientVersion()); verify(mockInitializer) - .initializeClientVersionFor3xWithRollback(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x)); + .initializeClientVersionFor3xWithRollback(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X)); log.info("TestLog ----------- flip done -------------"); } @@ -263,33 +263,33 @@ public class MigrationStateMachineTest { : runnableCaptor.getAllValues().get(1); final MigrationState state = - new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID); + new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID); when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state); reset(mockMigrationStateMachineThreadPool); reset(mockInitializer); log.info("TestLog ----------- Initiate rollback before flip -------------"); versionChangeMonitorRunnable.run(); log.info("TestLog ----------- rollback before flip done -------------"); - Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2x, stateMachineUnderTest.getCurrentClientVersion()); - verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x)); + Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2X, stateMachineUnderTest.getCurrentClientVersion()); + verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X)); } private void initiateAndTestRollBack(final Runnable rollbackMonitorRunnable) throws Exception { final MigrationState state = - new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2x, WORKER_ID); + new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_2X, WORKER_ID); when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state); reset(mockMigrationStateMachineThreadPool); reset(mockInitializer); log.info("TestLog ----------- Initiate rollback -------------"); rollbackMonitorRunnable.run(); log.info("TestLog ----------- rollback done -------------"); - Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2x, stateMachineUnderTest.getCurrentClientVersion()); - verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK)); + Assertions.assertEquals(ClientVersion.CLIENT_VERSION_2X, stateMachineUnderTest.getCurrentClientVersion()); + verify(mockInitializer).initializeClientVersionFor2x(eq(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK)); } private void initiateAndTestRollForward(final Runnable rollforwardMonitorRunnable) throws Exception { final MigrationState state = new MigrationState(MIGRATION_HASH_KEY, WORKER_ID) - .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, WORKER_ID); + .update(ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, WORKER_ID); when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state); reset(mockMigrationStateMachineThreadPool); reset(mockInitializer); @@ -297,20 +297,20 @@ public class MigrationStateMachineTest { rollforwardMonitorRunnable.run(); log.info("TestLog ----------- roll-forward done -------------"); Assertions.assertEquals( - ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2x, stateMachineUnderTest.getCurrentClientVersion()); - verify(mockInitializer).initializeClientVersionForUpgradeFrom2x(eq(ClientVersion.CLIENT_VERSION_2x)); + ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X, stateMachineUnderTest.getCurrentClientVersion()); + verify(mockInitializer).initializeClientVersionForUpgradeFrom2x(eq(ClientVersion.CLIENT_VERSION_2X)); } private void initiateAndTestSuccessfulUpgrade(final Runnable successfulUpgradeMonitor) throws Exception { final MigrationState state = - new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_3x, WORKER_ID); + new MigrationState(MIGRATION_HASH_KEY, WORKER_ID).update(ClientVersion.CLIENT_VERSION_3X, WORKER_ID); when(mockCoordinatorStateDAO.getCoordinatorState(MIGRATION_HASH_KEY)).thenReturn(state); reset(mockMigrationStateMachineThreadPool); reset(mockInitializer); log.info("TestLog ----------- Initiate successful upgrade -------------"); successfulUpgradeMonitor.run(); log.info("TestLog ----------- successful upgrade done -------------"); - Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3x, stateMachineUnderTest.getCurrentClientVersion()); - verify(mockInitializer).initializeClientVersionFor3x(ClientVersion.CLIENT_VERSION_3x_WITH_ROLLBACK); + Assertions.assertEquals(ClientVersion.CLIENT_VERSION_3X, stateMachineUnderTest.getCurrentClientVersion()); + verify(mockInitializer).initializeClientVersionFor3x(ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK); } }