Merge branch 'awslabs:master' into master

This commit is contained in:
pelaezryan 2023-06-23 15:58:40 -07:00 committed by GitHub
commit 2f83cb68ac
45 changed files with 1312 additions and 78 deletions

.github/workflows/maven.yml (new file)
View file

@ -0,0 +1,32 @@
# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: Java CI with Maven
on:
push:
branches:
- "master"
pull_request:
branches:
- "master"
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'corretto'
- name: Build with Maven
run: mvn -B package --file pom.xml -DskipITs

View file

@ -36,6 +36,12 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
resources (which require manual cleanup). Integration tests require valid AWS credentials that can be discovered at
runtime. To skip running integration tests, add the `-DskipITs` option to the build command.
## Running Integration Tests
To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This looks for the default AWS profile specified in your local `.aws/credentials` file.
Optionally, you can provide the name of an AWS profile (for an IAM user/role) to run the tests with: `mvn -Dit.test=*IntegrationTest -DawsProfile="<PROFILE_NAME>" verify`.
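For reference, a minimal sketch of how the `-DawsProfile` system property is consumed on the test side; this mirrors the `getCredentialsProvider` logic added to `KCLAppConfig` later in this diff:

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;

private AwsCredentialsProvider getCredentialsProvider() {
    // Use the named profile when -DawsProfile is supplied; otherwise fall back
    // to the SDK's default credentials provider chain.
    final String awsProfile = System.getProperty("awsProfile");
    return (awsProfile != null)
            ? ProfileCredentialsProvider.builder().profileName(awsProfile).build()
            : DefaultCredentialsProvider.create();
}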
## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
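To make that behavior concrete, here is a minimal, illustrative KCL 2.x record processor (a sketch, not part of this change): every KinesisClientRecord it receives is already an individual KPL user record, with subSequenceNumber() identifying its position within the original aggregated Kinesis record.

import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class LoggingRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Each record here is a deaggregated KPL user record.
        processRecordsInput.records().forEach(record ->
                System.out.printf("seq=%s subSeq=%d key=%s%n",
                        record.sequenceNumber(), record.subSequenceNumber(), record.partitionKey()));
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) { }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        // A real processor must checkpoint here; omitted in this sketch.
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { }
}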

View file

@ -207,6 +207,10 @@
<name>sqlite4java.library.path</name>
<value>${sqlite4java.libpath}</value>
</property>
<property>
<name>awsProfile</name>
<value>${awsProfile}</value>
</property>
</systemProperties>
</configuration>
</plugin>

View file

@ -144,7 +144,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), applicationState);
}
@ -152,7 +153,8 @@ public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpoi
* {@inheritDoc}
*/
@Override
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
public PreparedCheckpointer prepareCheckpoint(Record record, byte[] applicationState)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
//
// TODO: UserRecord Deprecation
//
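For context, the byte[] applicationState overload reformatted above supports two-phase checkpointing. A hedged usage sketch from inside a record processor (the state payload is hypothetical, and checked exceptions such as ThrottlingException and ShutdownException are omitted):

// Inside processRecords(ProcessRecordsInput input):
byte[] applicationState = "resume-token".getBytes(java.nio.charset.StandardCharsets.UTF_8);
PreparedCheckpointer prepared = input.checkpointer().prepareCheckpoint(applicationState);
// ... flush any application-side effects, then commit the prepared checkpoint:
prepared.checkpoint();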

View file

@ -103,7 +103,8 @@ public class DynamoDBCheckpointer implements Checkpointer {
}
@Override
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken, byte[] pendingCheckpointState) throws KinesisClientLibException {
public void prepareCheckpoint(String leaseKey, ExtendedSequenceNumber pendingCheckpoint, String concurrencyToken,
byte[] pendingCheckpointState) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(leaseKey, pendingCheckpoint, UUID.fromString(concurrencyToken), pendingCheckpointState);

View file

@ -23,10 +23,11 @@ import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import java.math.BigInteger;
@Value @Accessors(fluent = true)
/**
* Lease POJO to hold the starting hashkey range and ending hashkey range of kinesis shards.
*/
@Accessors(fluent = true)
@Value
public class HashKeyRangeForLease {
private final BigInteger startingHashKey;
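A hedged construction sketch, assuming the fromHashKeyRange static factory that this diff uses in PeriodicShardSyncManager and the fluent Lombok accessors generated by @Accessors(fluent = true):

import java.math.BigInteger;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import static software.amazon.kinesis.leases.HashKeyRangeForLease.fromHashKeyRange;

// A range covering the full Kinesis hash key space [0, 2^128 - 1].
HashKeyRange shardRange = HashKeyRange.builder()
        .startingHashKey("0")
        .endingHashKey(new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString())
        .build();
HashKeyRangeForLease leaseRange = fromHashKeyRange(shardRange);
BigInteger start = leaseRange.startingHashKey(); // fluent accessor, no "get" prefix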

View file

@ -16,15 +16,15 @@ package software.amazon.kinesis.coordinator;
public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {
/**
* Empty constructor for NoOp Worker State Change Listener
*/
public NoOpWorkerStateChangeListener() {
/**
* Empty constructor for NoOp Worker State Change Listener
*/
public NoOpWorkerStateChangeListener() {
}
}
@Override
public void onWorkerStateChange(WorkerState newState) {
@Override
public void onWorkerStateChange(WorkerState newState) {
}
}
}

View file

@ -348,7 +348,7 @@ class PeriodicShardSyncManager {
((MultiStreamLease) lease).shardId() :
lease.leaseKey();
final Shard shard = kinesisShards.get(shardId);
if(shard == null) {
if (shard == null) {
return lease;
}
lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
@ -372,7 +372,7 @@ class PeriodicShardSyncManager {
List<Lease> leasesWithHashKeyRanges) {
// Sort the hash ranges by starting hash key.
List<Lease> sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
if(sortedLeasesWithHashKeyRanges.isEmpty()) {
if (sortedLeasesWithHashKeyRanges.isEmpty()) {
log.error("No leases with valid hashranges found for stream {}", streamIdentifier);
return Optional.of(new HashRangeHole());
}
@ -417,8 +417,9 @@ class PeriodicShardSyncManager {
@VisibleForTesting
static List<Lease> sortLeasesByHashRange(List<Lease> leasesWithHashKeyRanges) {
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1)
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) {
return leasesWithHashKeyRanges;
}
Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
return leasesWithHashKeyRanges;
}

View file

@ -544,7 +544,8 @@ public class Scheduler implements Runnable {
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
.collect(Collectors.toSet());
// These are the streams which are deleted in Kinesis and we encounter resource not found during
// shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
// not have any data.
@ -611,7 +612,7 @@ public class Scheduler implements Runnable {
}
private void removeStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdentifiers) {
for(StreamIdentifier streamIdentifier : streamIdentifiers) {
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
staleStreamDeletionMap.remove(streamIdentifier);
}
}

View file

@ -19,16 +19,16 @@ package software.amazon.kinesis.coordinator;
*/
@FunctionalInterface
public interface WorkerStateChangeListener {
enum WorkerState {
CREATED,
INITIALIZING,
STARTED,
SHUT_DOWN_STARTED,
SHUT_DOWN
}
enum WorkerState {
CREATED,
INITIALIZING,
STARTED,
SHUT_DOWN_STARTED,
SHUT_DOWN
}
void onWorkerStateChange(WorkerState newState);
void onWorkerStateChange(WorkerState newState);
default void onAllInitializationAttemptsFailed(Throwable e) {
}
default void onAllInitializationAttemptsFailed(Throwable e) {
}
}
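Since the interface is a @FunctionalInterface, a listener can be supplied as a lambda. A minimal sketch, assuming the fluent workerStateChangeListener setter on CoordinatorConfig (KCL 2.x) and a configsBuilder built as shown elsewhere in this diff:

import software.amazon.kinesis.coordinator.CoordinatorConfig;

CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig()
        .workerStateChangeListener(newState ->
                System.out.println("Worker transitioned to " + newState));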

View file

@ -80,7 +80,7 @@ public class HierarchicalShardSyncer {
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
private static final int retriesForCompleteHashRange = 3;
private static final int RETRIES_FOR_COMPLETE_HASH_RANGE = 3;
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
@ -98,7 +98,7 @@ public class HierarchicalShardSyncer {
this.deletedStreamListProvider = deletedStreamListProvider;
}
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
private static final BiFunction<Lease, MultiStreamArgs, String> SHARD_ID_FROM_LEASE_DEDUCER =
(lease, multiStreamArgs) ->
multiStreamArgs.isMultiStreamMode() ?
((MultiStreamLease) lease).shardId() :
@ -129,7 +129,9 @@ public class HierarchicalShardSyncer {
isLeaseTableEmpty);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
/**
* Provide a pre-collected list of shards to avoid calling ListShards API
*/
public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
@ -163,7 +165,7 @@ public class HierarchicalShardSyncer {
final long startTime = System.currentTimeMillis();
boolean success = false;
try {
if(leaseRefresher.createLeaseIfNotExists(lease)) {
if (leaseRefresher.createLeaseIfNotExists(lease)) {
createdLeases.add(lease);
}
success = true;
@ -268,7 +270,7 @@ public class HierarchicalShardSyncer {
List<Shard> shards;
for (int i = 0; i < retriesForCompleteHashRange; i++) {
for (int i = 0; i < RETRIES_FOR_COMPLETE_HASH_RANGE; i++) {
shards = shardDetector.listShardsWithFilter(shardFilter);
if (shards == null) {
@ -284,7 +286,7 @@ public class HierarchicalShardSyncer {
}
throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after "
+ retriesForCompleteHashRange + " retries.");
+ RETRIES_FOR_COMPLETE_HASH_RANGE + " retries.");
}
private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
@ -365,7 +367,8 @@ public class HierarchicalShardSyncer {
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,final Set<String> inconsistentShardIds) {
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds,
new MultiStreamArgs(false, null));
}
@ -499,11 +502,13 @@ public class HierarchicalShardSyncer {
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint());
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}",
lease.leaseKey(), lease.checkpoint());
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition);
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint());
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}",
lease.leaseKey(), newCheckpoint, lease.checkpoint());
lease.checkpoint(newCheckpoint);
}
}
@ -728,8 +733,8 @@ public class HierarchicalShardSyncer {
@Override
public int compare(final Lease lease1, final Lease lease2) {
int result = 0;
final String shardId1 = shardIdFromLeaseDeducer.apply(lease1, multiStreamArgs);
final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
final String shardId1 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease1, multiStreamArgs);
final String shardId2 = SHARD_ID_FROM_LEASE_DEDUCER.apply(lease2, multiStreamArgs);
final Shard shard1 = shardIdToShardMap.get(shardId1);
final Shard shard2 = shardIdToShardMap.get(shardId2);
@ -802,7 +807,7 @@ public class HierarchicalShardSyncer {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
currentLeases.stream().peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs, streamIdentifier);
@ -908,7 +913,7 @@ public class HierarchicalShardSyncer {
.map(streamId -> streamId.serialize()).orElse("");
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.map(lease -> SHARD_ID_FROM_LEASE_DEDUCER.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards, streamIdentifier);

View file

@ -179,7 +179,7 @@ public class LeaseCleanupManager {
try {
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey());
if(leaseFromDDB != null) {
if (leaseFromDDB != null) {
Set<String> childShardKeys = leaseFromDDB.childShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try {

View file

@ -310,7 +310,7 @@ public class LeaseManagementConfig {
private LeaseManagementFactory leaseManagementFactory;
public HierarchicalShardSyncer hierarchicalShardSyncer() {
if(hierarchicalShardSyncer == null) {
if (hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
}
return hierarchicalShardSyncer;
@ -356,7 +356,7 @@ public class LeaseManagementConfig {
* @return LeaseManagementFactory
*/
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
if(leaseManagementFactory == null) {
if (leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
dynamoDBClient(),
tableName(),

View file

@ -37,7 +37,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
@Slf4j
@KinesisClientInternalApi
public class ShardSyncTask implements ConsumerTask {
private final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
private static final String SHARD_SYNC_TASK_OPERATION = "ShardSyncTask";
@NonNull
private final ShardDetector shardDetector;

View file

@ -187,7 +187,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest.Builder builder = createTableRequestBuilder();
if(BillingMode.PROVISIONED.equals(billingMode)) {
if (BillingMode.PROVISIONED.equals(billingMode)) {
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
.writeCapacityUnits(writeCapacity).build();
builder.provisionedThroughput(throughput);
@ -467,7 +467,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("create", lease.leaseKey(), e);
}
log.info("Created lease: {}",lease);
log.info("Created lease: {}", lease);
return true;
}

View file

@ -89,7 +89,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
}
if(lease.hashKeyRangeForLease() != null) {
if (lease.hashKeyRangeForLease() != null) {
result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()));
result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()));
}
@ -274,7 +274,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
}
if(lease.hashKeyRangeForLease() != null) {
if (lease.hashKeyRangeForLease() != null) {
result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
}

View file

@ -19,9 +19,9 @@ package software.amazon.kinesis.leases.exceptions;
*/
public class CustomerApplicationException extends Exception {
public CustomerApplicationException(Throwable e) { super(e);}
public CustomerApplicationException(Throwable e) { super(e); }
public CustomerApplicationException(String message, Throwable e) { super(message, e);}
public CustomerApplicationException(String message, Throwable e) { super(message, e); }
public CustomerApplicationException(String message) { super(message);}
public CustomerApplicationException(String message) { super(message); }
}

View file

@ -212,8 +212,10 @@ public class ProcessTask implements ConsumerTask {
log.debug("Calling application processRecords() with {} records from {}", records.size(),
shardInfoId);
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records)
.cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer)
.millisBehindLatest(input.millisBehindLatest()).build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt()

View file

@ -61,7 +61,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
@Deprecated
ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
ShardConsumer shardConsumer) {
this(recordsPublisher,executorService,bufferSize,shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
this(recordsPublisher, executorService, bufferSize, shardConsumer, LifecycleConfig.DEFAULT_READ_TIMEOUTS_TO_IGNORE);
}
ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
@ -74,7 +74,6 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo());
}
void startSubscriptions() {
synchronized (lockObject) {
// Setting the lastRequestTime to allow for health checks to restart subscriptions if they failed to
@ -131,7 +130,9 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error(
// CHECKSTYLE.OFF: LineLength
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
shardInfoId, lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
cancel();

View file

@ -283,7 +283,7 @@ public class ShutdownTask implements ConsumerTask {
}
}
for(ChildShard childShard : childShards) {
for (ChildShard childShard : childShards) {
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
if (leaseRefresher.getLease(leaseKey) == null) {
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);

View file

@ -20,9 +20,7 @@ import java.util.Objects;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
/*
/**
* A representation of a key of a MetricDatum. This class is useful when wanting to compare
* whether 2 keys have the same MetricDatum. This feature will be used in MetricAccumulatingQueue
* where we aggregate metrics across multiple MetricScopes.
@ -48,12 +46,15 @@ public class CloudWatchMetricKey {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
CloudWatchMetricKey other = (CloudWatchMetricKey) obj;
return Objects.equals(other.dimensions, dimensions) && Objects.equals(other.metricName, metricName);
}

View file

@ -36,7 +36,6 @@ import java.util.Objects;
*
* MetricDatumWithKey<SampleMetricKey> sampleDatumWithKey = new MetricDatumWithKey<SampleMetricKey>(new
* SampleMetricKey(System.currentTimeMillis()), datum)
*
*/
@AllArgsConstructor
@Setter
@ -59,12 +58,15 @@ public class MetricDatumWithKey<KeyType> {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
MetricDatumWithKey<?> other = (MetricDatumWithKey<?>) obj;
return Objects.equals(other.key, key) && Objects.equals(other.datum, datum);
}

View file

@ -192,7 +192,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(streamAndShardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
currentSequenceNumber = ((FanoutRecordsRetrieved) recordsRetrieved).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue or execute the subscription shutdown action.
@ -206,7 +206,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
.equals(flow.getSubscribeToShardId())) {
log.error(
"{}: Received unexpected ack for the active subscription {}. Throwing. ", streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
"{}: Received unexpected ack for the active subscription {}. Throwing.",
streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
throw new IllegalStateException("Unexpected ack for the active subscription");
}
// Otherwise publisher received a stale ack.
@ -315,7 +316,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) {
if (!hasValidSubscriber()) {
if(hasValidFlow()) {
if (hasValidFlow()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
" Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt,
@ -335,7 +336,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
" Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt,
flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
@ -367,7 +369,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} else {
if (triggeringFlow != null) {
log.debug(
// CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error",
// CHECKSTYLE.ON: LineLength
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
category.throwableTypeString);
triggeringFlow.cancel();
@ -603,7 +607,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
streamAndShardId, n, lastSuccessfulRequestDetails);
return;
}
@ -630,13 +636,17 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
streamAndShardId, lastSuccessfulRequestDetails);
return;
}
if (!hasValidSubscriber()) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
streamAndShardId, lastSuccessfulRequestDetails);
}
subscriber = null;
@ -778,7 +788,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeExceptionOccurred(throwable);
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable);
() -> {
parent.recordsDeliveryQueue.poll();
executeExceptionOccurred(throwable);
},
"onError", throwable);
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
@ -786,7 +800,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void executeExceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
@ -803,7 +816,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
isErrorDispatched = true;
} else {
log.debug(
// CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}",
// CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
throwable.getMessage());
}
@ -817,7 +832,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeComplete();
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete");
() -> {
parent.recordsDeliveryQueue.poll();
executeComplete();
},
"onComplete");
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
@ -830,7 +849,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
parent.streamAndShardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
@ -854,7 +875,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
if (this.isDisposed) {
log.warn(
// CHECKSTYLE.OFF: LineLength
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
// CHECKSTYLE.ON: LineLength
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
return;
}

View file

@ -54,7 +54,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
final StreamConfig streamConfig,
final MetricsFactory metricsFactory) {
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
if(streamIdentifierStr.isPresent()) {
if (streamIdentifierStr.isPresent()) {
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),

View file

@ -145,7 +145,9 @@ public class KinesisDataFetcher implements DataFetcher {
}
}
// CHECKSTYLE.OFF: MemberName
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
// CHECKSTYLE.ON: MemberName
@Override
public GetRecordsResponse getResult() {
return GetRecordsResponse.builder()

View file

@ -137,7 +137,7 @@ public class PollingConfig implements RetrievalSpecificConfig {
@Override
public RetrievalFactory retrievalFactory() {
// Prioritize the PollingConfig specified value if its updated.
if(usePollingConfigIdleTimeValue) {
if (usePollingConfigIdleTimeValue) {
recordsFetcherFactory.idleMillisBetweenCalls(idleTimeBetweenReadsInMillis);
}
return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,

View file

@ -327,7 +327,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
}
resetLock.writeLock().lock();
try {
publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved);
publisherSession.reset((PrefetchRecordsRetrieved) recordsRetrieved);
wasReset = true;
} finally {
resetLock.writeLock().unlock();
@ -555,7 +555,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
return;
}
// Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm
if(lastSuccessfulCall == null) {
if (lastSuccessfulCall == null) {
Thread.sleep(idleMillisBetweenCalls);
return;
}

View file

@ -0,0 +1,192 @@
package software.amazon.kinesis.config;
import lombok.Value;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;
import lombok.Builder;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Optional;
/**
* Default configuration for a producer or consumer used in integration tests.
* Producer: puts records of size 60 KB at an interval of 100 ms
* Consumer: streaming configuration (vs polling) that starts processing records at shard horizon
*/
public abstract class KCLAppConfig {
private KinesisAsyncClient kinesisAsyncClient;
private DynamoDbAsyncClient dynamoDbAsyncClient;
private CloudWatchAsyncClient cloudWatchAsyncClient;
private RecordValidatorQueue recordValidator;
/**
* Name used for test stream and lease tracker table
*/
public abstract String getStreamName();
public int getShardCount() { return 4; }
public Region getRegion() { return Region.US_WEST_2; }
/**
* "default" profile, should match with profiles listed in "cat ~/.aws/config"
*/
private AwsCredentialsProvider getCredentialsProvider() {
final String awsProfile = System.getProperty("awsProfile");
return (awsProfile != null) ?
ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create();
}
public InitialPositionInStream getInitialPosition() {
return InitialPositionInStream.TRIM_HORIZON;
}
public abstract Protocol getKinesisClientProtocol();
public ProducerConfig getProducerConfig() {
return ProducerConfig.builder()
.isBatchPut(false)
.batchSize(1)
.recordSizeKB(60)
.callPeriodMills(100)
.build();
}
public ReshardConfig getReshardConfig() {
return null;
}
public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException {
if (kinesisAsyncClient == null) {
// Set up the async HTTP client config.
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder()
.maxConcurrency(Integer.MAX_VALUE);
builder.protocol(getKinesisClientProtocol());
final SdkAsyncHttpClient sdkAsyncHttpClient =
builder.buildWithDefaults(AttributeMap.builder().build());
// Set up the client builder with default values.
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion());
kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient);
kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider());
this.kinesisAsyncClient = kinesisAsyncClientBuilder.build();
}
return this.kinesisAsyncClient;
}
public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException {
if (this.dynamoDbAsyncClient == null) {
final DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder().region(getRegion());
builder.credentialsProvider(getCredentialsProvider());
this.dynamoDbAsyncClient = builder.build();
}
return this.dynamoDbAsyncClient;
}
public final CloudWatchAsyncClient buildAsyncCloudWatchClient() throws IOException {
if (this.cloudWatchAsyncClient == null) {
final CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder().region(getRegion());
builder.credentialsProvider(getCredentialsProvider());
this.cloudWatchAsyncClient = builder.build();
}
return this.cloudWatchAsyncClient;
}
public final String getWorkerId() throws UnknownHostException {
return Inet4Address.getLocalHost().getHostName();
}
public final RecordValidatorQueue getRecordValidator() {
if (recordValidator == null) {
this.recordValidator = new RecordValidatorQueue();
}
return this.recordValidator;
}
public ShardRecordProcessorFactory getShardRecordProcessorFactory() {
return new TestRecordProcessorFactory(getRecordValidator());
}
public final ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException {
final String workerId = getWorkerId();
return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(), buildAsyncDynamoDbClient(),
buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory());
}
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition());
// Default is a streaming consumer
final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
return config;
}
/**
* Configure ingress load (batch size, record size, and calling interval)
*/
@Value
@Builder
static class ProducerConfig {
private boolean isBatchPut;
private int batchSize;
private int recordSizeKB;
private long callPeriodMills;
}
/**
* Description of the method of resharding for a test case
*/
@Value
@Builder
static class ReshardConfig {
/**
* reshardingFactorCycle: lists the order of reshards that will be done during one reshard cycle
* e.g. {SPLIT, MERGE} means that the number of shards will first be doubled, then halved
*/
private ReshardOptions[] reshardingFactorCycle;
/**
* numReshardCycles: the number of resharding cycles that will be executed in a test
*/
private int numReshardCycles;
/**
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
private long reshardFrequencyMillis;
}
}

View file

@ -0,0 +1,41 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;
/**
* Config for a polling consumer with HTTP protocol of HTTP1
*/
public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override
public String getStreamName() {
return "KCLReleaseCanary2XPollingH1TestStream_" + uniqueId;
}
@Override
public Protocol getKinesisClientProtocol() {
return Protocol.HTTP1_1;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition());
final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));
return config;
}
}

View file

@ -0,0 +1,41 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;
/**
* Config for a polling consumer with HTTP protocol of HTTP2
*/
public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override
public String getStreamName() {
return "KCLReleaseCanary2XPollingH2TestStream_" + uniqueId;
}
@Override
public Protocol getKinesisClientProtocol() {
return Protocol.HTTP2;
}
@Override
public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException {
final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
.newInitialPosition(getInitialPosition());
final RetrievalConfig config = getConfigsBuilder().retrievalConfig();
config.initialPositionInStreamExtended(initialPosition);
config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient()));
return config;
}
}

View file

@ -0,0 +1,24 @@
package software.amazon.kinesis.config;
import software.amazon.awssdk.http.Protocol;
import java.util.UUID;
/**
* Config for a streaming consumer with HTTP protocol of HTTP2
*/
public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig {
private final UUID uniqueId = UUID.randomUUID();
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingTestStream_" + uniqueId;
}
@Override
public Protocol getKinesisClientProtocol() {
return Protocol.HTTP2;
}
}

View file

@ -0,0 +1,44 @@
package software.amazon.kinesis.lifecycle;
import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer;
public class BasicStreamConsumerIntegrationTest {
/**
* Test with a polling consumer using HTTP2 protocol.
* In the polling case, the consumer makes repeated calls to the stream to request records to process.
*/
@Test
public void kclReleaseCanaryPollingH2Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH2TestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
/**
* Test with a polling consumer using HTTP1 protocol.
* In the polling case, the consumer makes repeated calls to the stream to request records to process.
*/
@Test
public void kclReleaseCanaryPollingH1Test() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryPollingH1TestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
/**
* Test with a streaming consumer.
* In the streaming configuration, a connection is established once and the stream continuously pushes data to the consumer for processing.
*/
@Test
public void kclReleaseCanaryStreamingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingTestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
}

View file

@ -0,0 +1,77 @@
package software.amazon.kinesis.utils;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.kinesis.common.FutureUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@NoArgsConstructor
public abstract class AWSResourceManager {
/**
* Make delete resource API call for specific resource type
*/
public abstract void deleteResourceCall(String resourceName) throws Exception;
/**
* Check if resource with given name is in active state
*/
public abstract boolean isResourceActive(String name);
/**
* Get the names of all resources of the specified type
* @return list of resource names
* @throws Exception if listing the resources fails
*/
public abstract List<String> getAllResourceNames() throws Exception;
/**
* Delete resource with specified resource name
*/
public void deleteResource(String resourceName) throws Exception {
try {
deleteResourceCall(resourceName);
} catch (Exception e) {
throw new Exception("Could not delete resource: {}", e);
}
// Wait until the resource is deleted before returning
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed resource deletion");
}
try {
if (!isResourceActive(resourceName)) {
log.info("Successfully deleted the resource {}", resourceName);
return;
}
} catch (Exception e) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) {}
log.info("Resource {} is not deleted yet, exception: ", resourceName);
}
}
}
/**
* Delete all instances of a particular resource type
*/
public void deleteAllResource() throws Exception {
final List<String> resourceNames = getAllResourceNames();
for (String resourceName : resourceNames) {
deleteResource(resourceName);
}
}
}

View file

@ -0,0 +1,68 @@
package software.amazon.kinesis.utils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.kinesis.common.FutureUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@AllArgsConstructor
public class LeaseTableManager extends AWSResourceManager {
private final DynamoDbAsyncClient dynamoClient;
public boolean isResourceActive(String tableName) {
final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
final CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFuture = dynamoClient.describeTable(request);
try {
final DescribeTableResponse response = describeTableResponseCompletableFuture.get(30, TimeUnit.SECONDS);
boolean isActive = response.table().tableStatus().equals(TableStatus.ACTIVE);
if (!isActive) {
throw new RuntimeException("Table is not active, instead in status: " + response.table().tableStatus());
}
return true;
} catch (ExecutionException e) {
if (e.getCause() instanceof ResourceNotFoundException) {
return false;
} else {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void deleteResourceCall(String tableName) throws Exception {
final DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
FutureUtils.resolveOrCancelFuture(dynamoClient.deleteTable(request), Duration.ofSeconds(60));
}
public List<String> getAllResourceNames() throws Exception {
ListTablesRequest listTableRequest = ListTablesRequest.builder().build();
List<String> allTableNames = new ArrayList<>();
ListTablesResponse result = null;
do {
result = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(listTableRequest), Duration.ofSeconds(60));
allTableNames.addAll(result.tableNames());
listTableRequest = ListTablesRequest.builder().exclusiveStartTableName(result.lastEvaluatedTableName()).build();
} while (result.lastEvaluatedTableName() != null);
return allTableNames;
}
}

View file

@ -0,0 +1,10 @@
package software.amazon.kinesis.utils;
/**
* Possible outcomes for record validation in RecordValidatorQueue
*/
public enum RecordValidationStatus {
OUT_OF_ORDER,
MISSING_RECORD,
NO_ERROR
}

View file

@ -0,0 +1,63 @@
package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Class that maintains a dictionary that maps shard IDs to a list of records
* that are processed by that shard.
* Validation ensures that
* 1. The records processed by each shard are in increasing order (duplicates allowed)
* 2. The total number of unique records processed is equal to the number of records put on the stream
*/
@Slf4j
public class RecordValidatorQueue {
private final ConcurrentHashMap<String, List<String>> dict = new ConcurrentHashMap<>();
public void add(String shardId, String data) {
final List<String> values = dict.computeIfAbsent(shardId, key -> new ArrayList<>());
values.add(data);
}
public RecordValidationStatus validateRecords(int expectedRecordCount) {
// Validate that each List in the HashMap has data records in increasing order
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
int prevVal = -1;
for (String record : recordsPerShard) {
int nextVal = Integer.parseInt(record);
if (prevVal > nextVal) {
log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal);
return RecordValidationStatus.OUT_OF_ORDER;
}
prevVal = nextVal;
}
}
// Validate that no records are missing over all shards
int actualRecordCount = 0;
for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
List<String> recordsPerShard = entry.getValue();
Set<String> noDupRecords = new HashSet<String>(recordsPerShard);
actualRecordCount += noDupRecords.size();
}
// If this is true, then there was some record that was missed during processing.
if (actualRecordCount != expectedRecordCount) {
log.error("Failed to get correct number of records processed. Should be {} but was {}", expectedRecordCount, actualRecordCount);
return RecordValidationStatus.MISSING_RECORD;
}
// Record validation succeeded.
return RecordValidationStatus.NO_ERROR;
}
}

View file

@ -0,0 +1,44 @@
package software.amazon.kinesis.utils;
import org.junit.Assert;
import org.junit.Test;
public class RecordValidatorQueueTest {
private final RecordValidatorQueue recordValidator = new RecordValidatorQueue();
private static final String SHARD_ID = "ABC";
@Test
public void testValidationFailedRecordOutOfOrder() {
recordValidator.add(SHARD_ID, "0");
recordValidator.add(SHARD_ID, "1");
recordValidator.add(SHARD_ID, "3");
recordValidator.add(SHARD_ID, "2");
RecordValidationStatus error = recordValidator.validateRecords(4);
Assert.assertEquals(RecordValidationStatus.OUT_OF_ORDER, error);
}
@Test
public void testValidationFailedMissingRecord() {
recordValidator.add(SHARD_ID, "0");
recordValidator.add(SHARD_ID, "1");
recordValidator.add(SHARD_ID, "2");
recordValidator.add(SHARD_ID, "3");
RecordValidationStatus error = recordValidator.validateRecords(5);
Assert.assertEquals(RecordValidationStatus.MISSING_RECORD, error);
}
@Test
public void testValidRecords() {
recordValidator.add(SHARD_ID, "0");
recordValidator.add(SHARD_ID, "1");
recordValidator.add(SHARD_ID, "2");
recordValidator.add(SHARD_ID, "3");
RecordValidationStatus error = recordValidator.validateRecords(4);
Assert.assertEquals(RecordValidationStatus.NO_ERROR, error);
}
}

View file

@ -0,0 +1,11 @@
package software.amazon.kinesis.utils;
/**
* Specifies the types of resharding possible in integration tests
* Split doubles the number of shards.
* Merge halves the number of shards.
*/
public enum ReshardOptions {
SPLIT,
MERGE
}
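The enum carries no resharding logic itself. A hedged sketch of how SPLIT/MERGE could map onto the Kinesis UpdateShardCount API (the helper and its wiring are hypothetical, not necessarily how this test suite applies reshards):

import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;

static void applyReshard(KinesisAsyncClient client, String streamName,
        ReshardOptions option, int currentShardCount) throws Exception {
    // SPLIT doubles the shard count; MERGE halves it, per the Javadoc above.
    final int targetShardCount = (option == ReshardOptions.SPLIT)
            ? currentShardCount * 2 : currentShardCount / 2;
    client.updateShardCount(UpdateShardCountRequest.builder()
            .streamName(streamName)
            .targetShardCount(targetShardCount)
            .scalingType(ScalingType.UNIFORM_SCALING)
            .build()).get(30, TimeUnit.SECONDS);
}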

View file

@ -0,0 +1,117 @@
package software.amazon.kinesis.utils;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.config.KCLAppConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Value
@Slf4j
public class StreamExistenceManager extends AWSResourceManager {
private final KinesisAsyncClient client;
private final KCLAppConfig testConfig;
public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
this.testConfig = config;
this.client = config.buildAsyncKinesisClient();
}
public boolean isResourceActive(String streamName) {
final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
final CompletableFuture<DescribeStreamSummaryResponse> describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request);
try {
final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS);
boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE);
if (!isActive) {
throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus());
}
return true;
} catch (ExecutionException e) {
if (e.getCause() instanceof ResourceNotFoundException) {
return false;
} else {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void deleteResourceCall(String streamName) throws Exception {
final DeleteStreamRequest request = DeleteStreamRequest.builder().streamName(streamName).enforceConsumerDeletion(true).build();
client.deleteStream(request).get(30, TimeUnit.SECONDS);
}
public List<String> getAllResourceNames() throws Exception {
ListStreamsRequest listStreamRequest = ListStreamsRequest.builder().build();
List<String> allStreamNames = new ArrayList<>();
ListStreamsResponse result = null;
do {
result = FutureUtils.resolveOrCancelFuture(client.listStreams(listStreamRequest), Duration.ofSeconds(60));
allStreamNames.addAll(result.streamNames());
listStreamRequest = ListStreamsRequest.builder().exclusiveStartStreamName(result.nextToken()).build();
} while (result.hasMoreStreams());
return allStreamNames;
}
public void checkStreamAndCreateIfNecessary(String streamName) {
if (!isResourceActive(streamName)) {
createStream(streamName, testConfig.getShardCount());
}
log.info("Using stream {} with region {}", streamName, testConfig.getRegion());
}
private void createStream(String streamName, int shardCount) {
final CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(shardCount).build();
try {
client.createStream(request).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Failed to create stream with name " + streamName, e);
}
int i = 0;
while (true) {
i++;
if (i > 100) {
throw new RuntimeException("Failed stream creation, did not transition into active");
}
try {
boolean isActive = isResourceActive(streamName);
if (isActive) {
log.info("Succesfully created the stream {}", streamName);
return;
}
} catch (Exception e) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e1) {
log.error("Failed to sleep");
}
log.info("Stream {} is not active yet, exception: ", streamName, e);
}
}
}
}

View file

@ -0,0 +1,223 @@
package software.amazon.kinesis.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class TestConsumer {
public final KCLAppConfig consumerConfig;
public final Region region;
public final String streamName;
public final KinesisAsyncClient kinesisClient;
private MetricsConfig metricsConfig;
private RetrievalConfig retrievalConfig;
private CheckpointConfig checkpointConfig;
private CoordinatorConfig coordinatorConfig;
private LeaseManagementConfig leaseManagementConfig;
private LifecycleConfig lifecycleConfig;
private ProcessorConfig processorConfig;
private Scheduler scheduler;
private ScheduledExecutorService producerExecutor;
private ScheduledFuture<?> producerFuture;
private ScheduledExecutorService consumerExecutor;
private ScheduledFuture<?> consumerFuture;
private DynamoDbAsyncClient dynamoClient;
private final ObjectMapper mapper = new ObjectMapper();
public int successfulPutRecords = 0;
public BigInteger payloadCounter = new BigInteger("0");
public TestConsumer(KCLAppConfig consumerConfig) throws Exception {
this.consumerConfig = consumerConfig;
this.region = consumerConfig.getRegion();
this.streamName = consumerConfig.getStreamName();
this.kinesisClient = consumerConfig.buildAsyncKinesisClient();
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
}
public void run() throws Exception {
final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);
// Clean up any old streams or lease tables left in test environment
cleanTestResources(streamExistenceManager, leaseTableManager);
// Check if stream is created. If not, create it
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
startProducer();
setUpConsumerResources();
try {
startConsumer();
// Sleep for three minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
// Stops sending dummy data.
stopProducer();
// Wait a few seconds for the last few records to be processed
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
// Finishes processing current batch of data already received from Kinesis before shutting down.
awaitConsumerFinish();
// Validate processed data
validateRecordProcessor();
} catch (Exception e) {
// Test Failed. Clean up resources and then throw exception.
log.info("----------Test Failed: Cleaning up resources------------");
throw e;
} finally {
// Clean up resources created
deleteResources(streamExistenceManager, leaseTableManager);
}
}
private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("----------Before starting, Cleaning test environment----------");
log.info("----------Deleting all lease tables in account----------");
leaseTableManager.deleteAllResource();
log.info("----------Finished deleting all lease tables-------------");
log.info("----------Deleting all streams in account----------");
streamExistenceManager.deleteAllResource();
log.info("----------Finished deleting all streams-------------");
}
private void startProducer() {
// Publish one dummy record per second to the stream, after an initial 60-second delay
this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
}
private void setUpConsumerResources() throws Exception {
// Setup configuration of KCL (including DynamoDB and CloudWatch)
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder();
retrievalConfig = consumerConfig.getRetrievalConfig();
checkpointConfig = configsBuilder.checkpointConfig();
coordinatorConfig = configsBuilder.coordinatorConfig();
leaseManagementConfig = configsBuilder.leaseManagementConfig()
.initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()))
.initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50);
lifecycleConfig = configsBuilder.lifecycleConfig();
processorConfig = configsBuilder.processorConfig();
metricsConfig = configsBuilder.metricsConfig();
// Create Scheduler
this.scheduler = new Scheduler(
checkpointConfig,
coordinatorConfig,
leaseManagementConfig,
lifecycleConfig,
metricsConfig,
processorConfig,
retrievalConfig
);
}
private void startConsumer() {
// Start record processing of dummy data
this.consumerExecutor = Executors.newSingleThreadScheduledExecutor();
this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS);
}
public void publishRecord() {
final PutRecordRequest request;
try {
request = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(this.streamName)
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // payload is the serialized counter; the size argument is currently unused
.build();
kinesisClient.putRecord(request).get();
// Increment the payload counter if the putRecord call was successful
payloadCounter = payloadCounter.add(new BigInteger("1"));
successfulPutRecords += 1;
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag
log.info("Interrupted, assuming shutdown.", e);
} catch (ExecutionException | RuntimeException e) {
log.error("Error during publish records", e);
}
}
private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) {
final byte[] returnData;
log.info("--------------Putting record with data: {}", payloadCounter);
try {
returnData = mapper.writeValueAsBytes(payloadCounter);
} catch (Exception e) {
throw new RuntimeException("Error converting object to bytes: ", e);
}
return ByteBuffer.wrap(returnData);
}
private void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
}
private void awaitConsumerFinish() throws Exception {
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete.");
try {
gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
}
log.info("Completed, shutting down now.");
}
private void validateRecordProcessor() throws Exception {
log.info("The number of expected records is: {}", successfulPutRecords);
final RecordValidationStatus errorVal = consumerConfig.getRecordValidator().validateRecords(successfulPutRecords);
if (errorVal != RecordValidationStatus.NO_ERROR) {
throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString());
}
log.info("--------------Completed validation of processed records.--------------");
}
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("-------------Start deleting stream.----------------");
streamExistenceManager.deleteResource(this.streamName);
log.info("-------------Start deleting lease table.----------------");
leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
log.info("-------------Finished deleting resources.----------------");
}
}
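
Editor's note: a usage sketch for the class above. MyPollingTestConfig is a hypothetical KCLAppConfig subclass; the concrete test configs live under software.amazon.kinesis.config and supply the region, stream name, and clients.

import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.utils.TestConsumer;

class TestConsumerSketch {
    public static void main(String[] args) throws Exception {
        KCLAppConfig config = new MyPollingTestConfig(); // hypothetical concrete config
        // Creates the stream if needed, produces and consumes for ~3 minutes,
        // validates the processed records, then deletes the stream and lease table.
        new TestConsumer(config).run();
    }
}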

View file

@ -0,0 +1,108 @@
package software.amazon.kinesis.utils;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
 * Test implementation of ShardRecordProcessor that records each processed payload
 * in a RecordValidatorQueue so the test can validate them after the run.
 */
@Slf4j
public class TestRecordProcessor implements ShardRecordProcessor {
private static final String SHARD_ID_MDC_KEY = "ShardId";
private String shardId;
private final RecordValidatorQueue recordValidator;
public TestRecordProcessor(RecordValidatorQueue recordValidator) {
this.recordValidator = recordValidator;
}
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.shardId();
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Processing {} record(s)", processRecordsInput.records().size());
for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) {
final String data = new String(asByteArray(kinesisRecord.data()), StandardCharsets.UTF_8);
log.info("Processing record pk: {}", data);
recordValidator.add(shardId, data);
}
} catch (Throwable t) {
log.error("Caught throwable while processing records. Aborting.", t);
Runtime.getRuntime().halt(1);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
public static byte[] asByteArray(ByteBuffer buf) {
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
return bytes;
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Lost lease, so terminating.");
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Reached shard end checkpointing.");
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at shard end. Giving up.", e);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Scheduler is shutting down, checkpointing.");
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
}
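
Editor's note: a small sketch of the payload round trip between TestConsumer and this processor. Jackson serializes a BigInteger as its decimal digits, so decoding the record bytes as text recovers the counter that wrapWithCounter published:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

class PayloadRoundTripSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(new BigInteger("42")); // serialized as "42"
        String data = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(data); // prints 42, the value the record validator stores
    }
}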

View file

@ -0,0 +1,19 @@
package software.amazon.kinesis.utils;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
private final RecordValidatorQueue recordValidator;
public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) {
this.recordValidator = recordValidator;
}
@Override
public ShardRecordProcessor shardRecordProcessor() {
return new TestRecordProcessor(this.recordValidator);
}
}
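
Editor's note: a sketch of how this factory is typically handed to the KCL, assuming the standard KCL 2.x ConfigsBuilder constructor; in these tests the wiring happens inside KCLAppConfig#getConfigsBuilder.

import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;

class FactoryWiringSketch {
    static ConfigsBuilder wire(String streamName, String applicationName, String workerId,
            KinesisAsyncClient kinesis, DynamoDbAsyncClient dynamo, CloudWatchAsyncClient cloudWatch,
            RecordValidatorQueue validator) {
        // The scheduler asks this factory for one ShardRecordProcessor per lease.
        return new ConfigsBuilder(streamName, applicationName, kinesis, dynamo, cloudWatch,
                workerId, new TestRecordProcessorFactory(validator));
    }
}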

View file

@ -0,0 +1,8 @@
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<!-- Disable all checks for protobuf-generated files. -->
<suppress files=".*/kpl/Messages.java" checks="[a-zA-Z0-9]*"/>
</suppressions>

39
checkstyle/checkstyle.xml Normal file
View file

@ -0,0 +1,39 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="LineLength">
<property name="fileExtensions" value="java"/>
<property name="max" value="170"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="SuppressWithPlainTextCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<!-- $1 refers to the first match group in the regex defined in commentFormat -->
<property name="checkFormat" value="$1"/>
</module>
<module name="TreeWalker">
<module name="AvoidStarImport"/>
<module name="ConstantName"/>
<module name="InvalidJavadocPosition"/>
<module name="LocalVariableName"/>
<module name="MemberName"/>
<module name="MethodName"/>
<module name="NeedBraces"/>
<module name="OneStatementPerLine"/>
<module name="OneTopLevelClass"/>
<module name="OuterTypeFilename"/>
<module name="ParameterName"/>
<module name="WhitespaceAfter"/>
</module>
</module>
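
Editor's note: the SuppressWithPlainTextCommentFilter configured above enables targeted opt-outs directly in source. For example, to exempt one long line from the LineLength check configured here (the field name is illustrative):

// CHECKSTYLE.OFF: LineLength
private static final String LONG_VALUE = "...a literal that legitimately exceeds the 170-character limit...";
// CHECKSTYLE.ON: LineLength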

21
pom.xml
View file

@ -72,6 +72,27 @@
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<configLocation>checkstyle/checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<suppressionsLocation>checkstyle/checkstyle-suppressions.xml</suppressionsLocation>
</configuration>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
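<!-- Editor's note: binding checkstyle:check to the validate phase means every
     build (e.g. mvn package) runs the checks; mvn validate runs them alone. -->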
</plugins>
<pluginManagement>
<plugins>
<plugin>