Coverage report: lib/lambda/sinkLegacyInsights.ts

Statements: 100% (27/27)
Branches:   100% (8/8)
Functions:  100% (2/2)
Lines:      100% (26/26)

import { Handler } from "aws-lambda";
import { decodeBase64WithUtf8 } from "shared-utils";
import { KafkaEvent, KafkaRecord } from "shared-types";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib";
 
// Route each topic-partition in the event to its sink; unknown topics are treated as fatal.
export const handler: Handler<KafkaEvent> = async (event) => {
  // Omit the raw records from error logs, since the payloads can be very large.
  const loggableEvent = { ...event, records: "too large to display" };
  try {
    for (const topicPartition of Object.keys(event.records)) {
      const topic = getTopic(topicPartition);
      switch (topic) {
        case "aws.onemac.migration.cdc":
          await onemac(event.records[topicPartition], topicPartition);
          break;
        default:
          logError({ type: ErrorType.BADTOPIC });
          throw new Error(`topic (${topicPartition}) is invalid`);
      }
    }
  } catch (error) {
    logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } });
    throw error;
  }
};
 
const onemac = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
  // Accumulate documents so they can be written in a single bulk update at the end.
  const docs: any[] = [];
  for (const kafkaRecord of kafkaRecords) {
    const { key, value, offset } = kafkaRecord;
    try {
      const id: string = decodeBase64WithUtf8(key);
      // A record with no value is a tombstone: mark the document as hard deleted in legacy.
      if (!value) {
        docs.push({
          id,
          hardDeletedFromLegacy: true,
        });
        continue;
      }
      const record = JSON.parse(decodeBase64WithUtf8(value));
      if (!record.sk) continue;
      docs.push({
        ...record,
        // "Package" records keep the decoded key as their id; other record types get a
        // unique id derived from the Kafka offset.
        id: record.sk === "Package" ? id : offset.toString(),
        approvedEffectiveDate: null,
        changedDate: null,
        finalDispositionDate: null,
        proposedDate: null,
        proposedEffectiveDate: null,
        statusDate: null,
        submissionDate: null,
        hardDeletedFromLegacy: null,
      });
    } catch (error) {
      logError({
        type: ErrorType.BADPARSE,
        error,
        metadata: { topicPartition, kafkaRecord },
      });
    }
  }
  await bulkUpdateDataWrapper(docs, "legacyinsights");
};
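
For reference, a minimal sketch of how this handler might be exercised locally, as a unit test would. The topic-partition key format, the subset of KafkaRecord fields shown, the Buffer-based base64 encoding, and the `as any` casts are assumptions made for illustration; they are not part of the coverage report above.

import { handler } from "./sinkLegacyInsights";

// Helper (assumption): base64-encode a UTF-8 string the way the sink expects to decode it.
const encode = (input: string) => Buffer.from(input, "utf-8").toString("base64");

// Sample event with a single record on the only topic the handler accepts.
const sampleEvent = {
  records: {
    // Topic-partition key; the exact format getTopic() parses is an assumption here.
    "aws.onemac.migration.cdc-0": [
      {
        key: encode("MD-1234.R00.00"),
        value: encode(JSON.stringify({ sk: "Package", someField: "example" })),
        offset: 42,
        // Remaining KafkaRecord fields (topic, partition, timestamp, ...) are omitted
        // in this sketch and stubbed via the cast below.
      },
    ],
  },
};

// Invoking the handler routes the record through onemac() and finishes with a bulk
// update against "legacyinsights"; context and callback are stubbed for the sketch.
void handler(sampleEvent as any, {} as any, () => {});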