All files / lib/libs sink-lib.ts

100% Statements 34/34
100% Branches 15/15
100% Functions 8/8
100% Lines 33/33

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105  32x               69x     32x 32x 32x 32x 32x 32x     32x               32x                 52x                                   32x   52x 251x 251x 3x   248x         52x 192x 190x 486x   486x 140x 346x   3x           52x 52x             183x 183x   181x   3x 3x         3x      
import pino from "pino";
// Module-level pino logger created with default options; logError writes through it.
const logger = pino();
 
import { BaseIndex } from "shared-types/opensearch";
 
import * as os from "./opensearch-lib";
import { getDomainAndNamespace } from "./utils";
 
/**
 * Extracts the topic name from a `<topic>-<partition>` identifier.
 *
 * Everything up to and including the last `--` is discarded first
 * (presumably a namespace/connector prefix — confirm against callers), then
 * the trailing `-<partition>` segment is removed. Hyphens inside the topic
 * name itself are preserved, so `my-topic-3` yields `my-topic` rather than
 * only its first hyphen-delimited piece.
 *
 * @param topicPartition - Identifier such as `prefix--some.topic-0`.
 * @returns The topic name, or `undefined` when the remaining text holds no
 *   `-` separating a topic from a partition.
 */
export function getTopic(topicPartition: string): string | undefined {
  // `split` always yields at least one element; the fallback only satisfies the type checker.
  const unprefixed = topicPartition.split("--").pop() ?? "";

  // Only the LAST hyphen separates topic from partition.
  const partitionSeparator = unprefixed.lastIndexOf("-");
  if (partitionSeparator === -1) {
    return undefined;
  }

  return unprefixed.slice(0, partitionSeparator);
}
 
/**
 * Categories of error accepted by `logError`. Each member's string value is
 * also the key used to look up its message in `ErrorMessages`.
 */
export enum ErrorType {
  VALIDATION = "validation",
  UNKNOWN = "unknown",
  BULKUPDATE = "bulkupdate",
  BADTOPIC = "badtopic",
  BADPARSE = "badparse",
}
 
// Human-readable log message for each ErrorType; logError passes the matching
// entry to the logger as the message argument.
const ErrorMessages = {
  [ErrorType.VALIDATION]: "A validation error occurred.",
  [ErrorType.UNKNOWN]: "An unknown error occurred.",
  [ErrorType.BULKUPDATE]: "An error occurred while bulk updating records.",
  [ErrorType.BADTOPIC]: "Topic is unknown, unsupported, or unable to be parsed.",
  [ErrorType.BADPARSE]: "An error occurred while parsing the record.",
};
 
/**
 * Writes a structured error entry through the module logger.
 *
 * The entry carries an `error` object describing what was thrown and a
 * `custom` object holding the error `type` and caller `metadata` (passed
 * through `prettyPrintJsonInObject`). The log message is the `ErrorMessages`
 * entry for `type`.
 *
 * @param params.type - Category of the error; selects the log message.
 * @param params.error - The thrown value, if any. Falsy values are logged as
 *   an empty object.
 * @param params.metadata - Extra context to attach to the entry; defaults to `{}`.
 */
export const logError = ({
  error,
  type,
  metadata = {},
}: {
  type: ErrorType;
  error?: Error | any;
  metadata?: Record<string, any>;
}): void => {
  let errorDetails: Record<string, unknown> = {};

  if (error) {
    if (typeof error === "object" || typeof error === "function") {
      // `message` and `stack` are read explicitly because the spread below only
      // copies own enumerable properties; the spread then adds any extra fields.
      errorDetails = {
        message: error.message,
        stack: error.stack,
        ...error,
      };
    } else {
      // A thrown primitive (e.g. `throw "boom"`) has no message/stack, and
      // spreading a string would emit one index-keyed field per character.
      errorDetails = { message: String(error) };
    }
  }

  logger.error(
    {
      error: errorDetails,
      custom: prettyPrintJsonInObject({
        type,
        metadata,
      }),
    },
    ErrorMessages[type],
  );
};
 
/**
 * Returns a copy of `obj` in which every string property value (at any depth)
 * that parses as JSON is replaced by its 2-space-indented form.
 *
 * - The input is never modified: plain objects and arrays are copied, so the
 *   caller's data keeps its original strings.
 * - Cyclic references are preserved in the copy instead of recursing forever.
 * - Values that are neither plain objects nor arrays (e.g. `Date`, `Error`,
 *   class instances) are passed through by reference, untouched.
 * - Only property values are rewritten; a string passed directly as `obj` is
 *   returned as-is.
 *
 * @param obj - Value to process.
 * @returns The processed copy (or `obj` itself when it is not a plain object/array).
 */
const prettyPrintJsonInObject = (obj: any): any => {
  // Re-serializes a JSON string with indentation; non-JSON text is returned unchanged.
  const prettify = (text: string): string => {
    try {
      return JSON.stringify(JSON.parse(text), null, 2);
    } catch {
      return text;
    }
  };

  const isPlainObject = (candidate: object): boolean => {
    const proto = Object.getPrototypeOf(candidate);
    return proto === Object.prototype || proto === null;
  };

  // Maps each visited source object to its copy so cycles and shared
  // references resolve to the same copied node.
  const copies = new WeakMap<object, unknown>();

  const copyAndPrettyPrint = (element: any): any => {
    if (element === null || typeof element !== "object") {
      return element;
    }
    if (!Array.isArray(element) && !isPlainObject(element)) {
      return element;
    }
    if (copies.has(element)) {
      return copies.get(element);
    }

    const result: any = Array.isArray(element) ? new Array(element.length) : {};
    // Register before descending so a reference back to `element` finds `result`.
    copies.set(element, result);

    for (const key of Object.keys(element)) {
      const value = element[key];
      result[key] = typeof value === "string" ? prettify(value) : copyAndPrettyPrint(value);
    }
    return result;
  };

  return copyAndPrettyPrint(obj);
};
 
/**
 * Bulk-updates `docs` in the index derived from `baseIndex`.
 *
 * The target domain and index come from `getDomainAndNamespace(baseIndex)`;
 * the write itself is delegated to `os.bulkUpdateData`. Any failure from
 * either step is printed to the console, logged through `logError` as
 * `ErrorType.BULKUPDATE`, and then rethrown to the caller.
 *
 * @param docs - Documents to write; each must carry an `id`.
 * @param baseIndex - Base index used to resolve the target domain and index.
 * @throws Whatever `getDomainAndNamespace` or `os.bulkUpdateData` throws.
 */
export async function bulkUpdateDataWrapper(
  docs: { id: string; [key: string]: unknown }[],
  baseIndex: BaseIndex,
) {
  try {
    const target = getDomainAndNamespace(baseIndex);

    await os.bulkUpdateData(target.domain, target.index, docs);
  } catch (failure) {
    console.log({ error: failure });
    logError({
      error: failure,
      type: ErrorType.BULKUPDATE,
    });

    throw failure;
  }
}