diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 8c5ff5a4..1a3a20e1 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -74,6 +74,16 @@
netty-nio-client
${awssdk.version}
+
+ software.amazon.glue
+ schema-registry-serde
+ 1.0.0
+
+
+ software.amazon.glue
+ schema-registry-common
+ 1.0.0
+
com.google.guava
guava
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index ed2f889c..d7523549 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -97,6 +97,7 @@ import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
+import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType;
@@ -167,6 +168,7 @@ public class Scheduler implements Runnable {
private final LeaderDecider leaderDecider;
private final Map staleStreamDeletionMap = new HashMap<>();
private final LeaseCleanupManager leaseCleanupManager;
+ private final SchemaRegistryDecoder schemaRegistryDecoder;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@@ -296,6 +298,10 @@ public class Scheduler implements Runnable {
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory);
+ this.schemaRegistryDecoder =
+ this.retrievalConfig.glueSchemaRegistryDeserializer() == null ?
+ null
+ : new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer());
}
/**
@@ -929,7 +935,9 @@ public class Scheduler implements Runnable {
aggregatorUtil,
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory,
- leaseCleanupManager);
+ leaseCleanupManager,
+ schemaRegistryDecoder
+ );
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
index 4d894d94..c4a87082 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
@@ -269,7 +269,9 @@ class ConsumerStates {
input,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(),
- argument.aggregatorUtil(), argument.metricsFactory()
+ argument.aggregatorUtil(),
+ argument.metricsFactory(),
+ argument.schemaRegistryDecoder()
);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
index 6c52e0de..b3ba8a7d 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
@@ -36,6 +36,7 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
/**
* Task for fetching data records and invoking processRecords() on the record processor instance.
@@ -62,6 +63,7 @@ public class ProcessTask implements ConsumerTask {
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
private final String shardInfoId;
+ private final SchemaRegistryDecoder schemaRegistryDecoder;
public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor,
@@ -74,7 +76,8 @@ public class ProcessTask implements ConsumerTask {
boolean shouldCallProcessRecordsEvenForEmptyRecordList,
long idleTimeInMilliseconds,
@NonNull AggregatorUtil aggregatorUtil,
- @NonNull MetricsFactory metricsFactory) {
+ @NonNull MetricsFactory metricsFactory,
+ SchemaRegistryDecoder schemaRegistryDecoder) {
this.shardInfo = shardInfo;
this.shardInfoId = ShardInfo.getLeaseKey(shardInfo);
this.shardRecordProcessor = shardRecordProcessor;
@@ -85,6 +88,7 @@ public class ProcessTask implements ConsumerTask {
this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
this.metricsFactory = metricsFactory;
+ this.schemaRegistryDecoder = schemaRegistryDecoder;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.shard = shardDetector.shard(shardInfo.shardId());
@@ -133,6 +137,9 @@ public class ProcessTask implements ConsumerTask {
throttlingReporter.success();
List records = deaggregateAnyKplRecords(processRecordsInput.records());
+ if (schemaRegistryDecoder != null) {
+ records = schemaRegistryDecoder.decode(records);
+ }
if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
@@ -163,6 +170,8 @@ public class ProcessTask implements ConsumerTask {
}
}
+
+
private List deaggregateAnyKplRecords(List records) {
if (shard == null) {
return aggregatorUtil.deaggregate(records);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
index 0f18891c..0518b830 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
@@ -32,6 +32,7 @@ import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
+import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
import java.util.concurrent.ExecutorService;
@@ -73,4 +74,5 @@ public class ShardConsumerArgument {
@NonNull
private final MetricsFactory metricsFactory;
private final LeaseCleanupManager leaseCleanupManager;
+ private final SchemaRegistryDecoder schemaRegistryDecoder;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java
index 390c7377..8a3d4d13 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java
@@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval;
import java.nio.ByteBuffer;
import java.time.Instant;
+import com.amazonaws.services.schemaregistry.common.Schema;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -43,6 +44,7 @@ public class KinesisClientRecord {
private final long subSequenceNumber;
private final String explicitHashKey;
private final boolean aggregated;
+ private final Schema schema;
public static KinesisClientRecord fromRecord(Record record) {
return KinesisClientRecord.builder().sequenceNumber(record.sequenceNumber())
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
index 9ab4ca9e..c38f442d 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
@@ -15,6 +15,7 @@
package software.amazon.kinesis.retrieval;
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
@@ -56,6 +57,12 @@ public class RetrievalConfig {
@NonNull
private final String applicationName;
+ /**
+ * Glue Schema Registry Deserializer instance.
+ * If this instance is set, KCL will try to decode messages that might be
+ * potentially encoded with Glue Schema Registry Serializer.
+ */
+ private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = null;
/**
* AppStreamTracker either for multi stream tracking or single stream
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java
new file mode 100644
index 00000000..0418a00a
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/schemaregistry/SchemaRegistryDecoder.java
@@ -0,0 +1,74 @@
+package software.amazon.kinesis.schemaregistry;
+
+import com.amazonaws.services.schemaregistry.common.Schema;
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Identifies and decodes Glue Schema Registry data from incoming KinesisClientRecords.
+ */
+@Slf4j
+public class SchemaRegistryDecoder {
+ private final GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
+
+ public SchemaRegistryDecoder(
+ GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) {
+ this.glueSchemaRegistryDeserializer = glueSchemaRegistryDeserializer;
+ }
+
+ /**
+ * Process the list records and return records with schema and decoded data set.
+ * @param records List
+ * @return List
+ */
+ public List decode(
+ final List records) {
+ final List decodedRecords = new ArrayList<>();
+
+ for (final KinesisClientRecord record : records) {
+ final KinesisClientRecord decodedRecord = decodeRecord(record);
+ decodedRecords.add(decodedRecord);
+ }
+
+ return decodedRecords;
+ }
+
+ private KinesisClientRecord decodeRecord(final KinesisClientRecord record) {
+ if (record.data() == null) {
+ return record;
+ }
+
+ int length = record.data().remaining();
+ byte[] data = new byte[length];
+ record.data().get(data, 0, length);
+
+ try {
+ if (!isSchemaEncoded(data)) {
+ return record;
+ }
+
+ final Schema schema = glueSchemaRegistryDeserializer.getSchema(data);
+ final ByteBuffer recordData = ByteBuffer.wrap(glueSchemaRegistryDeserializer.getData(data));
+
+ return
+ record.toBuilder()
+ .schema(schema)
+ .data(recordData)
+ .build();
+ } catch (Exception e) {
+ log.warn("Unable to decode Glue Schema Registry information from record {}: ",
+ record.sequenceNumber(), e);
+ //We ignore Glue Schema Registry failures and return the record.
+ return record;
+ }
+ }
+
+ private boolean isSchemaEncoded(byte[] data) {
+ return glueSchemaRegistryDeserializer.canDeserialize(data);
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
index 235937d0..f94d82fd 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
@@ -60,7 +60,7 @@ import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
-
+import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
@@ -117,6 +117,7 @@ public class ConsumerStatesTest {
private boolean ignoreUnexpectedChildShards = false;
private long idleTimeInMillis = 1000L;
private Optional logWarningForTaskAfterMillis = Optional.empty();
+ private SchemaRegistryDecoder schemaRegistryDecoder = null;
@Before
public void setup() {
@@ -125,7 +126,7 @@ public class ConsumerStatesTest {
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
- new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager);
+ new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder);
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
index 16e6426a..12476837 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
@@ -24,8 +24,11 @@ import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -46,6 +49,9 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.schemaregistry.common.Schema;
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
+import com.google.common.collect.ImmutableList;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
@@ -75,10 +81,13 @@ import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.Messages;
import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord;
+import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
@RunWith(MockitoJUnitRunner.class)
public class ProcessTaskTest {
private static final long IDLE_TIME_IN_MILLISECONDS = 100L;
+ private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", "AVRO", "demoSchema");
+ private static final byte[] SCHEMA_REGISTRY_PAYLOAD = new byte[] {01, 05, 03, 05};
private boolean shouldCallProcessRecordsEvenForEmptyRecordList = true;
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = true;
@@ -89,6 +98,9 @@ public class ProcessTaskTest {
@Mock
private ShardDetector shardDetector;
+ @Mock
+ private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
+
private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 };
@@ -117,14 +129,28 @@ public class ProcessTaskTest {
skipShardSyncAtWorkerInitializationIfLeasesExist);
}
+ private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, GlueSchemaRegistryDeserializer deserializer) {
+ return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, new SchemaRegistryDecoder(deserializer));
+ }
+
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil,
boolean skipShardSync) {
- return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
- skipShardSync, shardDetector, throttlingReporter,
- processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
- aggregatorUtil, new NullMetricsFactory());
+ return makeProcessTask(processRecordsInput, aggregatorUtil, skipShardSync, null);
}
+ private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, boolean skipShardSync,
+ SchemaRegistryDecoder schemaRegistryDecoder) {
+ return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
+ skipShardSync, shardDetector, throttlingReporter,
+ processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
+ aggregatorUtil,
+ new NullMetricsFactory(),
+ schemaRegistryDecoder
+ );
+ }
+
+
+
@Test
public void testProcessTaskWithShardEndReached() {
@@ -140,6 +166,11 @@ public class ProcessTaskTest {
.approximateArrivalTimestamp(arrival).data(ByteBuffer.wrap(TEST_DATA)).build();
}
+ private KinesisClientRecord makeKinesisClientRecord(String partitionKey, String sequenceNumber, Instant arrival, ByteBuffer data, Schema schema) {
+ return KinesisClientRecord.builder().partitionKey(partitionKey).sequenceNumber(sequenceNumber)
+ .approximateArrivalTimestamp(arrival).data(data).schema(schema).build();
+ }
+
@Test
public void testNonAggregatedKinesisRecord() {
final String sqn = new BigInteger(128, new Random()).toString();
@@ -404,6 +435,139 @@ public class ProcessTaskTest {
assertThat(outcome.processRecordsCall.records(), equalTo(expectedRecords));
}
+ @Test
+ public void testProcessTask_WhenSchemaRegistryRecordsAreSent_ProcessesThemSuccessfully() {
+ processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer);
+ final BigInteger sqn = new BigInteger(128, new Random());
+ final BigInteger previousCheckpointSqn = BigInteger.valueOf(1);
+ final String pk = UUID.randomUUID().toString();
+ final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
+
+ //Payload set to SchemaRegistry encoded data and schema to null
+ //to mimic Schema Registry encoded message from Kinesis stream.
+ final KinesisClientRecord schemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null);
+
+ final KinesisClientRecord nonSchemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant());
+
+ when(processRecordsInput.records())
+ .thenReturn(
+ ImmutableList.of(
+ schemaRegistryRecord,
+ nonSchemaRegistryRecord
+ )
+ );
+
+ doReturn(true).when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
+ doReturn(TEST_DATA).when(glueSchemaRegistryDeserializer).getData(SCHEMA_REGISTRY_PAYLOAD);
+ doReturn(SCHEMA_REGISTRY_SCHEMA).when(glueSchemaRegistryDeserializer).getSchema(SCHEMA_REGISTRY_PAYLOAD);
+
+ ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L),
+ new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L));
+
+ KinesisClientRecord decodedSchemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(TEST_DATA), SCHEMA_REGISTRY_SCHEMA);
+ List expectedRecords =
+ ImmutableList.of(
+ decodedSchemaRegistryRecord,
+ nonSchemaRegistryRecord
+ );
+
+ List actualRecords = outcome.getProcessRecordsCall().records();
+
+ assertEquals(expectedRecords, actualRecords);
+
+ verify(glueSchemaRegistryDeserializer, times(1)).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
+ verify(glueSchemaRegistryDeserializer, times(1)).getSchema(SCHEMA_REGISTRY_PAYLOAD);
+ verify(glueSchemaRegistryDeserializer, times(1)).getData(SCHEMA_REGISTRY_PAYLOAD);
+ }
+
+ @Test
+ public void testProcessTask_WhenSchemaRegistryDecodeCheckFails_IgnoresRecord() {
+ processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer);
+ final BigInteger sqn = new BigInteger(128, new Random());
+ final BigInteger previousCheckpointSqn = BigInteger.valueOf(1);
+ final String pk = UUID.randomUUID().toString();
+ final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
+
+ //Payload set to SchemaRegistry encoded data and schema to null
+ //to mimic Schema Registry encoded message from Kinesis stream.
+ final KinesisClientRecord schemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null);
+
+ final KinesisClientRecord nonSchemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant());
+
+ when(processRecordsInput.records())
+ .thenReturn(
+ ImmutableList.of(
+ schemaRegistryRecord,
+ nonSchemaRegistryRecord
+ )
+ );
+
+ doThrow(new RuntimeException("Invalid data"))
+ .when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
+
+ ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L),
+ new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L));
+
+ List expectedRecords =
+ ImmutableList.of(
+ schemaRegistryRecord,
+ nonSchemaRegistryRecord
+ );
+
+ List actualRecords = outcome.getProcessRecordsCall().records();
+
+ assertEquals(expectedRecords, actualRecords);
+ }
+
+ @Test
+ public void testProcessTask_WhenSchemaRegistryDecodingFails_IgnoresRecord() {
+ processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer);
+ final BigInteger sqn = new BigInteger(128, new Random());
+ final BigInteger previousCheckpointSqn = BigInteger.valueOf(1);
+ final String pk = UUID.randomUUID().toString();
+ final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
+
+ //Payload set to SchemaRegistry encoded data and schema to null
+ //to mimic Schema Registry encoded message from Kinesis stream.
+ final KinesisClientRecord schemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null);
+
+ final KinesisClientRecord nonSchemaRegistryRecord =
+ makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant());
+
+ when(processRecordsInput.records())
+ .thenReturn(
+ ImmutableList.of(
+ schemaRegistryRecord,
+ nonSchemaRegistryRecord
+ )
+ );
+
+ doReturn(true)
+ .when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
+
+ doThrow(new RuntimeException("Cannot decode data"))
+ .when(glueSchemaRegistryDeserializer).getData(SCHEMA_REGISTRY_PAYLOAD);
+
+ ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L),
+ new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L));
+
+ List expectedRecords =
+ ImmutableList.of(
+ schemaRegistryRecord,
+ nonSchemaRegistryRecord
+ );
+
+ List actualRecords = outcome.getProcessRecordsCall().records();
+
+ assertEquals(expectedRecords, actualRecords);
+ }
+
private KinesisClientRecord createAndRegisterAggregatedRecord(BigInteger sequenceNumber,
AggregatedRecord.Builder aggregatedRecord, int i, Instant approximateArrivalTime) {
byte[] dataArray = new byte[1024];