Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | 1x 10x 10x 10x 10x 10x 8x 7x 2x 2x 3x 3x 1x 8x 8x 8x 8x 8x 1x 7x 6x 6x 5x 1x 1x 1x 4x 4x 1x 1x 3x 2x 8x | import { Handler } from "aws-lambda"; import { KafkaRecord, opensearch } from "shared-types"; import { KafkaEvent } from "shared-types"; import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib"; import { decodeBase64WithUtf8 } from "shared-utils"; export const handler: Handler<KafkaEvent> = async (event) => { const loggableEvent = { ...event, records: "too large to display" }; try { for (const topicPartition of Object.keys(event.records)) { const topic = getTopic(topicPartition); switch (topic) { case "aws.seatool.debezium.cdc.SEA.dbo.Officers": await officers(event.records[topicPartition], topicPartition); break; default: logError({ type: ErrorType.BADTOPIC }); throw new Error(`topic (${topicPartition}) is invalid`); } } } catch (error) { logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } }); throw error; } }; const officers = async (kafkaRecords: KafkaRecord[], topicPartition: string) => { const docs: any[] = []; for (const kafkaRecord of kafkaRecords) { const { key, value } = kafkaRecord; try { // Handle delete events and continue if (value === undefined) { continue; } // Set id const id: string = decodeBase64WithUtf8(key); const decodedValue = Buffer.from(value, "base64").toString("utf-8"); const record = JSON.parse(decodedValue).payload.after; // Handle tombstone events and continue if (!record) { console.log(`Tombstone detected for ${id}. Pushing delete record to os...`); docs.push({ id, delete: true, }); continue; } const result = opensearch.cpocs.Officers.transform().safeParse(record); if (!result.success) { logError({ type: ErrorType.VALIDATION, error: result?.error, metadata: { topicPartition, kafkaRecord, record }, }); continue; } docs.push(result.data); } catch (error) { logError({ type: ErrorType.BADPARSE, error, metadata: { topicPartition, kafkaRecord }, }); } } await bulkUpdateDataWrapper(docs, "cpocs"); }; |