Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { ConfigResourceTypes, Kafka } from "kafkajs";
import * as _ from "lodash";

/** Desired state of a single Kafka topic. */
interface TopicConfig {
  topic: string;
  numPartitions: number;
  // Add other properties as needed
}

/**
 * Reconciles the topics on a Kafka cluster with the supplied configuration.
 *
 * Topics named in `topicsConfig` that the cluster does not list are created,
 * and topics that exist with fewer partitions than configured have partitions
 * added. Partition counts are never reduced.
 *
 * @param brokerString - Comma-separated list of broker addresses.
 * @param topicsConfig - Topics that should exist, with their partition counts.
 * @throws Propagates any error raised by the kafkajs admin client; the admin
 *   connection is closed before the error is rethrown.
 */
export async function createTopics(
  brokerString: string,
  topicsConfig: TopicConfig[],
): Promise<void> {
  const topics = topicsConfig;
  const brokers = brokerString.split(",");

  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokers,
    ssl: true,
  });
  const admin = kafka.admin();

  await admin.connect();
  try {
    // Fetch topics from MSK, dropping any whose name starts with "_"
    // (internal management topics)
    const existingTopicList = _.filter(await admin.listTopics(), (n) => !n.startsWith("_"));

    console.log("Existing topics:", JSON.stringify(existingTopicList, null, 2));

    // Fetch the metadata for the topics in MSK
    const topicsMetadata = _.get(
      await admin.fetchTopicMetadata({ topics: existingTopicList }),
      "topics",
    );
    console.log("Topics Metadata:", JSON.stringify(topicsMetadata, null, 2));

    // Diff the existing topics array with the topic configuration collection
    const topicsToCreate = _.differenceWith(
      topics,
      existingTopicList,
      (topicConfig, topic) => _.get(topicConfig, "topic") === topic,
    );

    // Find intersection of topics metadata collection with topic configuration collection
    // where partition count of topic in Kafka is less than what is specified in the topic configuration collection
    // ...can't remove partitions, only add them
    const topicsToUpdate = _.intersectionWith(
      topics,
      topicsMetadata,
      (topicConfig, topicMetadata) =>
        _.get(topicConfig, "topic") === _.get(topicMetadata, "name") &&
        _.get(topicConfig, "numPartitions") > _.get(topicMetadata, "partitions", []).length,
    );

    // Create a collection to update topic partitioning
    const partitionConfig = _.map(topicsToUpdate, (topic) => ({
      topic: _.get(topic, "topic"),
      count: _.get(topic, "numPartitions"),
    }));

    // Create a collection to allow querying of topic configuration
    const configOptions = _.map(topicsMetadata, (topic) => ({
      name: _.get(topic, "name"),
      type: ConfigResourceTypes.TOPIC,
    }));

    // Query topic configuration (skipped when there is nothing to describe)
    const configs =
      configOptions.length !== 0
        ? await admin.describeConfigs({
            resources: configOptions,
            includeSynonyms: false,
          })
        : [];

    console.log("Topics to Create:", JSON.stringify(topicsToCreate, null, 2));
    console.log("Topics to Update:", JSON.stringify(topicsToUpdate, null, 2));
    console.log("Partitions to Update:", JSON.stringify(partitionConfig, null, 2));
    console.log("Topic configuration options:", JSON.stringify(configs, null, 2));

    // Create topics that don't exist in MSK
    await admin.createTopics({ topics: topicsToCreate });

    // If any topics have fewer partitions in MSK than in the configuration, add those partitions
    if (partitionConfig.length > 0) {
      await admin.createPartitions({ topicPartitions: partitionConfig });
    }
  } finally {
    // Always release the admin connection, including when a call above rejects
    await admin.disconnect();
  }
}

/**
 * Deletes every topic on the cluster whose name matches an entry of `topicList`.
 *
 * Each entry is handed to `String.prototype.match`, so it is interpreted as a
 * regular expression and matches anywhere within a topic name, not only on an
 * exact name.
 *
 * @param brokerString - Comma-separated list of broker addresses.
 * @param topicList - Topic name patterns; each must match /.*--.*--.*--.*/g.
 * @throws Error if any entry of `topicList` fails the pattern check; this is
 *   raised before any connection is opened. Errors from the kafkajs admin
 *   client are propagated after the admin connection has been closed.
 */
export async function deleteTopics(brokerString: string, topicList: string[]): Promise<void> {
  // Check that each topic in the list is something we can delete
  for (const topic of topicList) {
    if (!topic.match(/.*--.*--.*--.*/g)) {
      throw new Error(
        "ERROR: The deleteTopics function only operates against topics that match /.*--.*--.*--.*/g",
      );
    }
  }

  const brokers = brokerString.split(",");

  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokers,
    // NOTE(review): certificate verification is disabled here, whereas
    // createTopics uses `ssl: true` - confirm this difference is intentional.
    ssl: {
      rejectUnauthorized: false,
    },
    requestTimeout: 295000, // 5s short of the lambda function's timeout
  });
  const admin = kafka.admin();

  await admin.connect();
  try {
    const currentTopics = await admin.listTopics();
    const topicsToDelete = _.filter(currentTopics, (currentTopic) =>
      topicList.some((pattern) => !!currentTopic.match(pattern)),
    );

    console.log(`Deleting topics: ${topicsToDelete}`);

    await admin.deleteTopics({
      topics: topicsToDelete,
      timeout: 295000,
    });
  } finally {
    // Always release the admin connection, including when a call above rejects
    await admin.disconnect();
  }
}