All files / lib/libs sink-lib.ts

100% Statements 34/34
100% Branches 15/15
100% Functions 8/8
100% Lines 33/33

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105  39x               71x     39x 39x 39x 39x 39x 39x     39x               39x                 53x                                   39x   53x 300x 300x 5x   295x         53x 230x 222x 604x   604x 177x 427x   5x           53x 53x             194x 194x   192x   3x 3x         3x      
import pino from "pino";
// Module-wide pino logger created with default options; logError writes through it.
const logger = pino();
 
import { BaseIndex } from "shared-types/opensearch";
 
import * as os from "./opensearch-lib";
import { getDomainAndNamespace } from "./utils";
 
/**
 * Extracts a topic name from a topic-partition identifier.
 *
 * The text after the last `--` is split on `-`, the final segment is
 * discarded (presumably the partition number — confirm against callers), and
 * the first remaining segment is returned. A topic that itself contains `-`
 * is therefore truncated at its first hyphen.
 *
 * @param topicPartition Identifier such as `prefix--some.topic-0`.
 * @returns The first hyphen-delimited segment, or `undefined` when nothing
 *   remains after discarding the final segment.
 */
export function getTopic(topicPartition: string) {
  const qualifiedTopic = topicPartition.split("--").pop();
  if (qualifiedTopic === undefined) {
    return undefined;
  }

  const segments = qualifiedTopic.split("-");
  // With a single segment, dropping the last one leaves nothing to return.
  return segments.length > 1 ? segments[0] : undefined;
}
 
/**
 * Categories of errors accepted by logError. Each member's string value keys
 * the corresponding log message in ErrorMessages.
 */
export enum ErrorType {
  /** A validation error occurred. */
  VALIDATION = "validation",
  /** An error that fits no other category. */
  UNKNOWN = "unknown",
  /** A failure while bulk updating records. */
  BULKUPDATE = "bulkupdate",
  /** The topic is unknown, unsupported, or could not be parsed. */
  BADTOPIC = "badtopic",
  /** A failure while parsing a record. */
  BADPARSE = "badparse",
}
 
/**
 * Log message emitted by logError for each ErrorType.
 *
 * Typed as a full Record so that an ErrorType member without a message is
 * reported at this definition.
 */
const ErrorMessages: Record<ErrorType, string> = {
  [ErrorType.VALIDATION]: "A validation error occurred.",
  [ErrorType.UNKNOWN]: "An unknown error occurred.",
  [ErrorType.BULKUPDATE]: "An error occurred while bulk updating records.",
  [ErrorType.BADTOPIC]: "Topic is unknown, unsupported, or unable to be parsed.",
  [ErrorType.BADPARSE]: "An error occurred while parsing the record.",
};
 
/**
 * Writes a structured error entry through the module logger.
 *
 * The entry carries two fields: `error`, a plain-object view of the thrown
 * value, and `custom`, holding the error `type` and caller `metadata` after
 * being passed through prettyPrintJsonInObject. The log message is looked up
 * in ErrorMessages by `type`.
 *
 * @param type Category of the error; selects the log message.
 * @param error The thrown value, if any. Objects are logged with their
 *   `message`, `stack` and own enumerable properties; truthy primitives
 *   (e.g. a thrown string) are logged as `{ message: String(error) }`;
 *   falsy values are logged as `{}`.
 * @param metadata Extra context to attach. NOTE: prettyPrintJsonInObject
 *   rewrites JSON strings in place, so the caller's object may be modified.
 */
export const logError = ({
  error,
  type,
  metadata = {},
}: {
  type: ErrorType;
  error?: Error | any;
  metadata?: Record<string, any>;
}): void => {
  let serializedError: Record<string, unknown> = {};
  if (error) {
    const isSpreadable = typeof error === "object" || typeof error === "function";
    serializedError = isSpreadable
      ? {
          // message/stack are usually non-enumerable on Error instances, so
          // the spread alone would not copy them.
          message: error.message,
          stack: error.stack,
          ...error,
        }
      : // Spreading a primitive is wrong: a string would expand into one key
        // per character and a number into nothing at all.
        { message: String(error) };
  }

  logger.error(
    {
      error: serializedError,
      custom: prettyPrintJsonInObject({
        type,
        metadata,
      }),
    },
    ErrorMessages[type],
  );
};
 
/**
 * Pretty-prints JSON-encoded object/array strings found anywhere inside `obj`.
 *
 * Walks `obj` recursively over its own enumerable keys. Every string value
 * that parses as a JSON object or array is replaced, in place, by the same
 * data re-serialised with two-space indentation. Strings that are not valid
 * JSON, or that parse to a scalar (number, boolean, string, null), are left
 * untouched so that numeric-looking text such as long ids is never rewritten.
 *
 * @param obj Value to process; it is mutated in place. Non-object values are
 *   returned unchanged.
 * @returns The same `obj` reference that was passed in.
 */
const prettyPrintJsonInObject = (obj: any): any => {
  // Containers already walked; prevents infinite recursion on circular references.
  const visited = new WeakSet<object>();

  // Parses once and yields the result only when it is an object or array.
  const parseJsonContainer = (text: string): object | undefined => {
    try {
      const parsed: unknown = JSON.parse(text);
      return typeof parsed === "object" && parsed !== null ? parsed : undefined;
    } catch {
      return undefined;
    }
  };

  // Recursive walk that rewrites qualifying string values on their parent.
  const traverseAndPrettyPrint = (element: unknown): void => {
    if (typeof element !== "object" || element === null || visited.has(element)) {
      return;
    }
    visited.add(element);

    const container = element as Record<string, unknown>;
    for (const key of Object.keys(container)) {
      const value = container[key];
      if (typeof value === "object") {
        traverseAndPrettyPrint(value);
      } else if (typeof value === "string") {
        const parsed = parseJsonContainer(value);
        if (parsed !== undefined) {
          container[key] = JSON.stringify(parsed, null, 2);
        }
      }
    }
  };

  traverseAndPrettyPrint(obj);
  return obj;
};
 
/**
 * Bulk-updates `docs` in the OpenSearch index derived from `baseIndex`.
 *
 * The domain and index are resolved with getDomainAndNamespace and handed to
 * os.bulkUpdateData. Any failure is written to the console, logged through
 * logError as ErrorType.BULKUPDATE, and then rethrown to the caller.
 *
 * @param docs Documents to update; each must carry an `id`.
 * @param baseIndex Base index used to resolve the target domain and index.
 * @throws Rethrows whatever getDomainAndNamespace or os.bulkUpdateData throws.
 */
export async function bulkUpdateDataWrapper(
  docs: { id: string; [key: string]: unknown }[],
  baseIndex: BaseIndex,
) {
  try {
    const target = getDomainAndNamespace(baseIndex);
    await os.bulkUpdateData(target.domain, target.index, docs);
  } catch (error) {
    console.log({ error });
    logError({ error, type: ErrorType.BULKUPDATE });
    throw error;
  }
}