Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Handler } from "aws-lambda";
import { decodeBase64WithUtf8 } from "shared-utils";
import { KafkaEvent, KafkaRecord } from "shared-types";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib";

/**
 * Lambda entry point for a Kafka-triggered sink.
 *
 * Walks every topic-partition key in `event.records`, resolves its topic with
 * `getTopic`, and forwards the records of the one supported topic to `ksql`.
 *
 * @param event - Kafka event whose `records` map is keyed by topic-partition.
 * @throws Error when a topic-partition resolves to an unsupported topic, and
 *   rethrows any error raised while processing (after logging it).
 */
export const handler: Handler<KafkaEvent> = async (event) => {
  // Logged in place of the real event: the records payload is replaced with a
  // placeholder string so the log entry stays small.
  const loggableEvent = { ...event, records: "too large to display" };
  try {
    for (const topicPartition of Object.keys(event.records)) {
      const topic = getTopic(topicPartition);
      switch (topic) {
        case "aws.seatool.ksql.onemac.three.agg.State_Plan":
          await ksql(event.records[topicPartition], topicPartition);
          break;
        default:
          logError({ type: ErrorType.BADTOPIC });
          throw new Error(`topic (${topicPartition}) is invalid`);
      }
    }
  } catch (error) {
    // Pass the caught error along so the log entry carries the actual failure
    // cause, matching how `ksql` reports its parse failures.
    logError({ type: ErrorType.UNKNOWN, error, metadata: { event: loggableEvent } });
    throw error;
  }
};

/**
 * Decodes a batch of Kafka records and hands the resulting documents to
 * `bulkUpdateDataWrapper` with the "insights" target.
 *
 * Each record's key is base64-decoded into the document `id`; its value is
 * base64-decoded and parsed as JSON, then merged with that `id` (the decoded
 * key wins over any `id` field in the payload). Records that fail to decode or
 * parse are logged as `BADPARSE` and skipped, so one bad record does not abort
 * the batch.
 *
 * @param kafkaRecords - Records belonging to a single topic-partition.
 * @param topicPartition - Topic-partition key, used only for error metadata.
 */
const ksql = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
  // Kept as `any[]`: the parsed JSON shape and the parameter type expected by
  // `bulkUpdateDataWrapper` are not visible from this file.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const docs: any[] = [];
  for (const kafkaRecord of kafkaRecords) {
    const { key, value } = kafkaRecord;
    try {
      // Records without a value are ignored (presumably tombstones — confirm).
      if (!value) continue;
      const id = decodeBase64WithUtf8(key);
      const record = JSON.parse(decodeBase64WithUtf8(value));
      docs.push({ ...record, id });
    } catch (error) {
      logError({
        type: ErrorType.BADPARSE,
        error,
        metadata: { topicPartition, kafkaRecord },
      });
    }
  }
  await bulkUpdateDataWrapper(docs, "insights");
};