import { Handler } from "aws-lambda";
import { KafkaEvent, KafkaRecord, opensearch } from "shared-types";
import { bulkUpdateDataWrapper, ErrorType, getTopic, logError } from "../libs/sink-lib";
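
/**
 * Lambda entry point: routes incoming Kafka records by topic. Records from the
 * SEA.dbo.Type Debezium CDC topic are handed to `subtypes`; any other topic is
 * logged as a bad-topic error and the invocation fails.
 */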
export const handler: Handler<KafkaEvent> = async (event) => {
  const loggableEvent = { ...event, records: "too large to display" };
  try {
    for (const topicPartition of Object.keys(event.records)) {
      const topic = getTopic(topicPartition);
      switch (topic) {
        case "aws.seatool.debezium.cdc.SEA.dbo.Type":
          await subtypes(event.records[topicPartition], topicPartition);
          break;
        default:
          logError({ type: ErrorType.BADTOPIC });
          throw new Error(`topic (${topicPartition}) is invalid`);
      }
    }
  } catch (error) {
    logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } });
    throw error;
  }
};
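
/**
 * Decodes each Kafka record, validates the Debezium "after" image against the
 * OpenSearch subtypes schema, and passes the resulting documents to
 * `bulkUpdateDataWrapper` for the "subtypes" index. Parse and validation
 * failures are logged and skipped.
 */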
const subtypes = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
  const docs: any[] = [];
  for (const kafkaRecord of kafkaRecords) {
    const { value } = kafkaRecord;
    try {
      // Kafka record values arrive base64-encoded; decode and read the
      // Debezium change event's "after" image.
      const decodedValue = Buffer.from(value, "base64").toString("utf-8");
      const record = JSON.parse(decodedValue).payload.after;
      // Skip tombstones and deletes, which carry no "after" payload.
      if (!record) {
        continue;
      }
      const result = opensearch.subtypes.Type.transform().safeParse(record);
      if (!result.success) {
        logError({
          type: ErrorType.VALIDATION,
          error: result.error,
          metadata: { topicPartition, kafkaRecord, record },
        });
        continue;
      }
      docs.push(result.data);
    } catch (error) {
      logError({
        type: ErrorType.BADPARSE,
        error,
        metadata: { topicPartition, kafkaRecord },
      });
    }
  }
  await bulkUpdateDataWrapper(docs, "subtypes");
};