Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | 1x 10x 10x 10x 10x 10x 8x 7x 2x 2x 3x 3x 1x 8x 8x 8x 8x 8x 1x 7x 6x 6x 5x 1x 1x 1x 4x 4x 1x 1x 3x 2x 8x | import { Handler } from "aws-lambda";
import { KafkaRecord, opensearch } from "shared-types";
import { KafkaEvent } from "shared-types";
import { decodeBase64WithUtf8 } from "shared-utils";
import { bulkUpdateDataWrapper, ErrorType, getTopic, logError } from "../libs/sink-lib";
export const handler: Handler<KafkaEvent> = async (event) => {
const loggableEvent = { ...event, records: "too large to display" };
try {
for (const topicPartition of Object.keys(event.records)) {
const topic = getTopic(topicPartition);
switch (topic) {
case "aws.seatool.debezium.cdc.SEA.dbo.Officers":
await officers(event.records[topicPartition], topicPartition);
break;
default:
logError({ type: ErrorType.BADTOPIC });
throw new Error(`topic (${topicPartition}) is invalid`);
}
}
} catch (error) {
logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } });
throw error;
}
};
const officers = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
const docs: any[] = [];
for (const kafkaRecord of kafkaRecords) {
const { key, value } = kafkaRecord;
try {
// Handle delete events and continue
if (value === undefined) {
continue;
}
// Set id
const id: string = decodeBase64WithUtf8(key);
const decodedValue = Buffer.from(value, "base64").toString("utf-8");
const record = JSON.parse(decodedValue).payload.after;
// Handle tombstone events and continue
if (!record) {
console.log(`Tombstone detected for ${id}. Pushing delete record to os...`);
docs.push({
id,
delete: true,
});
continue;
}
const result = opensearch.cpocs.Officers.transform().safeParse(record);
if (!result.success) {
logError({
type: ErrorType.VALIDATION,
error: result?.error,
metadata: { topicPartition, kafkaRecord, record },
});
continue;
}
docs.push(result.data);
} catch (error) {
logError({
type: ErrorType.BADPARSE,
error,
metadata: { topicPartition, kafkaRecord },
});
}
}
await bulkUpdateDataWrapper(docs, "cpocs");
};
|