Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | 1x 7x 7x 7x 7x 7x 5x 5x 2x 2x 2x 2x 1x 5x 5x 5x 5x 5x 4x 3x 1x 2x 2x 1x 1x 1x 2x 5x | import { Handler } from "aws-lambda"; import { KafkaRecord, opensearch } from "shared-types"; import { KafkaEvent } from "shared-types"; import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib"; export const handler: Handler<KafkaEvent> = async (event) => { const loggableEvent = { ...event, records: "too large to display" }; try { for (const topicPartition of Object.keys(event.records)) { const topic = getTopic(topicPartition); switch (topic) { case "aws.seatool.debezium.cdc.SEA.dbo.SPA_Type": await types(event.records[topicPartition], topicPartition); break; default: logError({ type: ErrorType.BADTOPIC }); throw new Error(`topic (${topicPartition}) is invalid`); } } } catch (error) { logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } }); throw error; } }; const types = async (kafkaRecords: KafkaRecord[], topicPartition: string) => { const docs: any[] = []; for (const kafkaRecord of kafkaRecords) { const { value } = kafkaRecord; try { const decodedValue = Buffer.from(value, "base64").toString("utf-8"); const record = JSON.parse(decodedValue).payload.after; if (!record) { continue; } const result = opensearch.types.SPA_Type.transform().safeParse(record); if (!result.success) { logError({ type: ErrorType.VALIDATION, error: result?.error, metadata: { topicPartition, kafkaRecord, record }, }); continue; } docs.push(result.data); } catch (error) { logError({ type: ErrorType.BADPARSE, error, metadata: { topicPartition, kafkaRecord }, }); } } await bulkUpdateDataWrapper(docs, "types"); }; |