All files / lib/libs sink-lib.ts

100% Statements 34/34
100% Branches 15/15
100% Functions 8/8
100% Lines 33/33

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105  10x               66x     10x 10x 10x 10x 10x 10x     10x               10x                 52x                                   10x   52x 251x 251x 3x   248x         52x 192x 190x 486x   486x 140x 346x   3x           52x 52x             111x 111x   109x   3x 3x         3x      
import pino from "pino";
// Module-level pino logger created with default options; logError writes through it.
const logger = pino();
 
import { BaseIndex } from "shared-types/opensearch";
 
import * as os from "./opensearch-lib";
import { getDomainAndNamespace } from "./utils";
 
/**
 * Extracts a topic name from a topic-partition string.
 *
 * Only the text after the last "--" is considered. That text is split on "-",
 * the final piece is discarded, and the first remaining piece is returned.
 *
 * @param topicPartition - The raw topic-partition identifier.
 * @returns The first "-"-separated piece of the trailing section, or
 *   `undefined` when nothing remains after discarding the final piece
 *   (i.e. the trailing section contains no "-").
 */
export function getTopic(topicPartition: string) {
  const sections = topicPartition.split("--");
  const trailing = sections[sections.length - 1];
  if (trailing === undefined) {
    return undefined;
  }

  // Drop the last "-"-separated piece, then keep only the leading piece.
  const [topic] = trailing.split("-").slice(0, -1);
  return topic;
}
 
/**
 * Categories of errors reported through logError.
 * Each member is used as a key into ErrorMessages to pick the log message.
 */
export enum ErrorType {
  // A validation error occurred.
  VALIDATION = "validation",
  // An unknown error occurred.
  UNKNOWN = "unknown",
  // An error occurred while bulk updating records.
  BULKUPDATE = "bulkupdate",
  // Topic is unknown, unsupported, or unable to be parsed.
  BADTOPIC = "badtopic",
  // An error occurred while parsing the record.
  BADPARSE = "badparse",
}
 
/**
 * Log message emitted by logError for each ErrorType.
 *
 * The explicit Record annotation makes the compiler reject this map if an
 * ErrorType member is added without a matching message, so a lookup by
 * ErrorType can never yield `undefined`.
 */
const ErrorMessages: Readonly<Record<ErrorType, string>> = {
  [ErrorType.VALIDATION]: "A validation error occurred.",
  [ErrorType.UNKNOWN]: "An unknown error occurred.",
  [ErrorType.BULKUPDATE]: "An error occurred while bulk updating records.",
  [ErrorType.BADTOPIC]: "Topic is unknown, unsupported, or unable to be parsed.",
  [ErrorType.BADPARSE]: "An error occurred while parsing the record.",
};
 
/**
 * Writes a structured error entry through the module-level logger.
 *
 * The entry has two parts:
 *  - `error`: details of the failure (see below),
 *  - `custom`: `{ type, metadata }` passed through prettyPrintJsonInObject.
 * The log message text is looked up in ErrorMessages by `type`.
 *
 * @param type - Category of the error; selects the log message.
 * @param error - The caught value, if any. Objects are logged with their
 *   `message`, `stack` and own enumerable properties. Truthy primitives
 *   (e.g. a thrown string) are logged as `{ message: String(error) }`.
 *   Falsy or omitted values are logged as `{}`.
 * @param metadata - Extra context to attach to the entry. Defaults to `{}`.
 */
export const logError = ({
  error,
  type,
  metadata = {},
}: {
  type: ErrorType;
  error?: Error | any;
  metadata?: Record<string, any>;
}): void => {
  let errorDetails: Record<string, unknown> = {};

  if (error && (typeof error === "object" || typeof error === "function")) {
    // `message` and `stack` are read explicitly because an object spread only
    // copies own enumerable properties, which on Error instances normally
    // excludes both. Spreading afterwards keeps any extra fields on the error.
    errorDetails = {
      message: error.message,
      stack: error.stack,
      ...error,
    };
  } else if (error) {
    // Thrown primitives have no message/stack, and spreading a string would
    // produce one key per character; record the value itself as the message.
    errorDetails = { message: String(error) };
  }

  logger.error(
    {
      error: errorDetails,
      custom: prettyPrintJsonInObject({
        type,
        metadata,
      }),
    },
    ErrorMessages[type],
  );
};
 
/**
 * Returns a copy of `obj` in which every nested string that holds a JSON
 * object or array has been re-serialized with two-space indentation.
 *
 * Behavior notes:
 *  - The input is never modified; plain objects and arrays are copied.
 *  - Strings that are not valid JSON, or that parse to a scalar (number,
 *    boolean, string, null), are kept exactly as they are, so values such as
 *    "1.0" are not rewritten.
 *  - Objects that are neither arrays nor plain objects (class instances,
 *    Date, Error, ...) are passed through by reference and not traversed.
 *  - Circular references are preserved in the copy instead of recursing
 *    forever.
 *  - A non-object `obj` (including a top-level string) is returned unchanged.
 *
 * @param obj - The value to process.
 * @returns The processed copy, or `obj` itself when it is not an array or
 *   plain object.
 */
const prettyPrintJsonInObject = (obj: any): any => {
  // Maps each visited source container to its copy so cycles and shared
  // references resolve to the same copied instance.
  const copies = new WeakMap<object, unknown>();

  // Re-indent a string only when it parses to a JSON object or array.
  const reformat = (text: string): string => {
    try {
      const parsed: unknown = JSON.parse(text);
      return parsed !== null && typeof parsed === "object"
        ? JSON.stringify(parsed, null, 2)
        : text;
    } catch {
      return text;
    }
  };

  const isPlainObject = (value: object): boolean => {
    const proto: unknown = Object.getPrototypeOf(value);
    return proto === null || proto === Object.prototype;
  };

  // Convert a nested value: strings may be reformatted, containers are copied.
  const convert = (value: unknown): unknown => {
    if (typeof value === "string") {
      return reformat(value);
    }
    if (value === null || typeof value !== "object") {
      return value;
    }
    return cloneContainer(value);
  };

  const cloneContainer = (source: object): unknown => {
    if (copies.has(source)) {
      return copies.get(source);
    }

    if (Array.isArray(source)) {
      const arrayCopy: unknown[] = new Array(source.length);
      // Register before recursing so a cycle back to `source` finds the copy.
      copies.set(source, arrayCopy);
      source.forEach((item, index) => {
        arrayCopy[index] = convert(item);
      });
      return arrayCopy;
    }

    if (!isPlainObject(source)) {
      return source;
    }

    const objectCopy: Record<string, unknown> = {};
    copies.set(source, objectCopy);
    for (const [key, value] of Object.entries(source)) {
      objectCopy[key] = convert(value);
    }
    return objectCopy;
  };

  return obj !== null && typeof obj === "object" ? cloneContainer(obj) : obj;
};
 
/**
 * Bulk-updates `docs` in the location derived from `baseIndex`.
 *
 * The domain and index are obtained from getDomainAndNamespace(baseIndex) and
 * passed, together with `docs`, to os.bulkUpdateData.
 *
 * Any failure (from either call) is printed with console.log, reported via
 * logError as ErrorType.BULKUPDATE, and then rethrown to the caller.
 *
 * @param docs - Documents to update; each must carry an `id`.
 * @param baseIndex - Base index used to resolve the target domain and index.
 * @throws Rethrows whatever getDomainAndNamespace or os.bulkUpdateData threw.
 */
export async function bulkUpdateDataWrapper(
  docs: { id: string; [key: string]: unknown }[],
  baseIndex: BaseIndex,
) {
  try {
    const target = getDomainAndNamespace(baseIndex);
    await os.bulkUpdateData(target.domain, target.index, docs);
  } catch (caught) {
    // NOTE(review): this console.log looks redundant with the structured
    // logError call below — confirm whether it is still wanted.
    console.log({ error: caught });
    logError({ error: caught, type: ErrorType.BULKUPDATE });

    throw caught;
  }
}