Merge pull request #754 from blacktooth/master

Add support for GlueSchemaRegistry message deserialization.
This commit is contained in:
ashwing 2020-11-19 13:57:43 -08:00 committed by GitHub
commit 8da8ddc9f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 288 additions and 9 deletions

View file

@ -74,6 +74,16 @@
<artifactId>netty-nio-client</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

View file

@ -97,6 +97,7 @@ import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType;
@ -167,6 +168,7 @@ public class Scheduler implements Runnable {
private final LeaderDecider leaderDecider;
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -296,6 +298,10 @@ public class Scheduler implements Runnable {
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory);
this.schemaRegistryDecoder =
this.retrievalConfig.glueSchemaRegistryDeserializer() == null ?
null
: new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer());
}
/**
@ -929,7 +935,9 @@ public class Scheduler implements Runnable {
aggregatorUtil,
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory,
leaseCleanupManager);
leaseCleanupManager,
schemaRegistryDecoder
);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}

View file

@ -269,7 +269,9 @@ class ConsumerStates {
input,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(),
argument.aggregatorUtil(), argument.metricsFactory()
argument.aggregatorUtil(),
argument.metricsFactory(),
argument.schemaRegistryDecoder()
);
}

View file

@ -36,6 +36,7 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
/**
* Task for fetching data records and invoking processRecords() on the record processor instance.
@ -62,6 +63,7 @@ public class ProcessTask implements ConsumerTask {
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
private final String shardInfoId;
private final SchemaRegistryDecoder schemaRegistryDecoder;
public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor,
@ -74,7 +76,8 @@ public class ProcessTask implements ConsumerTask {
boolean shouldCallProcessRecordsEvenForEmptyRecordList,
long idleTimeInMilliseconds,
@NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) {
@NonNull MetricsFactory metricsFactory,
SchemaRegistryDecoder schemaRegistryDecoder) {
this.shardInfo = shardInfo;
this.shardInfoId = ShardInfo.getLeaseKey(shardInfo);
this.shardRecordProcessor = shardRecordProcessor;
@ -85,6 +88,7 @@ public class ProcessTask implements ConsumerTask {
this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
this.metricsFactory = metricsFactory;
this.schemaRegistryDecoder = schemaRegistryDecoder;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.shard = shardDetector.shard(shardInfo.shardId());
@ -133,6 +137,9 @@ public class ProcessTask implements ConsumerTask {
throttlingReporter.success();
List<KinesisClientRecord> records = deaggregateAnyKplRecords(processRecordsInput.records());
if (schemaRegistryDecoder != null) {
records = schemaRegistryDecoder.decode(records);
}
if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
@ -163,6 +170,8 @@ public class ProcessTask implements ConsumerTask {
}
}
private List<KinesisClientRecord> deaggregateAnyKplRecords(List<KinesisClientRecord> records) {
if (shard == null) {
return aggregatorUtil.deaggregate(records);

View file

@ -32,6 +32,7 @@ import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
import java.util.concurrent.ExecutorService;
@ -73,4 +74,5 @@ public class ShardConsumerArgument {
@NonNull
private final MetricsFactory metricsFactory;
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;
}

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval;
import java.nio.ByteBuffer;
import java.time.Instant;
import com.amazonaws.services.schemaregistry.common.Schema;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@ -43,6 +44,7 @@ public class KinesisClientRecord {
private final long subSequenceNumber;
private final String explicitHashKey;
private final boolean aggregated;
private final Schema schema;
public static KinesisClientRecord fromRecord(Record record) {
return KinesisClientRecord.builder().sequenceNumber(record.sequenceNumber())

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.retrieval;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
@ -56,6 +57,12 @@ public class RetrievalConfig {
@NonNull
private final String applicationName;
/**
* Glue Schema Registry Deserializer instance.
* If this instance is set, KCL will try to decode messages that might be
* potentially encoded with Glue Schema Registry Serializer.
*/
private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = null;
/**
* AppStreamTracker either for multi stream tracking or single stream

View file

@ -0,0 +1,74 @@
package software.amazon.kinesis.schemaregistry;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* Identifies and decodes Glue Schema Registry data from incoming KinesisClientRecords.
*/
@Slf4j
public class SchemaRegistryDecoder {
private final GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
public SchemaRegistryDecoder(
GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) {
this.glueSchemaRegistryDeserializer = glueSchemaRegistryDeserializer;
}
/**
* Process the list records and return records with schema and decoded data set.
* @param records List<KinesisClientRecord>
* @return List<KinesisClientRecord>
*/
public List<KinesisClientRecord> decode(
final List<KinesisClientRecord> records) {
final List<KinesisClientRecord> decodedRecords = new ArrayList<>();
for (final KinesisClientRecord record : records) {
final KinesisClientRecord decodedRecord = decodeRecord(record);
decodedRecords.add(decodedRecord);
}
return decodedRecords;
}
private KinesisClientRecord decodeRecord(final KinesisClientRecord record) {
if (record.data() == null) {
return record;
}
int length = record.data().remaining();
byte[] data = new byte[length];
record.data().get(data, 0, length);
try {
if (!isSchemaEncoded(data)) {
return record;
}
final Schema schema = glueSchemaRegistryDeserializer.getSchema(data);
final ByteBuffer recordData = ByteBuffer.wrap(glueSchemaRegistryDeserializer.getData(data));
return
record.toBuilder()
.schema(schema)
.data(recordData)
.build();
} catch (Exception e) {
log.warn("Unable to decode Glue Schema Registry information from record {}: ",
record.sequenceNumber(), e);
//We ignore Glue Schema Registry failures and return the record.
return record;
}
}
private boolean isSchemaEncoded(byte[] data) {
return glueSchemaRegistryDeserializer.canDeserialize(data);
}
}

View file

@ -60,7 +60,7 @@ import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
@ -117,6 +117,7 @@ public class ConsumerStatesTest {
private boolean ignoreUnexpectedChildShards = false;
private long idleTimeInMillis = 1000L;
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
private SchemaRegistryDecoder schemaRegistryDecoder = null;
@Before
public void setup() {
@ -125,7 +126,7 @@ public class ConsumerStatesTest {
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager);
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder);
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,

View file

@ -24,8 +24,11 @@ import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -46,6 +49,9 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import com.google.common.collect.ImmutableList;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
@ -75,10 +81,13 @@ import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.Messages;
import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord;
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
@RunWith(MockitoJUnitRunner.class)
public class ProcessTaskTest {
private static final long IDLE_TIME_IN_MILLISECONDS = 100L;
private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", "AVRO", "demoSchema");
private static final byte[] SCHEMA_REGISTRY_PAYLOAD = new byte[] {01, 05, 03, 05};
private boolean shouldCallProcessRecordsEvenForEmptyRecordList = true;
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = true;
@ -89,6 +98,9 @@ public class ProcessTaskTest {
@Mock
private ShardDetector shardDetector;
@Mock
private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 };
@ -117,14 +129,28 @@ public class ProcessTaskTest {
skipShardSyncAtWorkerInitializationIfLeasesExist);
}
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, GlueSchemaRegistryDeserializer deserializer) {
return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, new SchemaRegistryDecoder(deserializer));
}
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil,
boolean skipShardSync) {
return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
skipShardSync, shardDetector, throttlingReporter,
processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
aggregatorUtil, new NullMetricsFactory());
return makeProcessTask(processRecordsInput, aggregatorUtil, skipShardSync, null);
}
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, boolean skipShardSync,
SchemaRegistryDecoder schemaRegistryDecoder) {
return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
skipShardSync, shardDetector, throttlingReporter,
processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
aggregatorUtil,
new NullMetricsFactory(),
schemaRegistryDecoder
);
}
@Test
public void testProcessTaskWithShardEndReached() {
@ -140,6 +166,11 @@ public class ProcessTaskTest {
.approximateArrivalTimestamp(arrival).data(ByteBuffer.wrap(TEST_DATA)).build();
}
private KinesisClientRecord makeKinesisClientRecord(String partitionKey, String sequenceNumber, Instant arrival, ByteBuffer data, Schema schema) {
return KinesisClientRecord.builder().partitionKey(partitionKey).sequenceNumber(sequenceNumber)
.approximateArrivalTimestamp(arrival).data(data).schema(schema).build();
}
@Test
public void testNonAggregatedKinesisRecord() {
final String sqn = new BigInteger(128, new Random()).toString();
@ -404,6 +435,139 @@ public class ProcessTaskTest {
assertThat(outcome.processRecordsCall.records(), equalTo(expectedRecords));
}
@Test
public void testProcessTask_WhenSchemaRegistryRecordsAreSent_ProcessesThemSuccessfully() {
processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer);
final BigInteger sqn = new BigInteger(128, new Random());
final BigInteger previousCheckpointSqn = BigInteger.valueOf(1);
final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
//Payload set to SchemaRegistry encoded data and schema to null
//to mimic Schema Registry encoded message from Kinesis stream.
final KinesisClientRecord schemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null);
final KinesisClientRecord nonSchemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant());
when(processRecordsInput.records())
.thenReturn(
ImmutableList.of(
schemaRegistryRecord,
nonSchemaRegistryRecord
)
);
doReturn(true).when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
doReturn(TEST_DATA).when(glueSchemaRegistryDeserializer).getData(SCHEMA_REGISTRY_PAYLOAD);
doReturn(SCHEMA_REGISTRY_SCHEMA).when(glueSchemaRegistryDeserializer).getSchema(SCHEMA_REGISTRY_PAYLOAD);
ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L),
new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L));
KinesisClientRecord decodedSchemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(TEST_DATA), SCHEMA_REGISTRY_SCHEMA);
List<KinesisClientRecord> expectedRecords =
ImmutableList.of(
decodedSchemaRegistryRecord,
nonSchemaRegistryRecord
);
List<KinesisClientRecord> actualRecords = outcome.getProcessRecordsCall().records();
assertEquals(expectedRecords, actualRecords);
verify(glueSchemaRegistryDeserializer, times(1)).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
verify(glueSchemaRegistryDeserializer, times(1)).getSchema(SCHEMA_REGISTRY_PAYLOAD);
verify(glueSchemaRegistryDeserializer, times(1)).getData(SCHEMA_REGISTRY_PAYLOAD);
}
@Test
public void testProcessTask_WhenSchemaRegistryDecodeCheckFails_IgnoresRecord() {
processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer);
final BigInteger sqn = new BigInteger(128, new Random());
final BigInteger previousCheckpointSqn = BigInteger.valueOf(1);
final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
//Payload set to SchemaRegistry encoded data and schema to null
//to mimic Schema Registry encoded message from Kinesis stream.
final KinesisClientRecord schemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null);
final KinesisClientRecord nonSchemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant());
when(processRecordsInput.records())
.thenReturn(
ImmutableList.of(
schemaRegistryRecord,
nonSchemaRegistryRecord
)
);
doThrow(new RuntimeException("Invalid data"))
.when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L),
new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L));
List<KinesisClientRecord> expectedRecords =
ImmutableList.of(
schemaRegistryRecord,
nonSchemaRegistryRecord
);
List<KinesisClientRecord> actualRecords = outcome.getProcessRecordsCall().records();
assertEquals(expectedRecords, actualRecords);
}
@Test
public void testProcessTask_WhenSchemaRegistryDecodingFails_IgnoresRecord() {
processTask = makeProcessTask(processRecordsInput, glueSchemaRegistryDeserializer);
final BigInteger sqn = new BigInteger(128, new Random());
final BigInteger previousCheckpointSqn = BigInteger.valueOf(1);
final String pk = UUID.randomUUID().toString();
final Date ts = new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(4, TimeUnit.HOURS));
//Payload set to SchemaRegistry encoded data and schema to null
//to mimic Schema Registry encoded message from Kinesis stream.
final KinesisClientRecord schemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant(), ByteBuffer.wrap(SCHEMA_REGISTRY_PAYLOAD), null);
final KinesisClientRecord nonSchemaRegistryRecord =
makeKinesisClientRecord(pk, sqn.toString(), ts.toInstant());
when(processRecordsInput.records())
.thenReturn(
ImmutableList.of(
schemaRegistryRecord,
nonSchemaRegistryRecord
)
);
doReturn(true)
.when(glueSchemaRegistryDeserializer).canDeserialize(SCHEMA_REGISTRY_PAYLOAD);
doThrow(new RuntimeException("Cannot decode data"))
.when(glueSchemaRegistryDeserializer).getData(SCHEMA_REGISTRY_PAYLOAD);
ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(previousCheckpointSqn.toString(), 0L),
new ExtendedSequenceNumber(previousCheckpointSqn.add(previousCheckpointSqn).toString(), 1L));
List<KinesisClientRecord> expectedRecords =
ImmutableList.of(
schemaRegistryRecord,
nonSchemaRegistryRecord
);
List<KinesisClientRecord> actualRecords = outcome.getProcessRecordsCall().records();
assertEquals(expectedRecords, actualRecords);
}
private KinesisClientRecord createAndRegisterAggregatedRecord(BigInteger sequenceNumber,
AggregatedRecord.Builder aggregatedRecord, int i, Instant approximateArrivalTime) {
byte[] dataArray = new byte[1024];