diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index f96974b8..c6a7d815 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -256,6 +256,7 @@ public class KinesisClientLibConfiguration { private Arn streamArn; private String kinesisEndpoint; private String dynamoDBEndpoint; + private String cloudWatchEndpoint; private InitialPositionInStream initialPositionInStream; private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider; @@ -296,6 +297,9 @@ public class KinesisClientLibConfiguration { private long leasesRecoveryAuditorExecutionFrequencyMillis; private int leasesRecoveryAuditorInconsistencyConfidenceThreshold; + @Getter + private boolean isCBORProtocolDisabled; + @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -954,6 +958,13 @@ public class KinesisClientLibConfiguration { return dynamoDBEndpoint; } + /** + * @return CloudWatch endpoint + */ + public String getCloudWatchEndpoint() { + return cloudWatchEndpoint; + } + /** * @return the initialPositionInStream */ @@ -1202,6 +1213,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param isCBORProtocolDisabled is CBOR protocol disabled + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withIsCBORProtocolDisabled(boolean isCBORProtocolDisabled) { + this.isCBORProtocolDisabled = isCBORProtocolDisabled; + return this; + } + /** * @param dynamoDBEndpoint DynamoDB endpoint * @return KinesisClientLibConfiguration @@ -1211,6 +1231,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param cloudWatchEndpoint CloudWatch endpoint + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withCloudWatchEndpoint(String cloudWatchEndpoint) { + this.cloudWatchEndpoint = cloudWatchEndpoint; + return this; + } + /** * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library * will start fetching records from this position when the application starts up if there are no checkpoints. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 150841c9..876fc0f4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import com.amazonaws.SDKGlobalConfiguration; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -1448,6 +1449,11 @@ public class Worker implements Runnable { if (execService == null) { execService = getExecutorService(); } + + if (config.isCBORProtocolDisabled()) { + System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); + } + if (kinesisClient == null) { kinesisClient = createClient(AmazonKinesisClientBuilder.standard(), config.getKinesisCredentialsProvider(), @@ -1466,7 +1472,7 @@ public class Worker implements Runnable { cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(), config.getCloudWatchCredentialsProvider(), config.getCloudWatchClientConfiguration(), - null, + config.getCloudWatchEndpoint(), config.getRegionName()); } // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 75d74186..055f42c5 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1844,7 +1844,8 @@ public class WorkerTest { KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) .withRegionName(Regions.US_WEST_2.getName()) .withKinesisEndpoint(endpoint) - .withDynamoDBEndpoint(endpoint); + .withDynamoDBEndpoint(endpoint) + .withCloudWatchEndpoint(endpoint); AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); @@ -1943,16 +1944,14 @@ public class WorkerTest { String region = Regions.US_WEST_2.getName(); String endpointUrl = "TestEndpoint"; KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID) - .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl); + .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl).withCloudWatchEndpoint(endpointUrl); Worker.Builder builder = spy(new Worker.Builder()); builder.recordProcessorFactory(recordProcessorFactory).config(config).build(); - verify(builder, times(2)).createClient( + verify(builder, times(3)).createClient( any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region)); - verify(builder, times(1)).createClient( - any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); } private abstract class InjectableWorker extends Worker {