From 72c77d3c1a18810f602f6c415b0d98487150c973 Mon Sep 17 00:00:00 2001 From: parijas Date: Mon, 8 Jan 2018 16:51:35 -0800 Subject: [PATCH] Revert "Added unit tests for IKinesisProxy injection in Worker Builder" This reverts commit aa944c17061b1506c5c55cf3932857b6f6086049. Reverting to undo changes to import ordering. --- .../clientlibrary/lib/worker/Worker.java | 59 ++++++++------- .../clientlibrary/lib/worker/WorkerTest.java | 74 ------------------- 2 files changed, 29 insertions(+), 104 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 429236bd..1ce25579 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,33 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.regions.Region; -import com.amazonaws.regions.RegionUtils; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; -import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; -import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; -import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import lombok.Getter; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -59,6 +32,33 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; +import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; +import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from @@ -1063,7 +1063,6 @@ public class Worker implements Runnable { /** * Builder to construct a Worker instance. */ - @Getter public static class Builder { private IRecordProcessorFactory recordProcessorFactory; @@ -1199,7 +1198,7 @@ public class Worker implements Runnable { * Set KinesisProxy for the worker. * * @param kinesisProxy - * Sets an implementation of IKinesisProxy. + * KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams * * @return A reference to this updated object so that method calls can be chained together. */ @@ -1268,7 +1267,7 @@ public class Worker implements Runnable { } } if (metricsFactory == null) { - metricsFactory = Worker.getMetricsFactory(cloudWatchClient, config); + metricsFactory = getMetricsFactory(cloudWatchClient, config); } if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 692e680d..ce406dce 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -19,8 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -43,7 +41,6 @@ import java.io.File; import java.lang.Thread.State; import java.lang.reflect.Field; import java.math.BigInteger; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -66,13 +63,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.InvalidArgumentException; -import com.amazonaws.services.kinesis.model.PutRecordResult; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Condition; @@ -1484,70 +1474,6 @@ public class WorkerTest { } - @Test - public void testBuilderWithDefaultKinesisProxy() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - - Worker.Builder builder = new Worker.Builder() - .recordProcessorFactory(recordProcessorFactory) - .config(config); - builder.build(); - assertNotNull(builder.getKinesisProxy()); - assertTrue(builder.getKinesisProxy() instanceof KinesisProxy); - } - - @Test - public void testBuilderWhenKinesisProxyIsSet() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - IKinesisProxy kinesisProxy = new MockKinesisProxy(); - Worker.Builder builder = new Worker.Builder() - .recordProcessorFactory(recordProcessorFactory) - .kinesisProxy(kinesisProxy) - .config(config); - builder.build(); - assertNotNull(builder.getKinesisProxy()); - assertTrue(builder.getKinesisProxy() instanceof MockKinesisProxy); - } - - private class MockKinesisProxy implements IKinesisProxy { - @Override public GetRecordsResult get(String shardIterator, int maxRecords) - throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException { - return null; - } - - @Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException { - return null; - } - - @Override public Set getAllShardIds() throws ResourceNotFoundException { - return null; - } - - @Override public List getShardList() throws ResourceNotFoundException { - return null; - } - - @Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber) - throws ResourceNotFoundException, InvalidArgumentException { - return null; - } - - @Override public String getIterator(String shardId, String iteratorEnum) - throws ResourceNotFoundException, InvalidArgumentException { - return null; - } - - @Override public String getIterator(String shardId, Date timestamp) - throws ResourceNotFoundException, InvalidArgumentException { - return null; - } - - @Override public PutRecordResult put(String sequenceNumberForOrdering, String explicitHashKey, - String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException { - return null; - } - } - private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,