From bb7d59518db681d3d1b58028b2cb900404c7ab43 Mon Sep 17 00:00:00 2001 From: Daham Date: Mon, 1 Feb 2021 13:59:06 +0530 Subject: [PATCH 1/3] Add Changes for Giving the Capability of Paasing CloudWatch Endpoint URL as a KCL Configuration Parameter --- .../worker/KinesisClientLibConfiguration.java | 29 ++++++++++++++++ .../clientlibrary/lib/worker/Worker.java | 8 ++++- ...entLibLeaseCoordinatorIntegrationTest.java | 34 +++++++++---------- .../worker/ShardSyncTaskIntegrationTest.java | 30 +++++++++++++--- .../clientlibrary/lib/worker/WorkerTest.java | 9 +++-- .../leases/impl/LeaseIntegrationTest.java | 20 ++++++----- 6 files changed, 92 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 643b45d0..a46722ee 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -240,6 +240,7 @@ public class KinesisClientLibConfiguration { private String streamName; private String kinesisEndpoint; private String dynamoDBEndpoint; + private String cloudWatchEndpoint; private InitialPositionInStream initialPositionInStream; private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider; @@ -280,6 +281,9 @@ public class KinesisClientLibConfiguration { private long leasesRecoveryAuditorExecutionFrequencyMillis; private int leasesRecoveryAuditorInconsistencyConfidenceThreshold; + @Getter + private boolean isCBORProtocolDisabled; + @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -847,6 +851,13 @@ public class KinesisClientLibConfiguration { return dynamoDBEndpoint; } + /** + * @return CloudWatch endpoint + */ + public String getCloudWatchEndpoint() { + return cloudWatchEndpoint; + } + /** * @return the initialPositionInStream */ @@ -1076,6 +1087,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param isCBORProtocolDisabled is CBOR protocol disabled + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withIsCBORProtocolDisabled(boolean isCBORProtocolDisabled) { + this.isCBORProtocolDisabled = isCBORProtocolDisabled; + return this; + } + /** * @param dynamoDBEndpoint DynamoDB endpoint * @return KinesisClientLibConfiguration @@ -1085,6 +1105,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param cloudWatchEndpoint CloudWatch endpoint + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withCloudWatchEndpoint(String cloudWatchEndpoint) { + this.cloudWatchEndpoint = cloudWatchEndpoint; + return this; + } + /** * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library * will start fetching records from this position when the application starts up if there are no checkpoints. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f8c66181..35bfc19f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import com.amazonaws.SDKGlobalConfiguration; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -1424,6 +1425,11 @@ public class Worker implements Runnable { if (execService == null) { execService = getExecutorService(); } + + if (config.isCBORProtocolDisabled()) { + System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); + } + if (kinesisClient == null) { kinesisClient = createClient(AmazonKinesisClientBuilder.standard(), config.getKinesisCredentialsProvider(), @@ -1442,7 +1448,7 @@ public class Worker implements Runnable { cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(), config.getCloudWatchCredentialsProvider(), config.getCloudWatchClientConfiguration(), - null, + config.getCloudWatchEndpoint(), config.getRegionName()); } // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java index aedc627d..2bbe84e3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java @@ -14,33 +14,28 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; - -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; -import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; -import junit.framework.Assert; - -import org.junit.Before; -import org.junit.Test; - - +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.Lease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.Callable; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -59,7 +54,10 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { final boolean useConsistentReads = true; LeaseSelector leaseSelector = new GenericLeaseSelector<>(); if (leaseManager == null) { - AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); + AmazonDynamoDBClient ddb = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); leaseManager = new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index a4afa053..5020f83f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -18,9 +18,15 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.ListTablesResult; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.model.CreateStreamRequest; import org.joda.time.DateTime; import org.junit.AfterClass; import org.junit.Assert; @@ -51,7 +57,7 @@ import static junit.framework.TestCase.fail; public class ShardSyncTaskIntegrationTest { private static final String STREAM_NAME = "IntegrationTestStream02"; - private static final String KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com"; + private static final String KINESIS_ENDPOINT = "http://localhost:4566"; private static AWSCredentialsProvider credentialsProvider; private IKinesisClientLeaseManager leaseManager; @@ -63,8 +69,13 @@ public class ShardSyncTaskIntegrationTest { */ @BeforeClass public static void setUpBeforeClass() throws Exception { + System.setProperty("com.amazonaws.sdk.disableCbor", "true"); + credentialsProvider = new DefaultAWSCredentialsProviderChain(); - AmazonKinesis kinesis = new AmazonKinesisClient(credentialsProvider); + AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); try { kinesis.createStream(STREAM_NAME, 1); @@ -92,7 +103,10 @@ public class ShardSyncTaskIntegrationTest { boolean useConsistentReads = true; leaseManager = new KinesisClientLeaseManager(tableName, - new AmazonDynamoDBClient(credentialsProvider), + AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(), useConsistentReads, billingMode); @@ -140,7 +154,10 @@ public class ShardSyncTaskIntegrationTest { } private void cleanUpTable(String tableName) throws DependencyException { - AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance()); + AmazonDynamoDBClient client = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); ListTablesResult tables = client.listTables(); if(tables.getTableNames().contains(tableName)){ leaseManager.waitUntilLeaseTableExists(2,20); @@ -160,7 +177,10 @@ public class ShardSyncTaskIntegrationTest { } private void checkBillingMode(BillingMode billingMode, String tableName) { - AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance()); + AmazonDynamoDBClient client = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); DescribeTableResult tableDetails = client.describeTable(tableName); if(BillingMode.PAY_PER_REQUEST.equals(billingMode)) { Assert.assertEquals(tableDetails.getTable().getBillingModeSummary().getBillingMode(), billingMode.name()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a7ca1151..8a55d860 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1840,7 +1840,8 @@ public class WorkerTest { KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) .withRegionName(Regions.US_WEST_2.getName()) .withKinesisEndpoint(endpoint) - .withDynamoDBEndpoint(endpoint); + .withDynamoDBEndpoint(endpoint) + .withCloudWatchEndpoint(endpoint); AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); @@ -1939,16 +1940,14 @@ public class WorkerTest { String region = Regions.US_WEST_2.getName(); String endpointUrl = "TestEndpoint"; KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) - .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl); + .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl).withCloudWatchEndpoint(endpointUrl); Worker.Builder builder = spy(new Worker.Builder()); builder.recordProcessorFactory(recordProcessorFactory).config(config).build(); - verify(builder, times(2)).createClient( + verify(builder, times(3)).createClient( any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region)); - verify(builder, times(1)).createClient( - any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); } private abstract class InjectableWorker extends Worker { diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java index 091de651..1af0d42f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java @@ -14,10 +14,14 @@ */ package com.amazonaws.services.kinesis.leases.impl; -import java.util.logging.Logger; - +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Ignore; @@ -25,17 +29,15 @@ import org.junit.Rule; import org.junit.rules.TestWatcher; import org.junit.runner.Description; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; - @Ignore public class LeaseIntegrationTest { protected static KinesisClientLeaseManager leaseManager; protected static AmazonDynamoDBClient ddbClient = - new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); + (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); private static final Log LOG = LogFactory.getLog(LeaseIntegrationTest.class); From ea54fdd6b905cd07ae6b8733ee073fb734aaa847 Mon Sep 17 00:00:00 2001 From: Gayan Weerakutti Date: Mon, 1 Feb 2021 15:16:45 +0530 Subject: [PATCH 2/3] Revert changes made to test classes --- ...entLibLeaseCoordinatorIntegrationTest.java | 34 ++++++++++--------- .../leases/impl/LeaseIntegrationTest.java | 20 +++++------ 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java index 2bbe84e3..aedc627d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java @@ -14,28 +14,33 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; +import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Test; + + import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.Lease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; -import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; -import junit.framework.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.*; -import java.util.concurrent.Callable; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -54,10 +59,7 @@ public class KinesisClientLibLeaseCoordinatorIntegrationTest { final boolean useConsistentReads = true; LeaseSelector leaseSelector = new GenericLeaseSelector<>(); if (leaseManager == null) { - AmazonDynamoDBClient ddb = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(); + AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); leaseManager = new KinesisClientLeaseManager(TABLE_NAME, ddb, useConsistentReads, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java index 1af0d42f..091de651 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseIntegrationTest.java @@ -14,14 +14,10 @@ */ package com.amazonaws.services.kinesis.leases.impl; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import java.util.logging.Logger; + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Ignore; @@ -29,15 +25,17 @@ import org.junit.Rule; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; + @Ignore public class LeaseIntegrationTest { protected static KinesisClientLeaseManager leaseManager; protected static AmazonDynamoDBClient ddbClient = - (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(); + new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()); private static final Log LOG = LogFactory.getLog(LeaseIntegrationTest.class); From d1bc673a9607f25dc2c33d755cafcd34872d938a Mon Sep 17 00:00:00 2001 From: Gayan Weerakutti Date: Mon, 1 Feb 2021 16:00:43 +0530 Subject: [PATCH 3/3] Revert changes made to ShardSyncTaskIntegrationTest --- .../worker/ShardSyncTaskIntegrationTest.java | 30 ++++--------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 5020f83f..a4afa053 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -18,15 +18,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.ListTablesResult; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; -import com.amazonaws.services.kinesis.model.CreateStreamRequest; import org.joda.time.DateTime; import org.junit.AfterClass; import org.junit.Assert; @@ -57,7 +51,7 @@ import static junit.framework.TestCase.fail; public class ShardSyncTaskIntegrationTest { private static final String STREAM_NAME = "IntegrationTestStream02"; - private static final String KINESIS_ENDPOINT = "http://localhost:4566"; + private static final String KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com"; private static AWSCredentialsProvider credentialsProvider; private IKinesisClientLeaseManager leaseManager; @@ -69,13 +63,8 @@ public class ShardSyncTaskIntegrationTest { */ @BeforeClass public static void setUpBeforeClass() throws Exception { - System.setProperty("com.amazonaws.sdk.disableCbor", "true"); - credentialsProvider = new DefaultAWSCredentialsProviderChain(); - AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(); + AmazonKinesis kinesis = new AmazonKinesisClient(credentialsProvider); try { kinesis.createStream(STREAM_NAME, 1); @@ -103,10 +92,7 @@ public class ShardSyncTaskIntegrationTest { boolean useConsistentReads = true; leaseManager = new KinesisClientLeaseManager(tableName, - AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(), + new AmazonDynamoDBClient(credentialsProvider), useConsistentReads, billingMode); @@ -154,10 +140,7 @@ public class ShardSyncTaskIntegrationTest { } private void cleanUpTable(String tableName) throws DependencyException { - AmazonDynamoDBClient client = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(); + AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance()); ListTablesResult tables = client.listTables(); if(tables.getTableNames().contains(tableName)){ leaseManager.waitUntilLeaseTableExists(2,20); @@ -177,10 +160,7 @@ public class ShardSyncTaskIntegrationTest { } private void checkBillingMode(BillingMode billingMode, String tableName) { - AmazonDynamoDBClient client = (AmazonDynamoDBClient) AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566","us-east-1")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(); + AmazonDynamoDBClient client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance()); DescribeTableResult tableDetails = client.describeTable(tableName); if(BillingMode.PAY_PER_REQUEST.equals(billingMode)) { Assert.assertEquals(tableDetails.getTable().getBillingModeSummary().getBillingMode(), billingMode.name());