Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | 1x 4x 4x 4x 4x 4x 4x 4x 1x 1x 1x 1x 1x 1x 1x | import { Handler } from "aws-lambda";
import { ErrorType, getTopic, logError } from "libs";
import { KafkaEvent } from "shared-types";
import {
insertNewSeatoolRecordsFromKafkaIntoMako,
insertOneMacRecordsFromKafkaIntoMako,
syncSeatoolRecordDatesFromKafkaWithMako,
} from "./sinkMainProcessors";
export const handler: Handler<KafkaEvent> = async (event) => {
const prettifiedEventJSON = JSON.stringify(event, null, 2);
console.log(`event: ${prettifiedEventJSON}`);
try {
await Promise.all(
Object.entries(event.records).map(async ([topicPartition, records]) => {
const topic = getTopic(topicPartition);
console.log(`topic: ${topic}`);
switch (topic) {
case "aws.onemac.migration.cdc":
return insertOneMacRecordsFromKafkaIntoMako(records, topicPartition);
case "aws.seatool.ksql.onemac.three.agg.State_Plan":
return insertNewSeatoolRecordsFromKafkaIntoMako(records, topicPartition);
case "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan":
return syncSeatoolRecordDatesFromKafkaWithMako(records, topicPartition);
default:
logError({ type: ErrorType.BADTOPIC });
throw new Error(`topic (${topicPartition}) is invalid`);
}
}),
);
} catch (error) {
logError({ type: ErrorType.UNKNOWN, metadata: { event: prettifiedEventJSON } });
throw error;
}
};
|