Add Changes for Giving the Capability of Paasing CloudWatch Endpoint URL as a KCL Configuration Parameter

This commit is contained in:
Daham 2021-02-01 13:59:06 +05:30
parent 6fbfc21ad7
commit bb7d59518d
6 changed files with 92 additions and 38 deletions

View file

@ -240,6 +240,7 @@ public class KinesisClientLibConfiguration {
private String streamName;
private String kinesisEndpoint;
private String dynamoDBEndpoint;
private String cloudWatchEndpoint;
private InitialPositionInStream initialPositionInStream;
private AWSCredentialsProvider kinesisCredentialsProvider;
private AWSCredentialsProvider dynamoDBCredentialsProvider;
@ -280,6 +281,9 @@ public class KinesisClientLibConfiguration {
private long leasesRecoveryAuditorExecutionFrequencyMillis;
private int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
@Getter
private boolean isCBORProtocolDisabled;
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@ -847,6 +851,13 @@ public class KinesisClientLibConfiguration {
return dynamoDBEndpoint;
}
/**
* @return CloudWatch endpoint
*/
public String getCloudWatchEndpoint() {
return cloudWatchEndpoint;
}
/**
* @return the initialPositionInStream
*/
@ -1076,6 +1087,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param isCBORProtocolDisabled is CBOR protocol disabled
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withIsCBORProtocolDisabled(boolean isCBORProtocolDisabled) {
this.isCBORProtocolDisabled = isCBORProtocolDisabled;
return this;
}
/**
* @param dynamoDBEndpoint DynamoDB endpoint
* @return KinesisClientLibConfiguration
@ -1085,6 +1105,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param cloudWatchEndpoint CloudWatch endpoint
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withCloudWatchEndpoint(String cloudWatchEndpoint) {
this.cloudWatchEndpoint = cloudWatchEndpoint;
return this;
}
/**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
* will start fetching records from this position when the application starts up if there are no checkpoints.

View file

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -1424,6 +1425,11 @@ public class Worker implements Runnable {
if (execService == null) {
execService = getExecutorService();
}
if (config.isCBORProtocolDisabled()) {
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
}
if (kinesisClient == null) {
kinesisClient = createClient(AmazonKinesisClientBuilder.standard(),
config.getKinesisCredentialsProvider(),
@ -1442,7 +1448,7 @@ public class Worker implements Runnable {
cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(),
config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration(),
null,
config.getCloudWatchEndpoint(),
config.getRegionName());
}
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.

View file

@ -14,33 +14,28 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.Callable;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@ -59,7 +54,10 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest {
final boolean useConsistentReads = true;
LeaseSelector<KinesisClientLease> leaseSelector = new GenericLeaseSelector<>();
if (leaseManager == null) {
AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
AmazonDynamoDBClient ddb = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build();
leaseManager =
new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);

View file

@ -18,9 +18,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
@ -51,7 +57,7 @@ import static junit.framework.TestCase.fail;
public class ShardSyncTaskIntegrationTest {
private static final String STREAM_NAME = "IntegrationTestStream02";
private static final String KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com";
private static final String KINESIS_ENDPOINT = "http://localhost:4566";
private static AWSCredentialsProvider credentialsProvider;
private IKinesisClientLeaseManager leaseManager;
@ -63,8 +69,13 @@ public class ShardSyncTaskIntegrationTest {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
System.setProperty("com.amazonaws.sdk.disableCbor", "true");
credentialsProvider = new DefaultAWSCredentialsProviderChain();
AmazonKinesis kinesis = new AmazonKinesisClient(credentialsProvider);
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build();
try {
kinesis.createStream(STREAM_NAME, 1);
@ -92,7 +103,10 @@ public class ShardSyncTaskIntegrationTest {
boolean useConsistentReads = true;
leaseManager =
new KinesisClientLeaseManager(tableName,
new AmazonDynamoDBClient(credentialsProvider),
AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build(),
useConsistentReads,
billingMode);
@ -140,7 +154,10 @@ public class ShardSyncTaskIntegrationTest {
}
private void cleanUpTable(String tableName) throws DependencyException {
AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance());
AmazonDynamoDBClient client = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build();
ListTablesResult tables = client.listTables();
if(tables.getTableNames().contains(tableName)){
leaseManager.waitUntilLeaseTableExists(2,20);
@ -160,7 +177,10 @@ public class ShardSyncTaskIntegrationTest {
}
private void checkBillingMode(BillingMode billingMode, String tableName) {
AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance());
AmazonDynamoDBClient client = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build();
DescribeTableResult tableDetails = client.describeTable(tableName);
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)) {
Assert.assertEquals(tableDetails.getTable().getBillingModeSummary().getBillingMode(), billingMode.name());

View file

@ -1840,7 +1840,8 @@ public class WorkerTest {
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(Regions.US_WEST_2.getName())
.withKinesisEndpoint(endpoint)
.withDynamoDBEndpoint(endpoint);
.withDynamoDBEndpoint(endpoint)
.withCloudWatchEndpoint(endpoint);
AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
@ -1939,16 +1940,14 @@ public class WorkerTest {
String region = Regions.US_WEST_2.getName();
String endpointUrl = "TestEndpoint";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl);
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl).withCloudWatchEndpoint(endpointUrl);
Worker.Builder builder = spy(new Worker.Builder());
builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
verify(builder, times(2)).createClient(
verify(builder, times(3)).createClient(
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region));
verify(builder, times(1)).createClient(
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
}
private abstract class InjectableWorker extends Worker {

View file

@ -14,10 +14,14 @@
*/
package com.amazonaws.services.kinesis.leases.impl;
import java.util.logging.Logger;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Ignore;
@ -25,17 +29,15 @@ import org.junit.Rule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
@Ignore
public class LeaseIntegrationTest {
protected static KinesisClientLeaseManager leaseManager;
protected static AmazonDynamoDBClient ddbClient =
new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain());
(AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build();
private static final Log LOG = LogFactory.getLog(LeaseIntegrationTest.class);