Version 1.5.0 of the Amazon Kinesis Client Library
This commit is contained in:
parent
4dfc17d04a
commit
850db1a3da
33 changed files with 746 additions and 180 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.4.0
|
||||
Bundle-Version: 1.5.0
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
|
|||
|
|
@ -1,3 +1,3 @@
|
|||
AmazonKinesisClientLibrary
|
||||
Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
|
||||
|
|
|
|||
13
README.md
13
README.md
|
|
@ -29,6 +29,14 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
|
|||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.5.0 (July 9, 2015)
|
||||
* **[Metrics Enhancements][kinesis-guide-monitoring-with-kcl]**
|
||||
* Support metrics level and dimension configurations to control CloudWatch metrics emitted by the KCL.
|
||||
* Add new metrics that track time spent in record processor methods.
|
||||
* Disable WorkerIdentifier dimension by default.
|
||||
* **Exception Reporting** — Do not silently ignore exceptions in ShardConsumer.
|
||||
* **AWS SDK Component Dependencies** — Depend only on AWS SDK components that are used.
|
||||
|
||||
### Release 1.4.0 (June 2, 2015)
|
||||
* Integration with the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**
|
||||
* Automatically de-aggregate records put into the Kinesis stream using the KPL.
|
||||
|
|
@ -40,10 +48,10 @@ To make it easier for developers to write record processors in other languages,
|
|||
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
|
||||
|
||||
### Release 1.2.1 (January 26, 2015)
|
||||
* **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker.
|
||||
* **MultiLangDaemon** — Changes to the MultiLangDaemon to make it easier to provide a custom worker.
|
||||
|
||||
### Release 1.2 (October 21, 2014)
|
||||
* **Multi-Language Support** Amazon KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.
|
||||
* **Multi-Language Support** — Amazon KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.
|
||||
|
||||
### Release 1.1 (June 30, 2014)
|
||||
* **Checkpointing at a specific sequence number** — The IRecordProcessorCheckpointer interface now supports checkpointing at a sequence number specified by the record processor.
|
||||
|
|
@ -57,6 +65,7 @@ To make it easier for developers to write record processors in other languages,
|
|||
[kinesis-guide-begin]: http://docs.aws.amazon.com/kinesis/latest/dev/before-you-begin.html
|
||||
[kinesis-guide-create]: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html
|
||||
[kinesis-guide-applications]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-app.html
|
||||
[kinesis-guide-monitoring-with-kcl]: http://docs.aws.amazon.com//kinesis/latest/dev/monitoring-with-kcl.html
|
||||
[kinesis-guide-kpl]: http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html
|
||||
[kinesis-guide-consumer-deaggregation]: http://docs.aws.amazon.com//kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html
|
||||
[kclpy]: https://github.com/awslabs/amazon-kinesis-client-python
|
||||
|
|
|
|||
24
pom.xml
24
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.4.0</version>
|
||||
<version>1.5.0</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
@ -29,9 +29,29 @@
|
|||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk</artifactId>
|
||||
<artifactId>aws-java-sdk-core</artifactId>
|
||||
<version>${aws-java-sdk.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-dynamodb</artifactId>
|
||||
<version>${aws-java-sdk.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-kinesis</artifactId>
|
||||
<version>${aws-java-sdk.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-cloudwatch</artifactId>
|
||||
<version>${aws-java-sdk.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>18.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -67,7 +67,8 @@ public class KinesisClientLibConfigurator {
|
|||
new AWSCredentialsProviderPropertyValueDecoder(),
|
||||
new StringPropertyValueDecoder(),
|
||||
new InitialPositionInStreamPropertyValueDecoder(),
|
||||
new ClientConfigurationPropertyValueDecoder());
|
||||
new ClientConfigurationPropertyValueDecoder(),
|
||||
new SetPropertyValueDecoder());
|
||||
|
||||
classToDecoder = new Hashtable<Class<?>, IPropertyValueDecoder<?>>();
|
||||
for (IPropertyValueDecoder<?> getter : getters) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Provide {@link Set} property value. Note that since parameterized value cannot be figured out during compile time
|
||||
* for setter methods, only {@code Set} of {@code String}s are supported as property value decode.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
class SetPropertyValueDecoder implements IPropertyValueDecoder<Set> {
|
||||
|
||||
/**
|
||||
* Delimiter for the list provided as string.
|
||||
*/
|
||||
private static final String LIST_DELIMITER = ",";
|
||||
|
||||
/**
|
||||
* Package constructor for factory use only.
|
||||
*/
|
||||
SetPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Set decodeValue(String propertyValue) {
|
||||
String[] values = propertyValue.split(LIST_DELIMITER);
|
||||
String value = null;
|
||||
Set<String> decodedValue = new HashSet<>();
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
value = values[i].trim();
|
||||
if (!value.isEmpty()) {
|
||||
decodedValue.add(value);
|
||||
}
|
||||
}
|
||||
return decodedValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<Class<Set>> getSupportedTypes() {
|
||||
return Arrays.asList(Set.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -21,6 +21,8 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
|||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* Task for initializing shard position and invoking the RecordProcessor initialize() API.
|
||||
|
|
@ -28,6 +30,9 @@ import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
|||
class InitializeTask implements ITask {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(InitializeTask.class);
|
||||
|
||||
private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize";
|
||||
|
||||
private final ShardInfo shardInfo;
|
||||
private final IRecordProcessor recordProcessor;
|
||||
private final KinesisDataFetcher dataFetcher;
|
||||
|
|
@ -72,16 +77,21 @@ class InitializeTask implements ITask {
|
|||
dataFetcher.initialize(initialCheckpoint.getSequenceNumber());
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
|
||||
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);
|
||||
|
||||
LOG.debug("Calling the record processor initialize().");
|
||||
final InitializationInput initializationInput = new InitializationInput()
|
||||
.withShardId(shardInfo.getShardId())
|
||||
.withExtendedSequenceNumber(initialCheckpoint);
|
||||
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
|
||||
try {
|
||||
LOG.debug("Calling the record processor initialize().");
|
||||
final InitializationInput initializationInput = new InitializationInput()
|
||||
.withShardId(shardInfo.getShardId())
|
||||
.withExtendedSequenceNumber(initialCheckpoint);
|
||||
recordProcessor.initialize(initializationInput);
|
||||
LOG.debug("Record processor initialize() completed.");
|
||||
} catch (Exception e) {
|
||||
applicationException = true;
|
||||
throw e;
|
||||
} finally {
|
||||
MetricsHelper.addLatency(RECORD_PROCESSOR_INITIALIZE_METRIC, recordProcessorStartTimeMillis,
|
||||
MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
return new TaskResult(null);
|
||||
|
|
|
|||
|
|
@ -14,9 +14,15 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.regions.RegionUtils;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
/**
|
||||
* Configuration for the Amazon Kinesis Client Library.
|
||||
|
|
@ -25,6 +31,12 @@ public class KinesisClientLibConfiguration {
|
|||
|
||||
private static final long EPSILON_MS = 25;
|
||||
|
||||
/**
|
||||
* The location in the shard from which the KinesisClientLibrary will start fetching records from
|
||||
* when the application starts for the first time and there is no checkpoint for the shard.
|
||||
*/
|
||||
public static final InitialPositionInStream DEFAULT_INITIAL_POSITION_IN_STREAM = InitialPositionInStream.LATEST;
|
||||
|
||||
/**
|
||||
* Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
|
||||
* will be regarded as having problems and it's shards will be assigned to other workers.
|
||||
|
|
@ -82,10 +94,32 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
public static final int DEFAULT_METRICS_MAX_QUEUE_SIZE = 10000;
|
||||
|
||||
/**
|
||||
* Metrics level for which to enable CloudWatch metrics.
|
||||
*/
|
||||
public static final MetricsLevel DEFAULT_METRICS_LEVEL = MetricsLevel.DETAILED;
|
||||
|
||||
/**
|
||||
* Metrics dimensions that always will be enabled regardless of the config provided by user.
|
||||
*/
|
||||
public static final Set<String> METRICS_ALWAYS_ENABLED_DIMENSIONS = ImmutableSet.of(
|
||||
MetricsHelper.OPERATION_DIMENSION_NAME);
|
||||
|
||||
/**
|
||||
* Allowed dimensions for CloudWatch metrics. By default, worker ID dimension will be disabled.
|
||||
*/
|
||||
public static final Set<String> DEFAULT_METRICS_ENABLED_DIMENSIONS = ImmutableSet.<String>builder().addAll(
|
||||
METRICS_ALWAYS_ENABLED_DIMENSIONS).add(MetricsHelper.SHARD_ID_DIMENSION_NAME).build();
|
||||
|
||||
/**
|
||||
* Metrics dimensions that signify all possible dimensions.
|
||||
*/
|
||||
public static final Set<String> METRICS_DIMENSIONS_ALL = ImmutableSet.of(IMetricsScope.METRICS_DIMENSIONS_ALL);
|
||||
|
||||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.4.0";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.5.0";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
@ -115,6 +149,8 @@ public class KinesisClientLibConfiguration {
|
|||
private long taskBackoffTimeMillis;
|
||||
private long metricsBufferTimeMillis;
|
||||
private int metricsMaxQueueSize;
|
||||
private MetricsLevel metricsLevel;
|
||||
private Set<String> metricsEnabledDimensions;
|
||||
private boolean validateSequenceNumberBeforeCheckpointing;
|
||||
private String regionName;
|
||||
|
||||
|
|
@ -153,7 +189,7 @@ public class KinesisClientLibConfiguration {
|
|||
AWSCredentialsProvider dynamoDBCredentialsProvider,
|
||||
AWSCredentialsProvider cloudWatchCredentialsProvider,
|
||||
String workerId) {
|
||||
this(applicationName, streamName, null, InitialPositionInStream.LATEST, kinesisCredentialsProvider,
|
||||
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
|
||||
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
|
||||
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
|
||||
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
|
||||
|
|
@ -253,6 +289,8 @@ public class KinesisClientLibConfiguration {
|
|||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
|
||||
this.metricsMaxQueueSize = metricsMaxQueueSize;
|
||||
this.metricsLevel = DEFAULT_METRICS_LEVEL;
|
||||
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
|
||||
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
|
||||
this.regionName = regionName;
|
||||
}
|
||||
|
|
@ -352,7 +390,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* @return true if processRecords() should be called even for empty record lists
|
||||
*/
|
||||
boolean shouldCallProcessRecordsEvenForEmptyRecordList() {
|
||||
public boolean shouldCallProcessRecordsEvenForEmptyRecordList() {
|
||||
return callProcessRecordsEvenForEmptyRecordList;
|
||||
}
|
||||
|
||||
|
|
@ -420,19 +458,34 @@ public class KinesisClientLibConfiguration {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return Metrics are buffered for at most this long before publishing to CloudWatch
|
||||
* @return Metrics are buffered for at most this long before publishing.
|
||||
*/
|
||||
public long getMetricsBufferTimeMillis() {
|
||||
return metricsBufferTimeMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Max number of metrics to buffer before publishing to CloudWatch
|
||||
* @return Max number of metrics to buffer before publishing.
|
||||
*/
|
||||
public int getMetricsMaxQueueSize() {
|
||||
return metricsMaxQueueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Metrics level enabled for metrics.
|
||||
*/
|
||||
public MetricsLevel getMetricsLevel() {
|
||||
return metricsLevel;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Enabled dimensions for metrics.
|
||||
*/
|
||||
public Set<String> getMetricsEnabledDimensions() {
|
||||
// Unmodifiable set.
|
||||
return metricsEnabledDimensions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we should clean up leases of shards after processing is complete (don't wait for expiration)
|
||||
*/
|
||||
|
|
@ -631,6 +684,46 @@ public class KinesisClientLibConfiguration {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param metricsLevel Metrics level to enable.
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withMetricsLevel(MetricsLevel metricsLevel) {
|
||||
this.metricsLevel = metricsLevel == null ? DEFAULT_METRICS_LEVEL : metricsLevel;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets metrics level that should be enabled. Possible values are:
|
||||
* NONE
|
||||
* SUMMARY
|
||||
* DETAILED
|
||||
*
|
||||
* @param metricsLevel Metrics level to enable.
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withMetricsLevel(String metricsLevel) {
|
||||
this.metricsLevel = MetricsLevel.fromName(metricsLevel);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the dimensions that are allowed to be emitted in metrics.
|
||||
* @param metricsEnabledDimensions Set of dimensions that are allowed.
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withMetricsEnabledDimensions(Set<String> metricsEnabledDimensions) {
|
||||
if (metricsEnabledDimensions == null) {
|
||||
this.metricsEnabledDimensions = METRICS_ALWAYS_ENABLED_DIMENSIONS;
|
||||
} else if (metricsEnabledDimensions.contains(IMetricsScope.METRICS_DIMENSIONS_ALL)) {
|
||||
this.metricsEnabledDimensions = METRICS_DIMENSIONS_ALL;
|
||||
} else {
|
||||
this.metricsEnabledDimensions = ImmutableSet.<String>builder().addAll(
|
||||
metricsEnabledDimensions).addAll(METRICS_ALWAYS_ENABLED_DIMENSIONS).build();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* Decorates an ITask and reports metrics about its timing and success/failure.
|
||||
|
|
@ -23,7 +24,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
|||
class MetricsCollectingTaskDecorator implements ITask {
|
||||
|
||||
private final ITask other;
|
||||
private IMetricsFactory factory;
|
||||
private final IMetricsFactory factory;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -41,15 +42,16 @@ class MetricsCollectingTaskDecorator implements ITask {
|
|||
*/
|
||||
@Override
|
||||
public TaskResult call() {
|
||||
String taskName = other.getClass().getSimpleName();
|
||||
MetricsHelper.startScope(factory, taskName);
|
||||
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
TaskResult result = other.call();
|
||||
|
||||
MetricsHelper.addSuccessAndLatency(null, startTimeMillis, result.getException() == null);
|
||||
MetricsHelper.endScope();
|
||||
|
||||
MetricsHelper.startScope(factory, other.getClass().getSimpleName());
|
||||
TaskResult result = null;
|
||||
final long startTimeMillis = System.currentTimeMillis();
|
||||
try {
|
||||
result = other.call();
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(startTimeMillis, result != null && result.getException() == null,
|
||||
MetricsLevel.SUMMARY);
|
||||
MetricsHelper.endScope();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
||||
|
|
@ -23,7 +24,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
|
||||
|
|
@ -32,6 +32,7 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
|||
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
|
@ -42,12 +43,13 @@ import com.amazonaws.services.kinesis.model.Shard;
|
|||
*/
|
||||
class ProcessTask implements ITask {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ProcessTask.class);
|
||||
|
||||
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
|
||||
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
|
||||
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
|
||||
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ProcessTask.class);
|
||||
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
|
||||
|
||||
private final ShardInfo shardInfo;
|
||||
private final IRecordProcessor recordProcessor;
|
||||
|
|
@ -101,9 +103,9 @@ class ProcessTask implements ITask {
|
|||
public TaskResult call() {
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
IMetricsScope scope = MetricsHelper.getMetricsScope();
|
||||
scope.addDimension("ShardId", shardInfo.getShardId());
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count);
|
||||
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes);
|
||||
scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId());
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY);
|
||||
|
||||
Exception exception = null;
|
||||
|
||||
|
|
@ -113,14 +115,8 @@ class ProcessTask implements ITask {
|
|||
boolean shardEndReached = true;
|
||||
return new TaskResult(null, shardEndReached);
|
||||
}
|
||||
|
||||
final GetRecordsResult getRecordsResult = getRecords();
|
||||
|
||||
if (getRecordsResult.getMillisBehindLatest() != null) {
|
||||
scope.addData(MILLIS_BEHIND_LATEST_METRIC, getRecordsResult.getMillisBehindLatest(),
|
||||
StandardUnit.Milliseconds);
|
||||
}
|
||||
|
||||
final GetRecordsResult getRecordsResult = getRecordsResult();
|
||||
final List<Record> records = getRecordsResult.getRecords();
|
||||
|
||||
if (records.isEmpty()) {
|
||||
|
|
@ -146,7 +142,7 @@ class ProcessTask implements ITask {
|
|||
|
||||
// If we got more records, record the max extended sequence number. Sleep if there are no records.
|
||||
if (!records.isEmpty()) {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, numKinesisRecords, StandardUnit.Count);
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, numKinesisRecords, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
if (this.shard != null) {
|
||||
subRecords = UserRecord.deaggregate(records,
|
||||
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
|
||||
|
|
@ -161,23 +157,28 @@ class ProcessTask implements ITask {
|
|||
}
|
||||
|
||||
if ((!subRecords.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
|
||||
try {
|
||||
LOG.debug("Calling application processRecords() with " + numKinesisRecords + " Kinesis records ("
|
||||
+ numUserRecords + " user records) from " + shardInfo.getShardId());
|
||||
@SuppressWarnings("unchecked")
|
||||
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
|
||||
.withRecords((List<Record>) (List<?>) subRecords)
|
||||
.withCheckpointer(recordProcessorCheckpointer);
|
||||
LOG.debug("Calling application processRecords() with " + numKinesisRecords + " Kinesis records ("
|
||||
+ numUserRecords + " user records) from " + shardInfo.getShardId());
|
||||
@SuppressWarnings("unchecked")
|
||||
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
|
||||
.withRecords((List<Record>) (List<?>) subRecords)
|
||||
.withCheckpointer(recordProcessorCheckpointer)
|
||||
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
|
||||
|
||||
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
|
||||
try {
|
||||
recordProcessor.processRecords(processRecordsInput);
|
||||
} catch (Exception e) {
|
||||
LOG.error("ShardId " + shardInfo.getShardId()
|
||||
+ ": Application processRecords() threw an exception when processing shard ", e);
|
||||
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
|
||||
+ subRecords);
|
||||
} finally {
|
||||
MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
|
||||
recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException | KinesisClientLibException e) {
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e);
|
||||
exception = e;
|
||||
|
||||
|
|
@ -191,6 +192,12 @@ class ProcessTask implements ITask {
|
|||
|
||||
return new TaskResult(exception);
|
||||
}
|
||||
// CHECKSTYLE:ON CyclomaticComplexity
|
||||
|
||||
@Override
|
||||
public TaskType getTaskType() {
|
||||
return taskType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans a list of records to filter out records up to and including the most recent checkpoint value and to get
|
||||
|
|
@ -222,7 +229,8 @@ class ProcessTask implements ITask {
|
|||
largestExtendedSequenceNumber = extendedSequenceNumber;
|
||||
}
|
||||
|
||||
scope.addData(DATA_BYTES_PROCESSED_METRIC, record.getData().limit(), StandardUnit.Bytes);
|
||||
scope.addData(DATA_BYTES_PROCESSED_METRIC, record.getData().limit(), StandardUnit.Bytes,
|
||||
MetricsLevel.SUMMARY);
|
||||
}
|
||||
return largestExtendedSequenceNumber;
|
||||
}
|
||||
|
|
@ -231,19 +239,17 @@ class ProcessTask implements ITask {
|
|||
* Gets records from Kinesis and retries once in the event of an ExpiredIteratorException.
|
||||
*
|
||||
* @return list of data records from Kinesis
|
||||
* @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any
|
||||
* records to the client code yet
|
||||
*/
|
||||
private GetRecordsResult getRecords() throws KinesisClientLibException {
|
||||
int maxRecords = streamConfig.getMaxRecords();
|
||||
private GetRecordsResult getRecordsResult() {
|
||||
try {
|
||||
return dataFetcher.getRecords(maxRecords);
|
||||
return getRecordsResultAndRecordMillisBehindLatest();
|
||||
} catch (ExpiredIteratorException e) {
|
||||
// If we see a ExpiredIteratorException, try once to restart from the greatest remembered sequence number
|
||||
LOG.info("ShardId " + shardInfo.getShardId()
|
||||
+ ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum "
|
||||
+ "passed to customer", e);
|
||||
MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count);
|
||||
MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
|
||||
MetricsLevel.SUMMARY);
|
||||
|
||||
/*
|
||||
* Advance the iterator to after the greatest processed sequence number (remembered by
|
||||
|
|
@ -254,7 +260,7 @@ class ProcessTask implements ITask {
|
|||
|
||||
// Try a second time - if we fail this time, expose the failure.
|
||||
try {
|
||||
return dataFetcher.getRecords(maxRecords);
|
||||
return getRecordsResultAndRecordMillisBehindLatest();
|
||||
} catch (ExpiredIteratorException ex) {
|
||||
String msg =
|
||||
"Shard " + shardInfo.getShardId()
|
||||
|
|
@ -265,9 +271,27 @@ class ProcessTask implements ITask {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskType getTaskType() {
|
||||
return taskType;
|
||||
/**
|
||||
* Gets records from Kinesis and records the MillisBehindLatest metric if present.
|
||||
*
|
||||
* @return list of data records from Kinesis
|
||||
*/
|
||||
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
|
||||
final GetRecordsResult getRecordsResult = dataFetcher.getRecords(streamConfig.getMaxRecords());
|
||||
|
||||
if (getRecordsResult == null) {
|
||||
// Stream no longer exists
|
||||
return new GetRecordsResult().withRecords(Collections.<Record>emptyList());
|
||||
}
|
||||
|
||||
if (getRecordsResult.getMillisBehindLatest() != null) {
|
||||
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
|
||||
getRecordsResult.getMillisBehindLatest(),
|
||||
StandardUnit.Milliseconds,
|
||||
MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
return getRecordsResult;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
|
|
@ -153,14 +152,8 @@ class ShardConsumer {
|
|||
}
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(currentTask.getTaskType() + " task was interrupted: ", e);
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(currentTask.getTaskType() + " task encountered execution exception: ", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
updateState(taskCompletedSuccessfully);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -38,6 +38,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputExc
|
|||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
||||
/**
|
||||
|
|
@ -126,7 +127,7 @@ class ShardSyncer {
|
|||
leaseManager.createLeaseIfNotExists(lease);
|
||||
success = true;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success);
|
||||
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -24,6 +24,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
|||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* Task for invoking the RecordProcessor shutdown() callback.
|
||||
|
|
@ -31,6 +33,9 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
|||
class ShutdownTask implements ITask {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
|
||||
|
||||
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
|
||||
|
||||
private final ShardInfo shardInfo;
|
||||
private final IRecordProcessor recordProcessor;
|
||||
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
|
||||
|
|
@ -87,10 +92,11 @@ class ShutdownTask implements ITask {
|
|||
|
||||
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
|
||||
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason);
|
||||
try {
|
||||
final ShutdownInput shutdownInput = new ShutdownInput()
|
||||
final ShutdownInput shutdownInput = new ShutdownInput()
|
||||
.withShutdownReason(reason)
|
||||
.withCheckpointer(recordProcessorCheckpointer);
|
||||
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
|
||||
try {
|
||||
recordProcessor.shutdown(shutdownInput);
|
||||
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
|
||||
|
||||
|
|
@ -105,6 +111,9 @@ class ShutdownTask implements ITask {
|
|||
} catch (Exception e) {
|
||||
applicationException = true;
|
||||
throw e;
|
||||
} finally {
|
||||
MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis,
|
||||
MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
if (reason == ShutdownReason.TERMINATE) {
|
||||
|
|
|
|||
|
|
@ -28,23 +28,6 @@ class StreamConfig {
|
|||
private InitialPositionInStream initialPositionInStream;
|
||||
private final boolean validateSequenceNumberBeforeCheckpointing;
|
||||
|
||||
/**
|
||||
* @param proxy Used to fetch records and information about the stream
|
||||
* @param maxRecords Max records to fetch in a call
|
||||
* @param idleTimeInMilliseconds Idle time between get calls to the stream
|
||||
* @param callProcessRecordsEvenForEmptyRecordList Call the RecordProcessor::processRecords() API even if
|
||||
* GetRecords returned an empty record list.
|
||||
* @param validateSequenceNumberBeforeCheckpointing Whether to call Amazon Kinesis to validate sequence numbers
|
||||
*/
|
||||
StreamConfig(IKinesisProxy proxy,
|
||||
int maxRecords,
|
||||
long idleTimeInMilliseconds,
|
||||
boolean callProcessRecordsEvenForEmptyRecordList,
|
||||
boolean validateSequenceNumberBeforeCheckpointing) {
|
||||
this(proxy, maxRecords, idleTimeInMilliseconds, callProcessRecordsEvenForEmptyRecordList,
|
||||
validateSequenceNumberBeforeCheckpointing, InitialPositionInStream.LATEST);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param proxy Used to fetch records and information about the stream
|
||||
* @param maxRecords Max records to be fetched in a call
|
||||
|
|
|
|||
|
|
@ -35,14 +35,16 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
|||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* Worker is the high level class that Kinesis applications use to start
|
||||
|
|
@ -177,10 +179,8 @@ public class Worker implements Runnable {
|
|||
AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, new CWMetricsFactory(cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize()), execService);
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient,
|
||||
getMetricsFactory(cloudWatchClient, config), execService);
|
||||
if (config.getRegionName() != null) {
|
||||
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||
cloudWatchClient.setRegion(region);
|
||||
|
|
@ -212,7 +212,8 @@ public class Worker implements Runnable {
|
|||
.getProxy(config.getStreamName()),
|
||||
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
|
||||
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing(),
|
||||
config.getInitialPositionInStream()),
|
||||
config.getInitialPositionInStream(),
|
||||
config.getParentShardPollIntervalMillis(),
|
||||
config.getShardSyncIntervalMillis(),
|
||||
|
|
@ -639,6 +640,24 @@ public class Worker implements Runnable {
|
|||
execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given configuration, returns appropriate metrics factory.
|
||||
* @param cloudWatchClient Amazon CloudWatch client
|
||||
* @param config KinesisClientLibConfiguration
|
||||
* @return Returns metrics factory based on the config.
|
||||
*/
|
||||
private static IMetricsFactory getMetricsFactory(
|
||||
AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) {
|
||||
return config.getMetricsLevel() == MetricsLevel.NONE
|
||||
? new NullMetricsFactory() : new CWMetricsFactory(
|
||||
cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize(),
|
||||
config.getMetricsLevel(),
|
||||
config.getMetricsEnabledDimensions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder to construct a Worker instance.
|
||||
*/
|
||||
|
|
@ -805,10 +824,7 @@ public class Worker implements Runnable {
|
|||
}
|
||||
}
|
||||
if (metricsFactory == null) {
|
||||
metricsFactory = new CWMetricsFactory(cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize());
|
||||
metricsFactory = getMetricsFactory(cloudWatchClient, config);
|
||||
}
|
||||
|
||||
return new Worker(config.getApplicationName(),
|
||||
|
|
@ -818,7 +834,8 @@ public class Worker implements Runnable {
|
|||
config.getMaxRecords(),
|
||||
config.getIdleTimeBetweenReadsInMillis(),
|
||||
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing(),
|
||||
config.getInitialPositionInStream()),
|
||||
config.getInitialPositionInStream(),
|
||||
config.getParentShardPollIntervalMillis(),
|
||||
config.getShardSyncIntervalMillis(),
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -54,7 +54,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
|
|||
|
||||
private AmazonKinesis client;
|
||||
private AWSCredentialsProvider credentialsProvider;
|
||||
private AtomicReference<List<Shard>> listOfShardsSinceLastGet = new AtomicReference();
|
||||
private AtomicReference<List<Shard>> listOfShardsSinceLastGet = new AtomicReference<>();
|
||||
|
||||
private final String streamName;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.PutRecordResult;
|
|||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* IKinesisProxy implementation that wraps another implementation and collects metrics.
|
||||
|
|
@ -73,7 +74,8 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
success = true;
|
||||
return response;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatencyPerShard(getRecordsShardId, getRecordsMetric, startTime, success);
|
||||
MetricsHelper.addSuccessAndLatencyPerShard(getRecordsShardId, getRecordsMetric, startTime, success,
|
||||
MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -89,7 +91,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
success = true;
|
||||
return response;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(getStreamInfoMetric, startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(getStreamInfoMetric, startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -105,7 +107,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
success = true;
|
||||
return response;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(getStreamInfoMetric, startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(getStreamInfoMetric, startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -122,7 +124,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
success = true;
|
||||
return response;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -138,7 +140,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
success = true;
|
||||
return response;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(getShardListMetric, startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(getShardListMetric, startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -157,7 +159,7 @@ public class MetricsCollectingKinesisProxyDecorator implements IKinesisProxy {
|
|||
success = true;
|
||||
return response;
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(putRecordMetric, startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(putRecordMetric, startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ public class ProcessRecordsInput {
|
|||
|
||||
private List<Record> records;
|
||||
private IRecordProcessorCheckpointer checkpointer;
|
||||
private Long millisBehindLatest;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
|
|
@ -73,4 +74,26 @@ public class ProcessRecordsInput {
|
|||
this.checkpointer = checkpointer;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get milliseconds behind latest.
|
||||
*
|
||||
* @return The number of milliseconds this batch of records is from the tip of the stream,
|
||||
* indicating how far behind current time the record processor is.
|
||||
*/
|
||||
public Long getMillisBehindLatest() {
|
||||
return millisBehindLatest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set milliseconds behind latest.
|
||||
*
|
||||
* @param millisBehindLatest The number of milliseconds this batch of records is from the tip of the stream,
|
||||
* indicating how far behind current time the record processor is.
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public ProcessRecordsInput withMillisBehindLatest(Long millisBehindLatest) {
|
||||
this.millisBehindLatest = millisBehindLatest;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -14,7 +14,6 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import java.lang.Long;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -35,6 +35,7 @@ import com.amazonaws.services.kinesis.metrics.impl.LogMetricsFactory;
|
|||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
|
||||
|
|
@ -174,7 +175,7 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
success = true;
|
||||
} finally {
|
||||
scope.addDimension(WORKER_IDENTIFIER_METRIC, getWorkerIdentifier());
|
||||
MetricsHelper.addSuccessAndLatency(startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(startTime, success, MetricsLevel.SUMMARY);
|
||||
MetricsHelper.endScope();
|
||||
}
|
||||
}
|
||||
|
|
@ -195,7 +196,7 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
success = true;
|
||||
} finally {
|
||||
scope.addDimension(WORKER_IDENTIFIER_METRIC, getWorkerIdentifier());
|
||||
MetricsHelper.addSuccessAndLatency(startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency(startTime, success, MetricsLevel.SUMMARY);
|
||||
MetricsHelper.endScope();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -34,6 +34,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputExc
|
|||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* An implementation of ILeaseRenewer that uses DynamoDB via LeaseManager.
|
||||
|
|
@ -86,8 +87,10 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
}
|
||||
}
|
||||
|
||||
MetricsHelper.getMetricsScope().addData("LostLeases", lostLeases, StandardUnit.Count);
|
||||
MetricsHelper.getMetricsScope().addData("CurrentLeases", ownedLeases.size(), StandardUnit.Count);
|
||||
MetricsHelper.getMetricsScope().addData(
|
||||
"LostLeases", lostLeases, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
MetricsHelper.getMetricsScope().addData(
|
||||
"CurrentLeases", ownedLeases.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
private boolean renewLease(T lease) throws DependencyException, InvalidStateException {
|
||||
|
|
@ -128,7 +131,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency("RenewLease", startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency("RenewLease", startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
|
||||
return renewedLease;
|
||||
|
|
@ -258,7 +261,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
|
|||
return updatedLease;
|
||||
}
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency("UpdateLease", startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency("UpdateLease", startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -38,6 +38,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
|||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* An implementation of ILeaseTaker that uses DynamoDB via LeaseManager.
|
||||
|
|
@ -116,7 +117,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency("ListLeases", startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency("ListLeases", startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
|
||||
if (lastException != null) {
|
||||
|
|
@ -157,7 +158,7 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency("TakeLease", startTime, success);
|
||||
MetricsHelper.addSuccessAndLatency("TakeLease", startTime, success, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -175,7 +176,8 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
stringJoin(untakenLeaseKeys, ", ")));
|
||||
}
|
||||
|
||||
MetricsHelper.getMetricsScope().addData("TakenLeases", takenLeases.size(), StandardUnit.Count);
|
||||
MetricsHelper.getMetricsScope().addData(
|
||||
"TakenLeases", takenLeases.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
|
||||
return takenLeases;
|
||||
}
|
||||
|
|
@ -356,11 +358,11 @@ public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
|
|||
}
|
||||
|
||||
IMetricsScope metrics = MetricsHelper.getMetricsScope();
|
||||
metrics.addData("TotalLeases", numLeases, StandardUnit.Count);
|
||||
metrics.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.Count);
|
||||
metrics.addData("NumWorkers", numWorkers, StandardUnit.Count);
|
||||
metrics.addData("NeededLeases", numLeasesToReachTarget, StandardUnit.Count);
|
||||
metrics.addData("LeasesToTake", leasesToTake.size(), StandardUnit.Count);
|
||||
metrics.addData("TotalLeases", numLeases, StandardUnit.Count, MetricsLevel.DETAILED);
|
||||
metrics.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
metrics.addData("NumWorkers", numWorkers, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
metrics.addData("NeededLeases", numLeasesToReachTarget, StandardUnit.Count, MetricsLevel.DETAILED);
|
||||
metrics.addData("LeasesToTake", leasesToTake.size(), StandardUnit.Count, MetricsLevel.DETAILED);
|
||||
|
||||
return leasesToTake;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -19,7 +19,7 @@ package com.amazonaws.services.kinesis.metrics.impl;
|
|||
* getting the key based off of the String KeyType.
|
||||
*/
|
||||
|
||||
public class AccumulateByNameMetricsScope extends AccumulatingMetricsScope<String> {
|
||||
public abstract class AccumulateByNameMetricsScope extends AccumulatingMetricsScope<String> {
|
||||
|
||||
@Override
|
||||
protected String getKey(String name) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -20,6 +20,7 @@ import java.util.Map;
|
|||
import com.amazonaws.services.cloudwatch.model.MetricDatum;
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.cloudwatch.model.StatisticSet;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* An IMetricsScope that accumulates data from multiple calls to addData with
|
||||
|
|
@ -48,6 +49,11 @@ public abstract class AccumulatingMetricsScope<KeyType> extends EndingMetricsSco
|
|||
addData(getKey(name), name, value, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
|
||||
addData(getKey(name), name, value, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param name
|
||||
* key name for a metric
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -14,12 +14,16 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.metrics.impl;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
/**
|
||||
* An IMetricsFactory that creates IMetricsScopes that output themselves via CloudWatch. Batches IMetricsScopes together
|
||||
|
|
@ -27,14 +31,34 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
|||
*/
|
||||
public class CWMetricsFactory implements IMetricsFactory {
|
||||
|
||||
/*
|
||||
/**
|
||||
* Default metrics level to enable. By default, all metrics levels are emitted.
|
||||
*/
|
||||
public static final MetricsLevel DEFAULT_METRICS_LEVEL = MetricsLevel.DETAILED;
|
||||
/**
|
||||
* Default metrics dimensions. By default, all dimensions are enabled.
|
||||
*/
|
||||
public static final Set<String> DEFAULT_METRICS_ENABLED_DIMENSIONS = ImmutableSet.of(
|
||||
IMetricsScope.METRICS_DIMENSIONS_ALL);
|
||||
|
||||
/**
|
||||
* If the CWPublisherRunnable accumulates more than FLUSH_SIZE distinct metrics, it will call CloudWatch
|
||||
* immediately instead of waiting for the next scheduled call.
|
||||
*/
|
||||
private static final int FLUSH_SIZE = 200;
|
||||
|
||||
private final CWPublisherRunnable<CWMetricKey> runnable;
|
||||
private final Thread publicationThread;
|
||||
|
||||
/**
|
||||
* Enabled metrics level. All metrics below this level will be dropped.
|
||||
*/
|
||||
private final MetricsLevel metricsLevel;
|
||||
/**
|
||||
* List of enabled dimensions for metrics.
|
||||
*/
|
||||
private final Set<String> metricsEnabledDimensions;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
|
|
@ -79,19 +103,41 @@ public class CWMetricsFactory implements IMetricsFactory {
|
|||
String namespace,
|
||||
long bufferTimeMillis,
|
||||
int maxQueueSize) {
|
||||
DefaultCWMetricsPublisher metricPublisher = new DefaultCWMetricsPublisher(cloudWatchClient, namespace);
|
||||
this(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize,
|
||||
DEFAULT_METRICS_LEVEL, DEFAULT_METRICS_ENABLED_DIMENSIONS);
|
||||
}
|
||||
|
||||
this.runnable =
|
||||
new CWPublisherRunnable<CWMetricKey>(metricPublisher, bufferTimeMillis, maxQueueSize, FLUSH_SIZE);
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param cloudWatchClient Client used to make CloudWatch requests
|
||||
* @param namespace the namespace under which the metrics will appear in the CloudWatch console
|
||||
* @param bufferTimeMillis time to buffer metrics before publishing to CloudWatch
|
||||
* @param maxQueueSize maximum number of metrics that we can have in a queue
|
||||
* @param metricsLevel metrics level to enable
|
||||
* @param metricsEnabledDimensions metrics dimensions to allow
|
||||
*/
|
||||
public CWMetricsFactory(AmazonCloudWatch cloudWatchClient,
|
||||
String namespace,
|
||||
long bufferTimeMillis,
|
||||
int maxQueueSize,
|
||||
MetricsLevel metricsLevel,
|
||||
Set<String> metricsEnabledDimensions) {
|
||||
this.metricsLevel = (metricsLevel == null ? DEFAULT_METRICS_LEVEL : metricsLevel);
|
||||
this.metricsEnabledDimensions = (metricsEnabledDimensions == null
|
||||
? ImmutableSet.<String>of() : ImmutableSet.copyOf(metricsEnabledDimensions));
|
||||
|
||||
this.publicationThread = new Thread(runnable);
|
||||
runnable = new CWPublisherRunnable<CWMetricKey>(
|
||||
new DefaultCWMetricsPublisher(cloudWatchClient, namespace),
|
||||
bufferTimeMillis, maxQueueSize, FLUSH_SIZE);
|
||||
publicationThread = new Thread(runnable);
|
||||
publicationThread.setName("cw-metrics-publisher");
|
||||
publicationThread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IMetricsScope createMetrics() {
|
||||
return new CWMetricsScope(runnable);
|
||||
return new CWMetricsScope(runnable, metricsLevel, metricsEnabledDimensions);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -16,37 +16,32 @@ package com.amazonaws.services.kinesis.metrics.impl;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.MetricDatum;
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.AccumulateByNameMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
public class CWMetricsScope extends AccumulateByNameMetricsScope implements IMetricsScope {
|
||||
/**
|
||||
* Metrics scope for CloudWatch metrics.
|
||||
*/
|
||||
public class CWMetricsScope extends FilteringMetricsScope implements IMetricsScope {
|
||||
|
||||
private CWPublisherRunnable<CWMetricKey> publisher;
|
||||
|
||||
/**
|
||||
* Each CWMetricsScope takes a publisher which contains the logic of when to publish metrics.
|
||||
*
|
||||
* @param publisher publishing logic
|
||||
* Creates a CloudWatch metrics scope with given metrics level and enabled dimensions.
|
||||
* @param publisher Publisher that emits CloudWatch metrics periodically.
|
||||
* @param metricsLevel Metrics level to enable. All data with level below this will be dropped.
|
||||
* @param metricsEnabledDimensions Enabled dimensions for CloudWatch metrics.
|
||||
*/
|
||||
|
||||
public CWMetricsScope(CWPublisherRunnable<CWMetricKey> publisher) {
|
||||
public CWMetricsScope(CWPublisherRunnable<CWMetricKey> publisher,
|
||||
MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
|
||||
super(metricsLevel, metricsEnabledDimensions);
|
||||
this.publisher = publisher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit) {
|
||||
super.addData(name, value, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDimension(String name, String value) {
|
||||
super.addDimension(name, value);
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Once we call this method, all MetricDatums added to the scope will be enqueued to the publisher runnable.
|
||||
* We enqueue MetricDatumWithKey because the publisher will aggregate similar metrics (i.e. MetricDatum with the
|
||||
* same metricName) in the background thread. Hence aggregation using MetricDatumWithKey will be especially useful
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -15,6 +15,7 @@
|
|||
package com.amazonaws.services.kinesis.metrics.impl;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
public abstract class EndingMetricsScope extends DimensionTrackingMetricsScope {
|
||||
|
||||
|
|
@ -27,6 +28,13 @@ public abstract class EndingMetricsScope extends DimensionTrackingMetricsScope {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
|
||||
if (ended) {
|
||||
throw new IllegalArgumentException("Cannot call addData after calling IMetricsScope.end()");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDimension(String name, String value) {
|
||||
super.addDimension(name, value);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.metrics.impl;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
/**
|
||||
* An {@code IMetricsScope} that filters {@link #addData} calls based on the provided metrics level. If the provided
|
||||
* metrics level is less than enabled level, then data is dropped. This class also adds the dimension to the scope
|
||||
* if it is enabled.
|
||||
*/
|
||||
public class FilteringMetricsScope extends AccumulateByNameMetricsScope {
|
||||
|
||||
/**
|
||||
* Enabled level for the metrics. All metrics below this level will be dropped.
|
||||
*/
|
||||
private final MetricsLevel metricsLevel;
|
||||
/**
|
||||
* Set of dimensions that are allowed to be emitted.
|
||||
*/
|
||||
private final Set<String> metricsEnabledDimensions;
|
||||
|
||||
/**
|
||||
* Flag that indicates whether all metrics dimensions are allowed or not.
|
||||
*/
|
||||
private final boolean metricsEnabledDimensionsAll;
|
||||
|
||||
/**
|
||||
* Creates a metrics scope that allows all metrics data and dimensions.
|
||||
*/
|
||||
public FilteringMetricsScope() {
|
||||
this(MetricsLevel.DETAILED, ImmutableSet.of(METRICS_DIMENSIONS_ALL));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a metrics scope that drops data with level below the given enabled level and only allows dimensions
|
||||
* that are part of the given enabled dimensions list.
|
||||
* @param metricsLevel Level of metrics that is enabled. All metrics below this level will be dropped.
|
||||
* @param metricsEnabledDimensions Enabled dimensions.
|
||||
*/
|
||||
public FilteringMetricsScope(MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
|
||||
this.metricsLevel = metricsLevel;
|
||||
this.metricsEnabledDimensions = metricsEnabledDimensions;
|
||||
this.metricsEnabledDimensionsAll = (metricsEnabledDimensions != null &&
|
||||
metricsEnabledDimensions.contains(METRICS_DIMENSIONS_ALL));
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the data to the metrics scope at lowest metrics level.
|
||||
* @param name Metrics data name.
|
||||
* @param value Value of the metrics.
|
||||
* @param unit Unit of the metrics.
|
||||
*/
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit) {
|
||||
addData(name, value, unit, MetricsLevel.DETAILED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the data to the metrics scope if the given level is equal to above the enabled metrics
|
||||
* level.
|
||||
* @param name Metrics data name.
|
||||
* @param value Value of the metrics.
|
||||
* @param unit Unit of the metrics.
|
||||
* @param level Metrics level for the data.
|
||||
*/
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
|
||||
if (level.getValue() < metricsLevel.getValue()) {
|
||||
// Drop the data.
|
||||
return;
|
||||
}
|
||||
super.addData(name, value, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds given dimension with value if allowed dimensions list contains this dimension's name.
|
||||
* @param name Name of the dimension.
|
||||
* @param value Value for the dimension.
|
||||
*/
|
||||
@Override
|
||||
public void addDimension(String name, String value) {
|
||||
if (!metricsEnabledDimensionsAll &&
|
||||
(metricsEnabledDimensions == null || !metricsEnabledDimensions.contains(name))) {
|
||||
// Drop dimension.
|
||||
return;
|
||||
}
|
||||
super.addDimension(name, value);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.metrics.impl;
|
|||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
public abstract class InterceptingMetricsFactory implements IMetricsFactory {
|
||||
|
||||
|
|
@ -41,6 +42,10 @@ public abstract class InterceptingMetricsFactory implements IMetricsFactory {
|
|||
scope.addData(name, value, unit);
|
||||
}
|
||||
|
||||
protected void interceptAddData(String name, double value, StandardUnit unit, MetricsLevel level, IMetricsScope scope) {
|
||||
scope.addData(name, value, unit, level);
|
||||
}
|
||||
|
||||
protected void interceptAddDimension(String name, String value, IMetricsScope scope) {
|
||||
scope.addDimension(name, value);
|
||||
}
|
||||
|
|
@ -62,6 +67,11 @@ public abstract class InterceptingMetricsFactory implements IMetricsFactory {
|
|||
interceptAddData(name, value, unit, other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
|
||||
interceptAddData(name, value, unit, level, other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDimension(String name, String value) {
|
||||
interceptAddDimension(name, value, other);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -20,6 +20,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
/**
|
||||
* MetricsHelper assists with common metrics operations, most notably the storage of IMetricsScopes objects in a
|
||||
|
|
@ -37,6 +38,7 @@ public class MetricsHelper {
|
|||
* Constants used to publish metrics.
|
||||
*/
|
||||
public static final String OPERATION_DIMENSION_NAME = "Operation";
|
||||
public static final String SHARD_ID_DIMENSION_NAME = "ShardId";
|
||||
public static final String TIME = "Time";
|
||||
public static final String SUCCESS = "Success";
|
||||
private static final String SEP = ".";
|
||||
|
|
@ -73,31 +75,53 @@ public class MetricsHelper {
|
|||
}
|
||||
}
|
||||
|
||||
public static void addSuccessAndLatency(long startTimeMillis, boolean success) {
|
||||
addSuccessAndLatency(null, startTimeMillis, success);
|
||||
public static void addSuccessAndLatency(long startTimeMillis, boolean success, MetricsLevel level) {
|
||||
addSuccessAndLatency(null, startTimeMillis, success, level);
|
||||
}
|
||||
|
||||
public static void addSuccessAndLatency(String prefix, long startTimeMillis, boolean success) {
|
||||
addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success);
|
||||
public static void addSuccessAndLatency(
|
||||
String prefix, long startTimeMillis, boolean success, MetricsLevel level) {
|
||||
addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success, level);
|
||||
}
|
||||
|
||||
public static void addSuccessAndLatencyPerShard (
|
||||
String shardId,
|
||||
String prefix,
|
||||
long startTimeMillis,
|
||||
boolean success) {
|
||||
boolean success,
|
||||
MetricsLevel level) {
|
||||
addSuccessAndLatency(shardId, prefix, startTimeMillis, success, level, true, true);
|
||||
}
|
||||
|
||||
public static void addLatency(long startTimeMillis, MetricsLevel level) {
|
||||
addLatency(null, startTimeMillis, level);
|
||||
}
|
||||
|
||||
public static void addLatency(String prefix, long startTimeMillis, MetricsLevel level) {
|
||||
addLatencyPerShard(null, prefix, startTimeMillis, level);
|
||||
}
|
||||
|
||||
public static void addLatencyPerShard(String shardId, String prefix, long startTimeMillis, MetricsLevel level) {
|
||||
addSuccessAndLatency(shardId, prefix, startTimeMillis, false, level, false, true);
|
||||
}
|
||||
|
||||
private static void addSuccessAndLatency(
|
||||
String shardId, String prefix, long startTimeMillis, boolean success, MetricsLevel level,
|
||||
boolean includeSuccess, boolean includeLatency) {
|
||||
IMetricsScope scope = getMetricsScope();
|
||||
|
||||
String realPrefix = prefix == null ? "" : prefix + SEP;
|
||||
|
||||
if (shardId != null) {
|
||||
scope.addDimension("ShardId", shardId);
|
||||
scope.addDimension(SHARD_ID_DIMENSION_NAME, shardId);
|
||||
}
|
||||
if (includeSuccess) {
|
||||
scope.addData(realPrefix + MetricsHelper.SUCCESS, success ? 1 : 0, StandardUnit.Count, level);
|
||||
}
|
||||
if (includeLatency) {
|
||||
scope.addData(realPrefix + MetricsHelper.TIME,
|
||||
System.currentTimeMillis() - startTimeMillis, StandardUnit.Milliseconds, level);
|
||||
}
|
||||
|
||||
scope.addData(realPrefix + MetricsHelper.SUCCESS, success ? 1 : 0, StandardUnit.Count);
|
||||
scope.addData(realPrefix + MetricsHelper.TIME,
|
||||
System.currentTimeMillis() - startTimeMillis,
|
||||
StandardUnit.Milliseconds);
|
||||
}
|
||||
|
||||
public static void endScope() {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.metrics.impl;
|
|||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
|
||||
public class NullMetricsScope implements IMetricsScope {
|
||||
|
||||
|
|
@ -24,6 +25,11 @@ public class NullMetricsScope implements IMetricsScope {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDimension(String name, String value) {
|
||||
|
||||
|
|
@ -33,5 +39,4 @@ public class NullMetricsScope implements IMetricsScope {
|
|||
public void end() {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -22,6 +22,11 @@ import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
|||
*/
|
||||
public interface IMetricsScope {
|
||||
|
||||
/**
|
||||
* Value that signifies that all dimensions are allowed for the metrics scope.
|
||||
*/
|
||||
public static final String METRICS_DIMENSIONS_ALL = "ALL";
|
||||
|
||||
/**
|
||||
* Adds a data point to this IMetricsScope. Multiple calls against the same IMetricsScope with the same name
|
||||
* parameter will result in accumulation.
|
||||
|
|
@ -32,6 +37,17 @@ public interface IMetricsScope {
|
|||
*/
|
||||
public void addData(String name, double value, StandardUnit unit);
|
||||
|
||||
/**
|
||||
* Adds a data point to this IMetricsScope if given metrics level is enabled. Multiple calls against the same
|
||||
* IMetricsScope with the same name parameter will result in accumulation.
|
||||
*
|
||||
* @param name data point name
|
||||
* @param value data point value
|
||||
* @param unit unit of data point
|
||||
* @param level metrics level of this data point
|
||||
*/
|
||||
public void addData(String name, double value, StandardUnit unit, MetricsLevel level);
|
||||
|
||||
/**
|
||||
* Adds a dimension that applies to all metrics in this IMetricsScope.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.metrics.interfaces;
|
||||
|
||||
/**
|
||||
* This class defines a set of standard metrics levels that can be used to control which metrics get emitted.
|
||||
* {@code MetricsLevel} objects are ordered and are specified by ordered integers. Enabling metrics at a given
|
||||
* level also enables the metrics at all higher levels.
|
||||
* <p>
|
||||
* Metrics levels in descending order are:
|
||||
* <ul>
|
||||
* <li>SUMMARY</li>
|
||||
* <li>DETAILED</li>
|
||||
* </ul>
|
||||
* In addition, NONE level can be used to turn off all metrics.
|
||||
*/
|
||||
public enum MetricsLevel {
|
||||
|
||||
/**
|
||||
* NONE metrics level can be used to turn off metrics.
|
||||
*/
|
||||
NONE("NONE", Integer.MAX_VALUE),
|
||||
|
||||
/**
|
||||
* SUMMARY metrics level can be used to emit only the most significant metrics.
|
||||
*/
|
||||
SUMMARY("SUMMARY", 10000),
|
||||
|
||||
/**
|
||||
* DETAILED metrics level can be used to emit all metrics.
|
||||
*/
|
||||
DETAILED("DETAILED", 9000);
|
||||
|
||||
/**
|
||||
* Name of the metrics level.
|
||||
*/
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* Integer value of the metrics level.
|
||||
*/
|
||||
private final int value;
|
||||
|
||||
/**
|
||||
* Creates metrics level with given name and value.
|
||||
* @param name Metrics level name
|
||||
* @param value Metrics level value
|
||||
*/
|
||||
private MetricsLevel(String name, int value) {
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name for this metrics level.
|
||||
* @return Returns the name for this metrics level.
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value for this metrics level.
|
||||
* @return Returns the value for this metrics level.
|
||||
*/
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns metrics level associated with the given name.
|
||||
* @param name Name of the metrics level.
|
||||
* @return Returns metrics level associated with the given name.
|
||||
*/
|
||||
public static MetricsLevel fromName(String name) {
|
||||
if (name != null) {
|
||||
name = name.toUpperCase();
|
||||
}
|
||||
return valueOf(name);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue