diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java index bec0a545..ef5194e4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java @@ -15,15 +15,22 @@ package software.amazon.kinesis.common; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.nio.netty.Http2Configuration; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +import java.time.Duration; + /** * Utility to setup KinesisAsyncClient to be used with KCL. */ public class KinesisClientUtil { + private static int INITIAL_WINDOW_SIZE_BYTES = 10 * 1024 * 1024; + private static long HEALTH_CHECK_PING_PERIOD_MILLIS = 60 * 1000; + /** * Creates a client from a builder. * @@ -35,6 +42,9 @@ public class KinesisClientUtil { } public static KinesisAsyncClientBuilder adjustKinesisClientBuilder(KinesisAsyncClientBuilder builder) { - return builder.httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)); + return builder.httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE) + .http2Configuration(Http2Configuration.builder().initialWindowSize(INITIAL_WINDOW_SIZE_BYTES) + .healthCheckPingPeriod(Duration.ofMillis(HEALTH_CHECK_PING_PERIOD_MILLIS)).build()) + .protocol(Protocol.HTTP2)); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 5545fc03..99a680bf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -52,7 +52,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; @KinesisClientInternalApi public class ShardConsumer { - public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 35000; + public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 60 * 1000; private final RecordsPublisher recordsPublisher; private final ExecutorService executorService; private final ShardInfo shardInfo; diff --git a/pom.xml b/pom.xml index b31c1257..ce4761fa 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ - 2.10.25 + 2.10.56