This commit is contained in:
Daham Positha Pathiraja 2025-03-26 11:44:17 +09:00 committed by GitHub
commit c97f2ac822
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 40 additions and 6 deletions

View file

@ -256,6 +256,7 @@ public class KinesisClientLibConfiguration {
private Arn streamArn;
private String kinesisEndpoint;
private String dynamoDBEndpoint;
private String cloudWatchEndpoint;
private InitialPositionInStream initialPositionInStream;
private AWSCredentialsProvider kinesisCredentialsProvider;
private AWSCredentialsProvider dynamoDBCredentialsProvider;
@ -296,6 +297,9 @@ public class KinesisClientLibConfiguration {
private long leasesRecoveryAuditorExecutionFrequencyMillis;
private int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
@Getter
private boolean isCBORProtocolDisabled;
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@ -954,6 +958,13 @@ public class KinesisClientLibConfiguration {
return dynamoDBEndpoint;
}
/**
* @return CloudWatch endpoint
*/
public String getCloudWatchEndpoint() {
return cloudWatchEndpoint;
}
/**
* @return the initialPositionInStream
*/
@ -1202,6 +1213,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param isCBORProtocolDisabled is CBOR protocol disabled
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withIsCBORProtocolDisabled(boolean isCBORProtocolDisabled) {
this.isCBORProtocolDisabled = isCBORProtocolDisabled;
return this;
}
/**
* @param dynamoDBEndpoint DynamoDB endpoint
* @return KinesisClientLibConfiguration
@ -1211,6 +1231,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param cloudWatchEndpoint CloudWatch endpoint
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withCloudWatchEndpoint(String cloudWatchEndpoint) {
this.cloudWatchEndpoint = cloudWatchEndpoint;
return this;
}
/**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
* will start fetching records from this position when the application starts up if there are no checkpoints.

View file

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
@ -1448,6 +1449,11 @@ public class Worker implements Runnable {
if (execService == null) {
execService = getExecutorService();
}
if (config.isCBORProtocolDisabled()) {
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
}
if (kinesisClient == null) {
kinesisClient = createClient(AmazonKinesisClientBuilder.standard(),
config.getKinesisCredentialsProvider(),
@ -1466,7 +1472,7 @@ public class Worker implements Runnable {
cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(),
config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration(),
null,
config.getCloudWatchEndpoint(),
config.getRegionName());
}
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.

View file

@ -1844,7 +1844,8 @@ public class WorkerTest {
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(Regions.US_WEST_2.getName())
.withKinesisEndpoint(endpoint)
.withDynamoDBEndpoint(endpoint);
.withDynamoDBEndpoint(endpoint)
.withCloudWatchEndpoint(endpoint);
AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
@ -1943,16 +1944,14 @@ public class WorkerTest {
String region = Regions.US_WEST_2.getName();
String endpointUrl = "TestEndpoint";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, WORKER_ID)
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl);
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl).withCloudWatchEndpoint(endpointUrl);
Worker.Builder builder = spy(new Worker.Builder());
builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
verify(builder, times(2)).createClient(
verify(builder, times(3)).createClient(
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region));
verify(builder, times(1)).createClient(
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
}
private abstract class InjectableWorker extends Worker {