All files / lib/lambda / sinkCpocs.ts

100% Statements 33/33
100% Branches 8/8
100% Functions 2/2
100% Lines 33/33


import { Handler } from "aws-lambda";
import { KafkaEvent, KafkaRecord, opensearch } from "shared-types";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib";
import { decodeBase64WithUtf8 } from "shared-utils";
 
export const handler: Handler<KafkaEvent> = async (event) => {
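  // Keep a copy of the event with the bulky records field replaced, for error logging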
  const loggableEvent = { ...event, records: "too large to display" };
  try {
    for (const topicPartition of Object.keys(event.records)) {
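      // Each key is a "<topic>-<partition>" string; getTopic resolves the topic name from it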
      const topic = getTopic(topicPartition);
      switch (topic) {
        case "aws.seatool.debezium.cdc.SEA.dbo.Officers":
          await officers(event.records[topicPartition], topicPartition);
          break;
        default:
          logError({ type: ErrorType.BADTOPIC });
          throw new Error(`topic (${topicPartition}) is invalid`);
      }
    }
  } catch (error) {
    logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } });
    throw error;
  }
};
 
const officers = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
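  // Accumulate documents so the OpenSearch write happens once, in bulk, at the end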
  const docs: any[] = [];
  for (const kafkaRecord of kafkaRecords) {
    const { key, value } = kafkaRecord;
 
    try {
      // Handle delete events and continue
      if (value === undefined) {
        continue;
      }
 
      // Set id
      const id: string = decodeBase64WithUtf8(key);
 
      const decodedValue = Buffer.from(value, "base64").toString("utf-8");
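      // Debezium change events carry the row image in payload.after (null after a delete)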
      const record = JSON.parse(decodedValue).payload.after;
 
      // Handle tombstone events and continue
      if (!record) {
        console.log(`Tombstone detected for ${id}. Pushing delete record to OpenSearch...`);
        docs.push({
          id,
          delete: true,
        });
        continue;
      }
 
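      // Validate and transform the raw row against the shared schema; skip records that fail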
      const result = opensearch.cpocs.Officers.transform().safeParse(record);
      if (!result.success) {
        logError({
          type: ErrorType.VALIDATION,
          error: result.error,
          metadata: { topicPartition, kafkaRecord, record },
        });
        continue;
      }
      docs.push(result.data);
    } catch (error) {
      logError({
        type: ErrorType.BADPARSE,
        error,
        metadata: { topicPartition, kafkaRecord },
      });
    }
  }
 
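  // One bulk write of everything collected above to the "cpocs" index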
  await bulkUpdateDataWrapper(docs, "cpocs");
};
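 
The report shows 2/2 functions and 8/8 branches covered, so the tests must drive the handler end to end. Below is a minimal sketch of one such invocation, assuming vitest; sampleEvent, the mocked bulkUpdateDataWrapper, and the Officer_ID field are illustrative, not taken from the actual test suite.
 
import { Context } from "aws-lambda";
import { describe, expect, it, vi } from "vitest";
import { handler } from "./sinkCpocs";
 
// Mock the sink library so no real OpenSearch write happens; getTopic,
// logError, and ErrorType keep their real implementations.
vi.mock("../libs/sink-lib", async (importOriginal) => ({
  ...(await importOriginal<Record<string, unknown>>()),
  bulkUpdateDataWrapper: vi.fn().mockResolvedValue(undefined),
}));
 
const toBase64 = (s: string) => Buffer.from(s, "utf-8").toString("base64");
 
describe("sinkCpocs handler", () => {
  it("processes an Officers change event without throwing", async () => {
    const sampleEvent = {
      records: {
        // Key format assumed to be "<topic>-<partition>", matching what getTopic parses
        "aws.seatool.debezium.cdc.SEA.dbo.Officers-0": [
          {
            key: toBase64("42"),
            value: toBase64(JSON.stringify({ payload: { after: { Officer_ID: 42 } } })),
          } as any, // remaining KafkaRecord fields omitted for brevity
        ],
      },
    };
    // The handler resolves even when validation fails for a record,
    // since that path logs and continues rather than throwing
    await expect(handler(sampleEvent as any, {} as Context, () => {})).resolves.toBeUndefined();
  });
});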