Handle Custom Metric Scope

This commit is contained in:
Wei 2017-09-28 14:47:31 -07:00
parent 50ed982255
commit abed367da0
7 changed files with 31 additions and 5 deletions

View file

@ -20,6 +20,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.NonNull;
@ -42,6 +45,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final ExecutorService executorService;
private final IMetricsFactory metricsFactory;
private PrefetchCounters prefetchCounters;
@ -50,7 +54,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService) {
@NonNull final ExecutorService executorService,
@NonNull final IMetricsFactory metricsFactory) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxSize = maxSize;
@ -59,6 +64,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
this.prefetchCounters = new PrefetchCounters();
this.executorService = executorService;
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
}
@Override
@ -115,6 +121,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
}
if (prefetchCounters.shouldGetNewRecords()) {
try {
MetricsHelper.startScope(metricsFactory, "Prefetcheing");
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())

View file

@ -14,6 +14,8 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
/**
* The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard.
* Clients may choose to create separate instantiations, or re-use instantiations.
@ -36,4 +38,6 @@ public interface RecordsFetcherFactory {
void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy);
void setMetricsFactory(IMetricsFactory metricsFactory);
}

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import lombok.extern.apachecommons.CommonsLog;
@CommonsLog
@ -25,6 +26,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxByteSize = 8 * 1024 * 1024;
private int maxRecordsCount = 30000;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
private IMetricsFactory metricsFactory;
public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
@ -36,7 +38,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1));
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), metricsFactory);
}
}
@ -59,4 +61,9 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){
this.dataFetchingStrategy = dataFetchingStrategy;
}
@Override
public void setMetricsFactory(IMetricsFactory metricsFactory) {
this.metricsFactory = metricsFactory;
}
}

View file

@ -415,6 +415,7 @@ public class Worker implements Runnable {
this.shardPrioritization = shardPrioritization;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.config.getRecordsFetcherFactory().setMetricsFactory(metricsFactory);
}
/**

View file

@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.After;
import org.junit.Before;
@ -85,7 +86,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService);
executorService,
new NullMetricsFactory());
}
@Test
@ -128,7 +130,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy2,
executorService2
executorService2,
new NullMetricsFactory()
);
getRecordsCache.start();

View file

@ -36,6 +36,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -79,7 +80,8 @@ public class PrefetchGetRecordsCacheTest {
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService);
executorService,
new NullMetricsFactory());
spyQueue = spy(getRecordsCache.getRecordsResultQueue);
records = spy(new ArrayList<>());

View file

@ -1,5 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@ -23,6 +24,7 @@ public class RecordsFetcherFactoryTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory(1);
recordsFetcherFactory.setMetricsFactory(new NullMetricsFactory());
}
@Test