Rename few classes and fields from review of multi-lang changes
This commit is contained in:
parent
0ac22c750b
commit
a27c22def9
9 changed files with 33 additions and 29 deletions
|
|
@ -22,7 +22,7 @@ import software.amazon.kinesis.coordinator.CoordinatorConfig.CoordinatorStateTab
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@Setter
|
@Setter
|
||||||
public class CoordinatorStateConfigBean {
|
public class CoordinatorStateTableConfigBean {
|
||||||
|
|
||||||
interface CoordinatorStateConfigBeanDelegate {
|
interface CoordinatorStateConfigBeanDelegate {
|
||||||
String getCoordinatorStateTableName();
|
String getCoordinatorStateTableName();
|
||||||
|
|
@ -203,11 +203,13 @@ public class MultiLangDaemonConfiguration {
|
||||||
private final WorkerUtilizationAwareAssignmentConfigBean workerUtilizationAwareAssignmentConfigBean =
|
private final WorkerUtilizationAwareAssignmentConfigBean workerUtilizationAwareAssignmentConfigBean =
|
||||||
new WorkerUtilizationAwareAssignmentConfigBean();
|
new WorkerUtilizationAwareAssignmentConfigBean();
|
||||||
|
|
||||||
@Delegate(types = WorkerMetricsTableConfigBean.WorkerMetricsTableConfigBeanDelegate.class)
|
@Delegate(types = WorkerMetricStatsTableConfigBean.WorkerMetricsTableConfigBeanDelegate.class)
|
||||||
private final WorkerMetricsTableConfigBean workerMetricsTableConfigBean = new WorkerMetricsTableConfigBean();
|
private final WorkerMetricStatsTableConfigBean workerMetricStatsTableConfigBean =
|
||||||
|
new WorkerMetricStatsTableConfigBean();
|
||||||
|
|
||||||
@Delegate(types = CoordinatorStateConfigBean.CoordinatorStateConfigBeanDelegate.class)
|
@Delegate(types = CoordinatorStateTableConfigBean.CoordinatorStateConfigBeanDelegate.class)
|
||||||
private final CoordinatorStateConfigBean coordinatorStateConfigBean = new CoordinatorStateConfigBean();
|
private final CoordinatorStateTableConfigBean coordinatorStateTableConfigBean =
|
||||||
|
new CoordinatorStateTableConfigBean();
|
||||||
|
|
||||||
private boolean validateSequenceNumberBeforeCheckpointing;
|
private boolean validateSequenceNumberBeforeCheckpointing;
|
||||||
|
|
||||||
|
|
@ -413,7 +415,7 @@ public class MultiLangDaemonConfiguration {
|
||||||
|
|
||||||
private void handleCoordinatorConfig(CoordinatorConfig coordinatorConfig) {
|
private void handleCoordinatorConfig(CoordinatorConfig coordinatorConfig) {
|
||||||
ConfigurationSettableUtils.resolveFields(
|
ConfigurationSettableUtils.resolveFields(
|
||||||
this.coordinatorStateConfigBean, coordinatorConfig.coordinatorStateConfig());
|
this.coordinatorStateTableConfigBean, coordinatorConfig.coordinatorStateTableConfig());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleLeaseManagementConfig(LeaseManagementConfig leaseManagementConfig) {
|
private void handleLeaseManagementConfig(LeaseManagementConfig leaseManagementConfig) {
|
||||||
|
|
@ -423,7 +425,7 @@ public class MultiLangDaemonConfiguration {
|
||||||
this.workerUtilizationAwareAssignmentConfigBean,
|
this.workerUtilizationAwareAssignmentConfigBean,
|
||||||
leaseManagementConfig.workerUtilizationAwareAssignmentConfig());
|
leaseManagementConfig.workerUtilizationAwareAssignmentConfig());
|
||||||
ConfigurationSettableUtils.resolveFields(
|
ConfigurationSettableUtils.resolveFields(
|
||||||
this.workerMetricsTableConfigBean,
|
this.workerMetricStatsTableConfigBean,
|
||||||
leaseManagementConfig.workerUtilizationAwareAssignmentConfig().workerMetricsTableConfig());
|
leaseManagementConfig.workerUtilizationAwareAssignmentConfig().workerMetricsTableConfig());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableCo
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@Setter
|
@Setter
|
||||||
public class WorkerMetricsTableConfigBean {
|
public class WorkerMetricStatsTableConfigBean {
|
||||||
|
|
||||||
interface WorkerMetricsTableConfigBeanDelegate {
|
interface WorkerMetricsTableConfigBeanDelegate {
|
||||||
String getWorkerMetricsTableName();
|
String getWorkerMetricsTableName();
|
||||||
|
|
@ -288,7 +288,7 @@ public class MultiLangDaemonConfigurationTest {
|
||||||
configuration.resolvedConfiguration(shardRecordProcessorFactory);
|
configuration.resolvedConfiguration(shardRecordProcessorFactory);
|
||||||
CoordinatorConfig coordinatorConfig = resolvedConfiguration.getCoordinatorConfig();
|
CoordinatorConfig coordinatorConfig = resolvedConfiguration.getCoordinatorConfig();
|
||||||
CoordinatorConfig.CoordinatorStateTableConfig coordinatorStateConfig =
|
CoordinatorConfig.CoordinatorStateTableConfig coordinatorStateConfig =
|
||||||
coordinatorConfig.coordinatorStateConfig();
|
coordinatorConfig.coordinatorStateTableConfig();
|
||||||
assertEquals(coordinatorStateConfig.tableName(), "testTable");
|
assertEquals(coordinatorStateConfig.tableName(), "testTable");
|
||||||
assertEquals(coordinatorStateConfig.billingMode(), testWorkerMetricsTableBillingMode);
|
assertEquals(coordinatorStateConfig.billingMode(), testWorkerMetricsTableBillingMode);
|
||||||
assertEquals(coordinatorStateConfig.readCapacity(), 123);
|
assertEquals(coordinatorStateConfig.readCapacity(), 123);
|
||||||
|
|
@ -298,7 +298,7 @@ public class MultiLangDaemonConfigurationTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCoordinatorStateTableConfigUsesDefaults() {
|
public void testCoordinatorStateTableConfigUsesDefaults() {
|
||||||
final CoordinatorConfig.CoordinatorStateTableConfig defaultCoordinatorStateTableConfig =
|
final CoordinatorConfig.CoordinatorStateTableConfig defaultCoordinatorStateTableConfig =
|
||||||
getTestConfigsBuilder().coordinatorConfig().coordinatorStateConfig();
|
getTestConfigsBuilder().coordinatorConfig().coordinatorStateTableConfig();
|
||||||
|
|
||||||
final MultiLangDaemonConfiguration configuration = baseConfiguration();
|
final MultiLangDaemonConfiguration configuration = baseConfiguration();
|
||||||
configuration.setCoordinatorStateWriteCapacity(defaultCoordinatorStateTableConfig.writeCapacity() + 12345);
|
configuration.setCoordinatorStateWriteCapacity(defaultCoordinatorStateTableConfig.writeCapacity() + 12345);
|
||||||
|
|
@ -307,7 +307,7 @@ public class MultiLangDaemonConfigurationTest {
|
||||||
configuration.resolvedConfiguration(shardRecordProcessorFactory);
|
configuration.resolvedConfiguration(shardRecordProcessorFactory);
|
||||||
|
|
||||||
final CoordinatorConfig.CoordinatorStateTableConfig resolvedCoordinatorStateTableConfig =
|
final CoordinatorConfig.CoordinatorStateTableConfig resolvedCoordinatorStateTableConfig =
|
||||||
resolvedConfiguration.coordinatorConfig.coordinatorStateConfig();
|
resolvedConfiguration.coordinatorConfig.coordinatorStateTableConfig();
|
||||||
|
|
||||||
assertNotEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig);
|
assertNotEquals(defaultCoordinatorStateTableConfig, resolvedCoordinatorStateTableConfig);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,13 +32,15 @@ public class PropertiesMappingE2ETest {
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"MultiLangTest-CoordinatorState-CustomName",
|
"MultiLangTest-CoordinatorState-CustomName",
|
||||||
kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName());
|
kclV3Config.coordinatorConfig.coordinatorStateTableConfig().tableName());
|
||||||
assertEquals(
|
assertEquals(
|
||||||
BillingMode.PROVISIONED,
|
BillingMode.PROVISIONED,
|
||||||
kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode());
|
kclV3Config.coordinatorConfig.coordinatorStateTableConfig().billingMode());
|
||||||
assertEquals(
|
assertEquals(
|
||||||
1000, kclV3Config.coordinatorConfig.coordinatorStateConfig().readCapacity());
|
1000,
|
||||||
assertEquals(500, kclV3Config.coordinatorConfig.coordinatorStateConfig().writeCapacity());
|
kclV3Config.coordinatorConfig.coordinatorStateTableConfig().readCapacity());
|
||||||
|
assertEquals(
|
||||||
|
500, kclV3Config.coordinatorConfig.coordinatorStateTableConfig().writeCapacity());
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
10000L,
|
10000L,
|
||||||
|
|
@ -150,10 +152,10 @@ public class PropertiesMappingE2ETest {
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"MultiLangTest-CoordinatorState",
|
"MultiLangTest-CoordinatorState",
|
||||||
kclV3Config.coordinatorConfig.coordinatorStateConfig().tableName());
|
kclV3Config.coordinatorConfig.coordinatorStateTableConfig().tableName());
|
||||||
assertEquals(
|
assertEquals(
|
||||||
BillingMode.PAY_PER_REQUEST,
|
BillingMode.PAY_PER_REQUEST,
|
||||||
kclV3Config.coordinatorConfig.coordinatorStateConfig().billingMode());
|
kclV3Config.coordinatorConfig.coordinatorStateTableConfig().billingMode());
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
30_000L,
|
30_000L,
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ public class CoordinatorConfig {
|
||||||
|
|
||||||
public CoordinatorConfig(final String applicationName) {
|
public CoordinatorConfig(final String applicationName) {
|
||||||
this.applicationName = applicationName;
|
this.applicationName = applicationName;
|
||||||
this.coordinatorStateConfig = new CoordinatorStateTableConfig(applicationName);
|
this.coordinatorStateTableConfig = new CoordinatorStateTableConfig(applicationName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -153,5 +153,5 @@ public class CoordinatorConfig {
|
||||||
* On-Demand.
|
* On-Demand.
|
||||||
*/
|
*/
|
||||||
@NonNull
|
@NonNull
|
||||||
private final CoordinatorStateTableConfig coordinatorStateConfig;
|
private final CoordinatorStateTableConfig coordinatorStateTableConfig;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -295,7 +295,7 @@ public class Scheduler implements Runnable {
|
||||||
this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
|
this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
|
||||||
|
|
||||||
final CoordinatorStateDAO coordinatorStateDAO = new CoordinatorStateDAO(
|
final CoordinatorStateDAO coordinatorStateDAO = new CoordinatorStateDAO(
|
||||||
leaseManagementConfig.dynamoDBClient(), coordinatorConfig().coordinatorStateConfig());
|
leaseManagementConfig.dynamoDBClient(), coordinatorConfig().coordinatorStateTableConfig());
|
||||||
this.leaseAssignmentModeProvider = new MigrationAdaptiveLeaseAssignmentModeProvider();
|
this.leaseAssignmentModeProvider = new MigrationAdaptiveLeaseAssignmentModeProvider();
|
||||||
this.migrationComponentsInitializer = createDynamicMigrationComponentsInitializer(coordinatorStateDAO);
|
this.migrationComponentsInitializer = createDynamicMigrationComponentsInitializer(coordinatorStateDAO);
|
||||||
this.migrationStateMachine = new MigrationStateMachineImpl(
|
this.migrationStateMachine = new MigrationStateMachineImpl(
|
||||||
|
|
|
||||||
|
|
@ -96,10 +96,10 @@ public class CoordinatorStateDAOTest {
|
||||||
throws ExecutionException, InterruptedException, DependencyException {
|
throws ExecutionException, InterruptedException, DependencyException {
|
||||||
/* Test setup - create class under test **/
|
/* Test setup - create class under test **/
|
||||||
final CoordinatorConfig c = new CoordinatorConfig("testPayPerUseTableCreation");
|
final CoordinatorConfig c = new CoordinatorConfig("testPayPerUseTableCreation");
|
||||||
c.coordinatorStateConfig().billingMode(BillingMode.PAY_PER_REQUEST);
|
c.coordinatorStateTableConfig().billingMode(BillingMode.PAY_PER_REQUEST);
|
||||||
|
|
||||||
final CoordinatorStateDAO doaUnderTest =
|
final CoordinatorStateDAO doaUnderTest =
|
||||||
new CoordinatorStateDAO(dynamoDbAsyncClient, c.coordinatorStateConfig());
|
new CoordinatorStateDAO(dynamoDbAsyncClient, c.coordinatorStateTableConfig());
|
||||||
|
|
||||||
/* Test step - initialize to create the table **/
|
/* Test step - initialize to create the table **/
|
||||||
doaUnderTest.initialize();
|
doaUnderTest.initialize();
|
||||||
|
|
@ -456,19 +456,19 @@ public class CoordinatorStateDAOTest {
|
||||||
final ProvisionedThroughput throughput,
|
final ProvisionedThroughput throughput,
|
||||||
final String tableName) {
|
final String tableName) {
|
||||||
final CoordinatorConfig c = new CoordinatorConfig(applicationName);
|
final CoordinatorConfig c = new CoordinatorConfig(applicationName);
|
||||||
c.coordinatorStateConfig().billingMode(mode);
|
c.coordinatorStateTableConfig().billingMode(mode);
|
||||||
if (tableName != null) {
|
if (tableName != null) {
|
||||||
c.coordinatorStateConfig().tableName(tableName);
|
c.coordinatorStateTableConfig().tableName(tableName);
|
||||||
}
|
}
|
||||||
if (mode == BillingMode.PROVISIONED) {
|
if (mode == BillingMode.PROVISIONED) {
|
||||||
c.coordinatorStateConfig()
|
c.coordinatorStateTableConfig()
|
||||||
.writeCapacity(throughput.writeCapacityUnits())
|
.writeCapacity(throughput.writeCapacityUnits())
|
||||||
.readCapacity(throughput.readCapacityUnits());
|
.readCapacity(throughput.readCapacityUnits());
|
||||||
}
|
}
|
||||||
|
|
||||||
tableNameForTest = c.coordinatorStateConfig().tableName();
|
tableNameForTest = c.coordinatorStateTableConfig().tableName();
|
||||||
|
|
||||||
return c.coordinatorStateConfig();
|
return c.coordinatorStateTableConfig();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createCoordinatorState(final String keyValue) {
|
private void createCoordinatorState(final String keyValue) {
|
||||||
|
|
|
||||||
|
|
@ -41,8 +41,8 @@ class DynamoDBLockBasedLeaderDeciderTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setup() throws DependencyException {
|
void setup() throws DependencyException {
|
||||||
final CoordinatorConfig c = new CoordinatorConfig("TestApplication");
|
final CoordinatorConfig c = new CoordinatorConfig("TestApplication");
|
||||||
c.coordinatorStateConfig().tableName(TEST_LOCK_TABLE_NAME);
|
c.coordinatorStateTableConfig().tableName(TEST_LOCK_TABLE_NAME);
|
||||||
final CoordinatorStateDAO dao = new CoordinatorStateDAO(dynamoDBAsyncClient, c.coordinatorStateConfig());
|
final CoordinatorStateDAO dao = new CoordinatorStateDAO(dynamoDBAsyncClient, c.coordinatorStateTableConfig());
|
||||||
dao.initialize();
|
dao.initialize();
|
||||||
IntStream.range(0, 10).sequential().forEach(index -> {
|
IntStream.range(0, 10).sequential().forEach(index -> {
|
||||||
final String workerId = getWorkerId(index);
|
final String workerId = getWorkerId(index);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue