Press n or j to go to the next uncovered block; press b, p, or k to go to the previous block.
import { Handler } from "aws-lambda";
import { KafkaEvent } from "shared-types";
import { ErrorType, getTopic, logError } from "libs";
import {
  insertOneMacRecordsFromKafkaIntoMako,
  insertNewSeatoolRecordsFromKafkaIntoMako,
  syncSeatoolRecordDatesFromKafkaWithMako,
} from "./sinkMainProcessors";

/**
 * Lambda entry point that routes each topic-partition batch in a Kafka event
 * to the sink processor registered for its topic.
 *
 * The incoming event is logged as pretty-printed JSON. Every
 * `[topicPartition, records]` entry of `event.records` is then resolved to a
 * topic name via `getTopic` and handed to the matching processor; all entries
 * are started together and awaited with `Promise.all`.
 *
 * @param event - Kafka event whose `records` property maps topic-partition
 *   keys to their record batches.
 * @throws Error when a topic-partition resolves to a topic with no registered
 *   processor (after logging `ErrorType.BADTOPIC`).
 * @throws Rethrows any failure raised while dispatching or processing, after
 *   logging `ErrorType.UNKNOWN` with the serialized event as metadata.
 */
export const handler: Handler<KafkaEvent> = async (event) => {
  const serializedEvent = JSON.stringify(event, null, 2);
  console.log(`event: ${serializedEvent}`);

  try {
    // Start one processing task per topic-partition entry, then wait for all of them.
    const pendingSinks = Object.entries(event.records).map(
      async ([topicPartition, records]) => {
        const topic = getTopic(topicPartition);
        console.log(`topic: ${topic}`);

        if (topic === "aws.onemac.migration.cdc") {
          return insertOneMacRecordsFromKafkaIntoMako(records, topicPartition);
        }

        if (topic === "aws.seatool.ksql.onemac.three.agg.State_Plan") {
          return insertNewSeatoolRecordsFromKafkaIntoMako(records, topicPartition);
        }

        if (topic === "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan") {
          return syncSeatoolRecordDatesFromKafkaWithMako(records, topicPartition);
        }

        // No processor is registered for this topic: report it and fail the batch.
        logError({ type: ErrorType.BADTOPIC });
        throw new Error(`topic (${topicPartition}) is invalid`);
      },
    );

    await Promise.all(pendingSinks);
  } catch (error) {
    // Log the full event for diagnosis, then propagate the original failure to the caller.
    logError({ type: ErrorType.UNKNOWN, metadata: { event: serializedEvent } });
    throw error;
  }
};