🐛 fix: kafka producer bugs (#3771)

* 🐛 fix: missing Kafka Producer SSL option in frontend object

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* ♻️  refactor: better error handling of kafka producer

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

---------

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>
This commit is contained in:
Muhammed Hussein karimi 2023-09-23 23:00:15 +03:30 committed by GitHub
parent 90d0e8ccde
commit 2ab21ccf8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 13 deletions

View File

@ -286,22 +286,22 @@ exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, sa
producer.connect().then( producer.connect().then(
() => { () => {
try { producer.send({
producer.send({ topic: topic,
topic: topic, messages: [{
messages: [{ value: message,
value: message, }],
}], }).then((_) => {
});
connectedToKafka = true;
clearTimeout(timeoutID);
resolve("Message sent successfully"); resolve("Message sent successfully");
} catch (e) { }).catch((e) => {
connectedToKafka = true; connectedToKafka = true;
producer.disconnect(); producer.disconnect();
clearTimeout(timeoutID); clearTimeout(timeoutID);
reject(new Error("Error sending message: " + e.message)); reject(new Error("Error sending message: " + e.message));
} }).finally(() => {
connectedToKafka = true;
clearTimeout(timeoutID);
});
} }
).catch( ).catch(
(e) => { (e) => {
@ -313,8 +313,10 @@ exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, sa
); );
producer.on("producer.network.request_timeout", (_) => { producer.on("producer.network.request_timeout", (_) => {
clearTimeout(timeoutID); if (!connectedToKafka) {
reject(new Error("producer.network.request_timeout")); clearTimeout(timeoutID);
reject(new Error("producer.network.request_timeout"));
}
}); });
producer.on("producer.disconnect", (_) => { producer.on("producer.disconnect", (_) => {

View File

@ -880,6 +880,7 @@ const monitorDefaults = {
kafkaProducerSaslOptions: { kafkaProducerSaslOptions: {
mechanism: "None", mechanism: "None",
}, },
kafkaProducerSsl: false,
gamedigGivenPortOnly: true, gamedigGivenPortOnly: true,
}; };