lib/lambda/sinkInsights.ts

Statements: 100% (24/24)
Branches:   100% (4/4)
Functions:  100% (2/2)
Lines:      100% (23/23)

import { Handler } from "aws-lambda";
import { decodeBase64WithUtf8 } from "shared-utils";
import { KafkaEvent, KafkaRecord } from "shared-types";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib";
 
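// Lambda entrypoint: routes each Kafka topic-partition in the event to its processor;
// unrecognized topics are logged and raise an error.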
export const handler: Handler<KafkaEvent> = async (event) => {
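  // Loggable copy of the event with the (potentially large) records payload elided.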
  const loggableEvent = { ...event, records: "too large to display" };
  try {
    for (const topicPartition of Object.keys(event.records)) {
      const topic = getTopic(topicPartition);
      switch (topic) {
        case "aws.seatool.ksql.onemac.three.agg.State_Plan":
          await ksql(event.records[topicPartition], topicPartition);
          break;
        default:
          logError({ type: ErrorType.BADTOPIC });
          throw new Error(`topic (${topicPartition}) is invalid`);
      }
    }
  } catch (error) {
    logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } });
    throw error;
  }
};
 
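// Decodes each record's base64 key and JSON value into a document and submits the batch in one bulk update.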
const ksql = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
  const docs: any[] = [];
  for (const kafkaRecord of kafkaRecords) {
    const { key, value } = kafkaRecord;
    try {
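      // Records without a value (e.g. Kafka tombstones) carry nothing to index, so skip them.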
      if (!value) continue;
 
      const id: string = decodeBase64WithUtf8(key);
      const record = JSON.parse(decodeBase64WithUtf8(value));
      docs.push({ ...record, id });
    } catch (error) {
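      // Log parse failures and keep going so one malformed record does not fail the whole batch.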
      logError({
        type: ErrorType.BADPARSE,
        error,
        metadata: { topicPartition, kafkaRecord },
      });
    }
  }
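  // Persist all collected documents in a single bulk update against "insights".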
  await bulkUpdateDataWrapper(docs, "insights");
};