Merge 2196a053fb into 7032ea67ec
This commit is contained in:
commit
9eb679c0d1
1 changed files with 94 additions and 59 deletions
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.MessageDigest;
|
||||
|
|
@ -206,7 +207,6 @@ public class UserRecord extends Record {
|
|||
|
||||
for (Record r : records) {
|
||||
boolean isAggregated = true;
|
||||
long subSeqNum = 0;
|
||||
ByteBuffer bb = r.getData();
|
||||
|
||||
if (bb.remaining() >= magic.length) {
|
||||
|
|
@ -215,7 +215,8 @@ public class UserRecord extends Record {
|
|||
isAggregated = false;
|
||||
}
|
||||
|
||||
if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= DIGEST_SIZE) {
|
||||
if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic)
|
||||
|| bb.remaining() <= DIGEST_SIZE) {
|
||||
isAggregated = false;
|
||||
}
|
||||
|
||||
|
|
@ -232,68 +233,12 @@ public class UserRecord extends Record {
|
|||
isAggregated = false;
|
||||
} else {
|
||||
try {
|
||||
Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
|
||||
List<String> pks = ar.getPartitionKeyTableList();
|
||||
List<String> ehks = ar.getExplicitHashKeyTableList();
|
||||
long aat = r.getApproximateArrivalTimestamp() == null
|
||||
? -1 : r.getApproximateArrivalTimestamp().getTime();
|
||||
try {
|
||||
int recordsInCurrRecord = 0;
|
||||
for (Messages.Record mr : ar.getRecordsList()) {
|
||||
String explicitHashKey = null;
|
||||
String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
|
||||
if (mr.hasExplicitHashKeyIndex()) {
|
||||
explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
|
||||
}
|
||||
|
||||
BigInteger effectiveHashKey = explicitHashKey != null
|
||||
? new BigInteger(explicitHashKey)
|
||||
: new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
|
||||
|
||||
if (effectiveHashKey.compareTo(startingHashKey) < 0
|
||||
|| effectiveHashKey.compareTo(endingHashKey) > 0) {
|
||||
for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
|
||||
result.remove(result.size() - 1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
++recordsInCurrRecord;
|
||||
Record record = new Record()
|
||||
.withData(ByteBuffer.wrap(mr.getData().toByteArray()))
|
||||
.withPartitionKey(partitionKey)
|
||||
.withSequenceNumber(r.getSequenceNumber())
|
||||
.withApproximateArrivalTimestamp(aat < 0 ? null : new Date(aat));
|
||||
result.add(new UserRecord(true, record, subSeqNum++, explicitHashKey));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Unexpected exception during deaggregation, record was:\n");
|
||||
sb.append("PKS:\n");
|
||||
for (String s : pks) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
sb.append("EHKS: \n");
|
||||
for (String s : ehks) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
for (Messages.Record mr : ar.getRecordsList()) {
|
||||
sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
|
||||
.append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
|
||||
.append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
|
||||
.append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
|
||||
}
|
||||
sb.append("Sequence number: ").append(r.getSequenceNumber()).append("\n")
|
||||
.append("Raw data: ")
|
||||
.append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
|
||||
LOG.error(sb.toString(), e);
|
||||
}
|
||||
deaggregateRecords(messageData, r, startingHashKey, endingHashKey);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
isAggregated = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isAggregated) {
|
||||
bb.rewind();
|
||||
result.add(new UserRecord(r));
|
||||
|
|
@ -301,5 +246,95 @@ public class UserRecord extends Record {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static List<UserRecord> deaggregateRecords(
|
||||
byte[] messageData,
|
||||
Record record,
|
||||
BigInteger startingHashKey,
|
||||
BigInteger endingHashKey
|
||||
) throws InvalidProtocolBufferException{
|
||||
Messages.AggregatedRecord aggregatedRecord = Messages.AggregatedRecord.parseFrom(messageData);
|
||||
List<UserRecord> results = new ArrayList<>();
|
||||
|
||||
Date arrivalTimeStamp = record.getApproximateArrivalTimestamp();
|
||||
String sequenceNumber = record.getSequenceNumber();
|
||||
|
||||
List<String> partitionKeyTable = aggregatedRecord.getPartitionKeyTableList();
|
||||
List<String> explicitHashKeyTable = aggregatedRecord.getExplicitHashKeyTableList();
|
||||
|
||||
try {
|
||||
long subSeqNum = 0;
|
||||
|
||||
for (Messages.Record subRecord : aggregatedRecord.getRecordsList()) {
|
||||
String explicitHashKey = subRecord.hasExplicitHashKeyIndex() ?
|
||||
explicitHashKeyTable.get((int) subRecord.getExplicitHashKeyIndex()) : null;
|
||||
|
||||
String partitionKey = partitionKeyTable.get((int) subRecord.getPartitionKeyIndex());
|
||||
|
||||
|
||||
BigInteger effectiveHashKey = explicitHashKey != null
|
||||
? new BigInteger(explicitHashKey)
|
||||
: new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
|
||||
|
||||
//if hash key is in invalid range, remove all records prior and exit loop
|
||||
if (effectiveHashKey.compareTo(startingHashKey) < 0
|
||||
|| effectiveHashKey.compareTo(endingHashKey) > 0) {
|
||||
for (int toRemove = 0; toRemove < subSeqNum; ++toRemove) {
|
||||
results.remove(results.size() - 1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
Record newRecord = new Record()
|
||||
.withData(ByteBuffer.wrap(subRecord.getData().toByteArray()))
|
||||
.withPartitionKey(partitionKey)
|
||||
.withSequenceNumber(sequenceNumber)
|
||||
.withApproximateArrivalTimestamp(arrivalTimeStamp);
|
||||
results.add(new UserRecord(true, newRecord, subSeqNum++, explicitHashKey));
|
||||
}
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
logUnsupportedEncodingException(
|
||||
e,
|
||||
partitionKeyTable,
|
||||
explicitHashKeyTable,
|
||||
aggregatedRecord,
|
||||
record,
|
||||
messageData
|
||||
);
|
||||
return null;
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private static void logUnsupportedEncodingException(
|
||||
UnsupportedEncodingException e,
|
||||
List<String> partitionKeyTable,
|
||||
List<String> explicitHashKeyTable,
|
||||
Messages.AggregatedRecord aggregatedRecord,
|
||||
Record record,
|
||||
byte[] messageData
|
||||
) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Unexpected exception during deaggregation, record was:\n");
|
||||
sb.append("PKS:\n");
|
||||
for (String s : partitionKeyTable) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
sb.append("EHKS: \n");
|
||||
for (String s : explicitHashKeyTable) {
|
||||
sb.append(s).append("\n");
|
||||
}
|
||||
for (Messages.Record mr : aggregatedRecord.getRecordsList()) {
|
||||
sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
|
||||
.append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
|
||||
.append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
|
||||
.append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
|
||||
}
|
||||
sb.append("Sequence number: ").append(record.getSequenceNumber()).append("\n")
|
||||
.append("Raw data: ")
|
||||
.append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
|
||||
LOG.error(sb.toString(), e);
|
||||
|
||||
}
|
||||
// CHECKSTYLE:ON NPathComplexity
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue