From 66f5204d84de099ebc37d4a5d51ed23d798751cf Mon Sep 17 00:00:00 2001
From: Joshua Kim <20001595+jushkem@users.noreply.github.com>
Date: Tue, 28 Jan 2020 11:00:51 -0800
Subject: [PATCH] Updating to new version of AWS SDK 2.10.56, changing Netty
client defaults. (#679)
* Updating AWS SDK version to 2.10.56
* Changing default netty client to use 60 second ping health check timeout and 10MB initial window size.
* Tuning default request response timeout to 60 seconds.
---
.../amazon/kinesis/common/KinesisClientUtil.java | 12 +++++++++++-
.../amazon/kinesis/lifecycle/ShardConsumer.java | 2 +-
pom.xml | 2 +-
3 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java
index bec0a545..ef5194e4 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/KinesisClientUtil.java
@@ -15,15 +15,22 @@
package software.amazon.kinesis.common;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import java.time.Duration;
+
/**
* Utility to setup KinesisAsyncClient to be used with KCL.
*/
public class KinesisClientUtil {
+ private static int INITIAL_WINDOW_SIZE_BYTES = 10 * 1024 * 1024;
+ private static long HEALTH_CHECK_PING_PERIOD_MILLIS = 60 * 1000;
+
/**
* Creates a client from a builder.
*
@@ -35,6 +42,9 @@ public class KinesisClientUtil {
}
public static KinesisAsyncClientBuilder adjustKinesisClientBuilder(KinesisAsyncClientBuilder builder) {
- return builder.httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE));
+ return builder.httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)
+ .http2Configuration(Http2Configuration.builder().initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
+ .healthCheckPingPeriod(Duration.ofMillis(HEALTH_CHECK_PING_PERIOD_MILLIS)).build())
+ .protocol(Protocol.HTTP2));
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
index 5545fc03..99a680bf 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
@@ -52,7 +52,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
@KinesisClientInternalApi
public class ShardConsumer {
- public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 35000;
+ public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 60 * 1000;
private final RecordsPublisher recordsPublisher;
private final ExecutorService executorService;
private final ShardInfo shardInfo;
diff --git a/pom.xml b/pom.xml
index b31c1257..ce4761fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
- 2.10.25
+ 2.10.56