From e4b1c8d5613bed3d1044d670119954c1bfeb5a2a Mon Sep 17 00:00:00 2001 From: Ravindranath Kakarla Date: Tue, 27 Oct 2020 21:05:48 -0700 Subject: [PATCH 1/3] Add support for GlueSchemaRegistry message deserialization. --- amazon-kinesis-client/pom.xml | 5 + .../amazon/kinesis/coordinator/Scheduler.java | 10 +- .../kinesis/lifecycle/ConsumerStates.java | 4 +- .../amazon/kinesis/lifecycle/ProcessTask.java | 11 +- .../lifecycle/ShardConsumerArgument.java | 3 + .../retrieval/KinesisClientRecord.java | 2 + .../kinesis/retrieval/RetrievalConfig.java | 7 + .../schemaregistry/SchemaRegistryDecoder.java | 74 ++++++++ .../kinesis/lifecycle/ConsumerStatesTest.java | 5 +- .../kinesis/lifecycle/ProcessTaskTest.java | 173 +++++++++++++++++- 10 files changed, 285 insertions(+), 9 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 8c5ff5a4..69ce44db 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -74,6 +74,11 @@ netty-nio-client ${awssdk.version} + + software.aws.glue + schema-registry-serde + 1.0.0 + com.google.guava guava diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index ed2f889c..d7523549 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -97,6 +97,7 @@ import software.amazon.kinesis.processor.ShutdownNotificationAware; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType; @@ -167,6 +168,7 @@ public class Scheduler implements Runnable { private final LeaderDecider leaderDecider; private final Map staleStreamDeletionMap = new HashMap<>(); private final LeaseCleanupManager leaseCleanupManager; + private final SchemaRegistryDecoder schemaRegistryDecoder; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -296,6 +298,10 @@ public class Scheduler implements Runnable { leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold()); this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) .createLeaseCleanupManager(metricsFactory); + this.schemaRegistryDecoder = + this.retrievalConfig.glueSchemaRegistryDeserializer() == null ? + null + : new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer()); } /** @@ -929,7 +935,9 @@ public class Scheduler implements Runnable { aggregatorUtil, hierarchicalShardSyncerProvider.apply(streamConfig), metricsFactory, - leaseCleanupManager); + leaseCleanupManager, + schemaRegistryDecoder + ); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index 4d894d94..c4a87082 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -269,7 +269,9 @@ class ConsumerStates { input, argument.shouldCallProcessRecordsEvenForEmptyRecordList(), argument.idleTimeInMilliseconds(), - argument.aggregatorUtil(), argument.metricsFactory() + argument.aggregatorUtil(), + argument.metricsFactory(), + argument.schemaRegistryDecoder() ); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 6c52e0de..b3ba8a7d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -36,6 +36,7 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; /** * Task for fetching data records and invoking processRecords() on the record processor instance. @@ -62,6 +63,7 @@ public class ProcessTask implements ConsumerTask { private final MetricsFactory metricsFactory; private final AggregatorUtil aggregatorUtil; private final String shardInfoId; + private final SchemaRegistryDecoder schemaRegistryDecoder; public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @@ -74,7 +76,8 @@ public class ProcessTask implements ConsumerTask { boolean shouldCallProcessRecordsEvenForEmptyRecordList, long idleTimeInMilliseconds, @NonNull AggregatorUtil aggregatorUtil, - @NonNull MetricsFactory metricsFactory) { + @NonNull MetricsFactory metricsFactory, + SchemaRegistryDecoder schemaRegistryDecoder) { this.shardInfo = shardInfo; this.shardInfoId = ShardInfo.getLeaseKey(shardInfo); this.shardRecordProcessor = shardRecordProcessor; @@ -85,6 +88,7 @@ public class ProcessTask implements ConsumerTask { this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList; this.idleTimeInMilliseconds = idleTimeInMilliseconds; this.metricsFactory = metricsFactory; + this.schemaRegistryDecoder = schemaRegistryDecoder; if (!skipShardSyncAtWorkerInitializationIfLeasesExist) { this.shard = shardDetector.shard(shardInfo.shardId()); @@ -133,6 +137,9 @@ public class ProcessTask implements ConsumerTask { throttlingReporter.success(); List records = deaggregateAnyKplRecords(processRecordsInput.records()); + if (schemaRegistryDecoder != null) { + records = schemaRegistryDecoder.decode(records); + } if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); @@ -163,6 +170,8 @@ public class ProcessTask implements ConsumerTask { } } + + private List deaggregateAnyKplRecords(List records) { if (shard == null) { return aggregatorUtil.deaggregate(records); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 0f18891c..505f4e06 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.lifecycle; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; @@ -32,6 +33,7 @@ import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; import java.util.concurrent.ExecutorService; @@ -73,4 +75,5 @@ public class ShardConsumerArgument { @NonNull private final MetricsFactory metricsFactory; private final LeaseCleanupManager leaseCleanupManager; + private final SchemaRegistryDecoder schemaRegistryDecoder; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java index 390c7377..8a3d4d13 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java @@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval; import java.nio.ByteBuffer; import java.time.Instant; +import com.amazonaws.services.schemaregistry.common.Schema; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -43,6 +44,7 @@ public class KinesisClientRecord { private final long subSequenceNumber; private final String explicitHashKey; private final boolean aggregated; + private final Schema schema; public static KinesisClientRecord fromRecord(Record record) { return KinesisClientRecord.builder().sequenceNumber(record.sequenceNumber()) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 9ab4ca9e..c38f442d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.retrieval; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -56,6 +57,12 @@ public class RetrievalConfig { @NonNull private final String applicationName; + /** + * Glue Schema Registry Deserializer instance. + * If this instance is set, KCL will try to decode messages that might be + * potentially encoded with Glue Schema Registry Serializer. + */ + private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = null; /** * AppStreamTracker either for multi stream tracking or single stream diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java new file mode 100644 index 00000000..0418a00a --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java @@ -0,0 +1,74 @@ +package software.amazon.kinesis.schemaregistry; + +import com.amazonaws.services.schemaregistry.common.Schema; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Identifies and decodes Glue Schema Registry data from incoming KinesisClientRecords. + */ +@Slf4j +public class SchemaRegistryDecoder { + private final GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer; + + public SchemaRegistryDecoder( + GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) { + this.glueSchemaRegistryDeserializer = glueSchemaRegistryDeserializer; + } + + /** + * Process the list records and return records with schema and decoded data set. + * @param records List + * @return List + */ + public List decode( + final List records) { + final List decodedRecords = new ArrayList<>(); + + for (final KinesisClientRecord record : records) { + final KinesisClientRecord decodedRecord = decodeRecord(record); + decodedRecords.add(decodedRecord); + } + + return decodedRecords; + } + + private KinesisClientRecord decodeRecord(final KinesisClientRecord record) { + if (record.data() == null) { + return record; + } + + int length = record.data().remaining(); + byte[] data = new byte[length]; + record.data().get(data, 0, length); + + try { + if (!isSchemaEncoded(data)) { + return record; + } + + final Schema schema = glueSchemaRegistryDeserializer.getSchema(data); + final ByteBuffer recordData = ByteBuffer.wrap(glueSchemaRegistryDeserializer.getData(data)); + + return + record.toBuilder() + .schema(schema) + .data(recordData) + .build(); + } catch (Exception e) { + log.warn("Unable to decode Glue Schema Registry information from record {}: ", + record.sequenceNumber(), e); + //We ignore Glue Schema Registry failures and return the record. + return record; + } + } + + private boolean isSchemaEncoded(byte[] data) { + return glueSchemaRegistryDeserializer.canDeserialize(data); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 235937d0..f94d82fd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -60,7 +60,7 @@ import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; - +import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; @RunWith(MockitoJUnitRunner.class) public class ConsumerStatesTest { @@ -117,6 +117,7 @@ public class ConsumerStatesTest { private boolean ignoreUnexpectedChildShards = false; private long idleTimeInMillis = 1000L; private Optional logWarningForTaskAfterMillis = Optional.empty(); + private SchemaRegistryDecoder schemaRegistryDecoder = null; @Before public void setup() { @@ -125,7 +126,7 @@ public class ConsumerStatesTest { taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, - new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager); + new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder); when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 16e6426a..4f27dc09 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -24,8 +24,11 @@ import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +49,9 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.schemaregistry.common.Schema; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; +import com.google.common.collect.ImmutableList; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -60,6 +66,7 @@ import com.google.protobuf.ByteString; import lombok.Data; import lombok.Getter; +import software.amazon.awssdk.services.glue.model.DataFormat; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -75,10 +82,13 @@ import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.Messages; import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord; +import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; @RunWith(MockitoJUnitRunner.class) public class ProcessTaskTest { private static final long IDLE_TIME_IN_MILLISECONDS = 100L; + private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", DataFormat.AVRO.toString(), "demoSchema"); + private static final byte[] SCHEMA_REGISTRY_PAYLOAD = new byte[] {01, 05, 03, 05}; private boolean shouldCallProcessRecordsEvenForEmptyRecordList = true; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = true; @@ -89,6 +99,9 @@ public class ProcessTaskTest { @Mock private ShardDetector shardDetector; + @Mock + private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer; + private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 }; @@ -117,14 +130,28 @@ public class ProcessTaskTest { skipShardSyncAtWorkerInitializationIfLeasesExist); } + private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, GlueSchemaRegistryDeserializer deserializer) { + return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, new SchemaRegistryDecoder(deserializer)); + } + private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, boolean skipShardSync) { - return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis, - skipShardSync, shardDetector, throttlingReporter, - processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS, - aggregatorUtil, new NullMetricsFactory()); + return makeProcessTask(processRecordsInput, aggregatorUtil, skipShardSync, null); } + private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, boolean skipShardSync, + SchemaRegistryDecoder schemaRegistryDecoder) { + return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis, + skipShardSync, shardDetector, throttlingReporter, + processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS, + aggregatorUtil, + new NullMetricsFactory(), + schemaRegistryDecoder + ); + } + + + @Test public void testProcessTaskWithShardEndReached() { @@ -140,6 +167,11 @@ public class ProcessTaskTest { .approximateArrivalTimestamp(arrival).data(ByteBuffer.wrap(TEST_DATA)).build(); } + private KinesisClientRecord makeKinesisClientRecord(String partitionKey, String sequenceNumber, Instant arrival, ByteBuffer data, Schema schema) { + return KinesisClientRecord.builder().partitionKey(partitionKey).sequenceNumber(sequenceNumber) + .approximateArrivalTimestamp(arrival).data(data).schema(schema).build(); + } + @Test public void testNonAggregatedKinesisRecord() { final String sqn = new BigInteger(128, new Random()).toString(); @@ -404,6 +436,139 @@ public class ProcessTaskTest { assertThat(outcome.processRecordsCall.records(), equalTo(expectedRecords)); } + @Test + public void testProcessTask_WhenSchemaRegistryRecordsAreSent_ProcessesThemSuccessfully() { + processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer); + final BigInteger sqn = new BigInteger(128, new Random()); + final BigInteger previousCheckpointSqn = BigInteger.valueOf(1); + final String pk = UUID.randomUUID().toString(); + final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); + + //Payload set to SchemaRegistry encoded data and schema to null + //to mimic Schema Registry encoded message from Kinesis stream. + final KinesisClientRecord schemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null); + + final KinesisClientRecord nonSchemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant()); + + when(processRecordsInput.records()) + .thenReturn( + ImmutableList.of( + schemaRegistryRecord, + nonSchemaRegistryRecord + ) + ); + + doReturn(true).when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD); + doReturn(TEST_DATA).when(glueSchemaRegistryDeserializer).getData(SCHEMA_REGISTRY_PAYLOAD); + doReturn(SCHEMA_REGISTRY_SCHEMA).when(glueSchemaRegistryDeserializer).getSchema(SCHEMA_REGISTRY_PAYLOAD); + + ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L), + new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L)); + + KinesisClientRecord decodedSchemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(TEST_DATA), SCHEMA_REGISTRY_SCHEMA); + List expectedRecords = + ImmutableList.of( + decodedSchemaRegistryRecord, + nonSchemaRegistryRecord + ); + + List actualRecords = outcome.getProcessRecordsCall().records(); + + assertEquals(expectedRecords, actualRecords); + + verify(glueSchemaRegistryDeserializer, times(1)).canDeserialize(SCHEMA_REGISTRY_PAYLOAD); + verify(glueSchemaRegistryDeserializer, times(1)).getSchema(SCHEMA_REGISTRY_PAYLOAD); + verify(glueSchemaRegistryDeserializer, times(1)).getData(SCHEMA_REGISTRY_PAYLOAD); + } + + @Test + public void testProcessTask_WhenSchemaRegistryDecodeCheckFails_IgnoresRecord() { + processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer); + final BigInteger sqn = new BigInteger(128, new Random()); + final BigInteger previousCheckpointSqn = BigInteger.valueOf(1); + final String pk = UUID.randomUUID().toString(); + final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); + + //Payload set to SchemaRegistry encoded data and schema to null + //to mimic Schema Registry encoded message from Kinesis stream. + final KinesisClientRecord schemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null); + + final KinesisClientRecord nonSchemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant()); + + when(processRecordsInput.records()) + .thenReturn( + ImmutableList.of( + schemaRegistryRecord, + nonSchemaRegistryRecord + ) + ); + + doThrow(new RuntimeException("Invalid data")) + .when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD); + + ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L), + new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L)); + + List expectedRecords = + ImmutableList.of( + schemaRegistryRecord, + nonSchemaRegistryRecord + ); + + List actualRecords = outcome.getProcessRecordsCall().records(); + + assertEquals(expectedRecords, actualRecords); + } + + @Test + public void testProcessTask_WhenSchemaRegistryDecodingFails_IgnoresRecord() { + processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer); + final BigInteger sqn = new BigInteger(128, new Random()); + final BigInteger previousCheckpointSqn = BigInteger.valueOf(1); + final String pk = UUID.randomUUID().toString(); + final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS)); + + //Payload set to SchemaRegistry encoded data and schema to null + //to mimic Schema Registry encoded message from Kinesis stream. + final KinesisClientRecord schemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null); + + final KinesisClientRecord nonSchemaRegistryRecord = + makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant()); + + when(processRecordsInput.records()) + .thenReturn( + ImmutableList.of( + schemaRegistryRecord, + nonSchemaRegistryRecord + ) + ); + + doReturn(true) + .when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD); + + doThrow(new RuntimeException("Cannot decode data")) + .when(glueSchemaRegistryDeserializer).getData(SCHEMA_REGISTRY_PAYLOAD); + + ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L), + new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L)); + + List expectedRecords = + ImmutableList.of( + schemaRegistryRecord, + nonSchemaRegistryRecord + ); + + List actualRecords = outcome.getProcessRecordsCall().records(); + + assertEquals(expectedRecords, actualRecords); + } + private KinesisClientRecord createAndRegisterAggregatedRecord(BigInteger sequenceNumber, AggregatedRecord.Builder aggregatedRecord, int i, Instant approximateArrivalTime) { byte[] dataArray = new byte[1024]; From 99cb3356bb71164950f640b180c5f02371659b56 Mon Sep 17 00:00:00 2001 From: Ravindranath Kakarla Date: Wed, 18 Nov 2020 13:25:00 -0800 Subject: [PATCH 2/3] Updating the dependency path --- amazon-kinesis-client/pom.xml | 2 +- .../amazon/kinesis/lifecycle/ShardConsumerArgument.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 69ce44db..3db0957c 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -75,7 +75,7 @@ ${awssdk.version} - software.aws.glue + software.amazon.glue schema-registry-serde 1.0.0 diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 505f4e06..0518b830 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -15,7 +15,6 @@ package software.amazon.kinesis.lifecycle; -import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; From a7a61616cb78d3775d99b67bfa1a7261eae56921 Mon Sep 17 00:00:00 2001 From: Ravindranath Kakarla Date: Thu, 19 Nov 2020 10:47:07 -0800 Subject: [PATCH 3/3] Removing dependency on Glue SDK. Adding dependency on the Glue Schema Registry common package. --- amazon-kinesis-client/pom.xml | 5 +++++ .../software/amazon/kinesis/lifecycle/ProcessTaskTest.java | 3 +-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 3db0957c..1a3a20e1 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -79,6 +79,11 @@ schema-registry-serde 1.0.0 + + software.amazon.glue + schema-registry-common + 1.0.0 + com.google.guava guava diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 4f27dc09..12476837 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -66,7 +66,6 @@ import com.google.protobuf.ByteString; import lombok.Data; import lombok.Getter; -import software.amazon.awssdk.services.glue.model.DataFormat; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -87,7 +86,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; @RunWith(MockitoJUnitRunner.class) public class ProcessTaskTest { private static final long IDLE_TIME_IN_MILLISECONDS = 100L; - private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", DataFormat.AVRO.toString(), "demoSchema"); + private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", "AVRO", "demoSchema"); private static final byte[] SCHEMA_REGISTRY_PAYLOAD = new byte[] {01, 05, 03, 05}; private boolean shouldCallProcessRecordsEvenForEmptyRecordList = true;