All files / lib/lambda sinkMain.ts

100% Statements 15/15
100% Branches 4/4
100% Functions 2/2
100% Lines 15/15

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44                  1x 4x   4x   4x 4x   4x   4x   4x   1x     1x     1x     1x 1x         1x   1x      
import { Handler } from "aws-lambda";
import { KafkaEvent } from "shared-types";
import { ErrorType, getTopic, logError } from "libs";
import {
  insertOneMacRecordsFromKafkaIntoMako,
  insertNewSeatoolRecordsFromKafkaIntoMako,
  syncSeatoolRecordDatesFromKafkaWithMako,
} from "./sinkMainProcessors";
 
export const handler: Handler<KafkaEvent> = async (event) => {
  const prettifiedEventJSON = JSON.stringify(event, null, 2);
 
  console.log(`event: ${prettifiedEventJSON}`);
 
  try {
    await Promise.all(
      Object.entries(event.records).map(async ([topicPartition, records]) => {
        const topic = getTopic(topicPartition);
 
        console.log(`topic: ${topic}`);
 
        switch (topic) {
          case "aws.onemac.migration.cdc":
            return insertOneMacRecordsFromKafkaIntoMako(records, topicPartition);
 
          case "aws.seatool.ksql.onemac.three.agg.State_Plan":
            return insertNewSeatoolRecordsFromKafkaIntoMako(records, topicPartition);
 
          case "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan":
            return syncSeatoolRecordDatesFromKafkaWithMako(records, topicPartition);
 
          default:
            logError({ type: ErrorType.BADTOPIC });
            throw new Error(`topic (${topicPartition}) is invalid`);
        }
      }),
    );
  } catch (error) {
    logError({ type: ErrorType.UNKNOWN, metadata: { event: prettifiedEventJSON } });
 
    throw error;
  }
};