All files / lib/libs topics-lib.ts

95.34% Statements 41/43
75% Branches 6/8
80% Functions 8/10
97.43% Lines 38/39

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132                    1x 1x   1x         1x   1x 1x     3x   1x     1x       1x     1x     7x           1x       9x         2x           3x             1x             1x 1x 1x 1x     1x     1x 1x     1x     1x         2x 2x 1x           1x   1x               1x   1x   1x   1x       1x 1x         1x    
import { ConfigResourceTypes, Kafka } from "kafkajs";
import * as _ from "lodash";
 
/**
 * Desired state of a single Kafka topic, as consumed by `createTopics`.
 *
 * Entries that are missing from the cluster are passed unchanged to the admin
 * client's `createTopics` call.
 */
interface TopicConfig {
  /** Topic name; compared against the names of the topics already on the cluster. */
  topic: string;
  /**
   * Desired number of partitions. When an existing topic has fewer partitions than
   * this, the value is used as the new total partition count for that topic.
   */
  numPartitions: number;
  // Add other properties as needed
}
 
/**
 * Brings the topics on a Kafka cluster in line with the supplied topic configuration.
 *
 * Topics named in `topicsConfig` that the cluster does not have are created, and topics
 * that already exist with fewer partitions than configured get partitions added.
 * Partition counts are never reduced. Existing topics whose names start with "_" are
 * treated as internal and ignored.
 *
 * @param brokerString - Comma-separated list of broker addresses.
 * @param topicsConfig - Desired topics and their partition counts.
 * @throws Rethrows any error raised by the Kafka admin client. Once the admin client has
 *   connected, it is always disconnected before the error propagates.
 */
export async function createTopics(
  brokerString: string,
  topicsConfig: TopicConfig[],
): Promise<void> {
  const brokers = brokerString.split(",");

  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokers,
    ssl: true,
  });
  const admin = kafka.admin();

  // Connect outside the try block so that disconnect() only runs for a connection that
  // was actually established.
  await admin.connect();

  try {
    // Fetch topics from MSK and filter out internal management topics (prefixed with "_")
    const existingTopicList = _.filter(await admin.listTopics(), (n) => !n.startsWith("_"));

    console.log("Existing topics:", JSON.stringify(existingTopicList, null, 2));

    // Fetch the metadata for the topics in MSK
    const topicsMetadata = _.get(
      await admin.fetchTopicMetadata({ topics: existingTopicList }),
      "topics",
    );
    console.log("Topics Metadata:", JSON.stringify(topicsMetadata, null, 2));

    // Configured topics whose name is not among the existing topics
    const topicsToCreate = _.differenceWith(
      topicsConfig,
      existingTopicList,
      (topicConfig, topic) => topicConfig.topic === topic,
    );

    // Configured topics that exist in Kafka with fewer partitions than configured
    // ...can't remove partitions, only add them
    const topicsToUpdate = _.intersectionWith(
      topicsConfig,
      topicsMetadata,
      (topicConfig, topicMetadata) =>
        topicConfig.topic === _.get(topicMetadata, "name") &&
        topicConfig.numPartitions > _.get(topicMetadata, "partitions", []).length,
    );

    // Shape expected by createPartitions: `count` is the new total partition count
    const partitionConfig = _.map(topicsToUpdate, (topicConfig) => ({
      topic: topicConfig.topic,
      count: topicConfig.numPartitions,
    }));

    // Create a collection to allow querying of topic configuration
    const configOptions = _.map(topicsMetadata, (topicMetadata) => ({
      name: _.get(topicMetadata, "name"),
      type: ConfigResourceTypes.TOPIC,
    }));

    // Query topic configuration (logged only); skipped when there is nothing to describe
    const configs =
      configOptions.length !== 0
        ? await admin.describeConfigs({
            resources: configOptions,
            includeSynonyms: false,
          })
        : [];

    console.log("Topics to Create:", JSON.stringify(topicsToCreate, null, 2));
    console.log("Topics to Update:", JSON.stringify(topicsToUpdate, null, 2));
    console.log("Partitions to Update:", JSON.stringify(partitionConfig, null, 2));
    console.log("Topic configuration options:", JSON.stringify(configs, null, 2));

    // Create topics that don't exist in MSK
    await admin.createTopics({ topics: topicsToCreate });

    // If any topics have fewer partitions in MSK than in the configuration, add those partitions
    if (partitionConfig.length > 0) {
      await admin.createPartitions({ topicPartitions: partitionConfig });
    }
  } finally {
    // Release the admin connection whether or not the reconciliation succeeded
    await admin.disconnect();
  }
}
 
/**
 * Deletes every topic on the cluster whose name matches one of the supplied patterns.
 *
 * Each entry of `topicList` is passed to `String.prototype.match`, so it is interpreted
 * as an unanchored regular expression rather than as a literal topic name.
 *
 * @param brokerString - Comma-separated list of broker addresses.
 * @param topicList - Topic name patterns; every entry must match /.*--.*--.*--.*/.
 * @throws Error if any entry of `topicList` does not match /.*--.*--.*--.*/; this check
 *   runs before any connection is attempted.
 * @throws Rethrows any error raised by the Kafka admin client. Once the admin client has
 *   connected, it is always disconnected before the error propagates.
 */
export async function deleteTopics(brokerString: string, topicList: string[]): Promise<void> {
  // Safety guard: refuse to act on anything that lacks three "--" separators
  const deletableShape = /.*--.*--.*--.*/;
  for (const topic of topicList) {
    if (!deletableShape.test(topic)) {
      throw new Error(
        "ERROR: The deleteTopics function only operates against topics that match /.*--.*--.*--.*/g",
      );
    }
  }

  const brokers = brokerString.split(",");

  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokers,
    // NOTE(review): certificate verification is disabled here, whereas createTopics uses
    // `ssl: true` — confirm this difference is intentional.
    ssl: {
      rejectUnauthorized: false,
    },
    requestTimeout: 295000, // 5s short of the lambda function's timeout
  });
  const admin = kafka.admin();

  // Connect outside the try block so that disconnect() only runs for a connection that
  // was actually established.
  await admin.connect();

  try {
    const currentTopics = await admin.listTopics();

    // Keep the existing topics that match at least one requested pattern
    const topicsToDelete = _.filter(currentTopics, (currentTopic) =>
      topicList.some((pattern) => !!currentTopic.match(pattern)),
    );

    console.log(`Deleting topics: ${topicsToDelete}`);
    await admin.deleteTopics({
      topics: topicsToDelete,
      timeout: 295000,
    });
  } finally {
    // Release the admin connection whether or not the deletion succeeded
    await admin.disconnect();
  }
}