Adding wait to CW PutMetric future calls (#584)

* Making CW publish calls as blocking to reduce the throttling. Disclosing the CW publish failures.

* Fixing uniut test cases and adding CW exception manager
This commit is contained in:
ashwing 2019-08-09 09:52:53 -07:00 committed by Sahil Palvia
parent a150402e9c
commit 161590c2ce
2 changed files with 43 additions and 11 deletions

View file

@ -14,14 +14,20 @@
*/ */
package software.amazon.kinesis.metrics; package software.amazon.kinesis.metrics;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/** /**
* Publisher that contains the logic to publish metrics. * Publisher that contains the logic to publish metrics.
@ -30,12 +36,17 @@ import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
public class CloudWatchMetricsPublisher { public class CloudWatchMetricsPublisher {
// CloudWatch API has a limit of 20 MetricDatums per request // CloudWatch API has a limit of 20 MetricDatums per request
private static final int BATCH_SIZE = 20; private static final int BATCH_SIZE = 20;
private static final int PUT_TIMEOUT_MILLIS = 5000;
private static final AWSExceptionManager CW_EXCEPTION_MANAGER = new AWSExceptionManager();
static {
CW_EXCEPTION_MANAGER.add(CloudWatchException.class, t -> t);
}
private final String namespace; private final String namespace;
private final CloudWatchAsyncClient cloudWatchClient; private final CloudWatchAsyncClient cloudWatchAsyncClient;
public CloudWatchMetricsPublisher(CloudWatchAsyncClient cloudWatchClient, String namespace) { public CloudWatchMetricsPublisher(CloudWatchAsyncClient cloudWatchClient, String namespace) {
this.cloudWatchClient = cloudWatchClient; this.cloudWatchAsyncClient = cloudWatchClient;
this.namespace = namespace; this.namespace = namespace;
} }
@ -56,16 +67,28 @@ public class CloudWatchMetricsPublisher {
for (int i = startIndex; i < endIndex; i++) { for (int i = startIndex; i < endIndex; i++) {
metricData.add(dataToPublish.get(i).datum); metricData.add(dataToPublish.get(i).datum);
} }
request = request.metricData(metricData); request = request.metricData(metricData);
try { try {
cloudWatchClient.putMetricData(request.build()); PutMetricDataRequest.Builder finalRequest = request;
// This needs to be blocking. Making it asynchronous leads to increased throttling.
log.debug("Successfully published {} datums.", endIndex - startIndex); blockingExecute(cloudWatchAsyncClient.putMetricData(finalRequest.build()), PUT_TIMEOUT_MILLIS,
} catch (CloudWatchException e) { CW_EXCEPTION_MANAGER);
} catch(CloudWatchException | TimeoutException e) {
log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, e); log.warn("Could not publish {} datums to CloudWatch", endIndex - startIndex, e);
} catch (Exception e) {
log.error("Unknown exception while publishing {} datums to CloudWatch", endIndex - startIndex, e);
} }
} }
} }
private static <T> void blockingExecute(CompletableFuture<T> future, long timeOutMillis,
AWSExceptionManager exceptionManager) throws TimeoutException {
try {
future.get(timeOutMillis, MILLISECONDS);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
log.info("Thread interrupted.");
}
}
} }

View file

@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -31,8 +32,12 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class CloudWatchMetricsPublisherTest { public class CloudWatchMetricsPublisherTest {
private static final String NAMESPACE = "fakeNamespace"; private static final String NAMESPACE = "fakeNamespace";
@ -51,6 +56,10 @@ public class CloudWatchMetricsPublisherTest {
*/ */
@Test @Test
public void testMetricsPublisher() { public void testMetricsPublisher() {
final CompletableFuture<PutMetricDataResponse> putResponseFuture = new CompletableFuture<>();
putResponseFuture.complete(PutMetricDataResponse.builder().build());
when(cloudWatchClient.putMetricData(any(PutMetricDataRequest.class))).thenReturn(putResponseFuture);
List<MetricDatumWithKey<CloudWatchMetricKey>> dataToPublish = constructMetricDatumWithKeyList(25); List<MetricDatumWithKey<CloudWatchMetricKey>> dataToPublish = constructMetricDatumWithKeyList(25);
List<Map<String, MetricDatum>> expectedData = constructMetricDatumListMap(dataToPublish); List<Map<String, MetricDatum>> expectedData = constructMetricDatumListMap(dataToPublish);
publisher.publishMetrics(dataToPublish); publisher.publishMetrics(dataToPublish);