diff --git a/server/model/monitor.js b/server/model/monitor.js index bb2c9e852..2ea2f958c 100644 --- a/server/model/monitor.js +++ b/server/model/monitor.js @@ -8,7 +8,7 @@ const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, MAX_INTERVAL_SECOND, MI PING_COUNT_MIN, PING_COUNT_MAX, PING_COUNT_DEFAULT, PING_PER_REQUEST_TIMEOUT_MIN, PING_PER_REQUEST_TIMEOUT_MAX, PING_PER_REQUEST_TIMEOUT_DEFAULT } = require("../../src/util"); -const { ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, setSetting, httpNtlm, radius, grpcQuery, +const { ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, setSetting, httpNtlm, radius, kafkaProducerAsync, getOidcTokenClientCredentials, rootCertificatesFingerprints, axiosAbortSignal } = require("../util-server"); const { R } = require("redbean-node"); @@ -784,37 +784,6 @@ class Monitor extends BeanModel { bean.msg = ""; bean.status = UP; bean.ping = dayjs().valueOf() - startTime; - } else if (this.type === "grpc-keyword") { - let startTime = dayjs().valueOf(); - const options = { - grpcUrl: this.grpcUrl, - grpcProtobufData: this.grpcProtobuf, - grpcServiceName: this.grpcServiceName, - grpcEnableTls: this.grpcEnableTls, - grpcMethod: this.grpcMethod, - grpcBody: this.grpcBody, - }; - const response = await grpcQuery(options); - bean.ping = dayjs().valueOf() - startTime; - log.debug("monitor:", `gRPC response: ${JSON.stringify(response)}`); - let responseData = response.data; - if (responseData.length > 50) { - responseData = responseData.toString().substring(0, 47) + "..."; - } - if (response.code !== 1) { - bean.status = DOWN; - bean.msg = `Error in send gRPC ${response.code} ${response.errorMessage}`; - } else { - let keywordFound = response.data.toString().includes(this.keyword); - if (keywordFound === !this.isInvertKeyword()) { - bean.status = UP; - bean.msg = `${responseData}, keyword [${this.keyword}] ${keywordFound ? "is" : "not"} found`; - } else { - log.debug("monitor:", `GRPC response [${response.data}] + ", but keyword [${this.keyword}] is ${keywordFound ? "present" : "not"} in [" + ${response.data} + "]"`); - bean.status = DOWN; - bean.msg = `, but keyword [${this.keyword}] is ${keywordFound ? "present" : "not"} in [" + ${responseData} + "]`; - } - } } else if (this.type === "postgres") { let startTime = dayjs().valueOf(); diff --git a/server/monitor-types/grpc.js b/server/monitor-types/grpc.js new file mode 100644 index 000000000..ebc03b8c8 --- /dev/null +++ b/server/monitor-types/grpc.js @@ -0,0 +1,89 @@ +const { MonitorType } = require("./monitor-type"); +const { UP, log } = require("../../src/util"); +const dayjs = require("dayjs"); +const grpc = require("@grpc/grpc-js"); +const protojs = require("protobufjs"); + +class GrpcKeywordMonitorType extends MonitorType { + name = "grpc-keyword"; + + /** + * @inheritdoc + */ + async check(monitor, heartbeat, _server) { + const startTime = dayjs().valueOf(); + const service = this.constructGrpcService(monitor.grpcUrl, monitor.grpcProtobuf, monitor.grpcServiceName, monitor.grpcEnableTls); + let response = await this.grpcQuery(service, monitor.grpcMethod, monitor.grpcBody); + heartbeat.ping = dayjs().valueOf() - startTime; + log.debug(this.name, "gRPC response:", response); + let keywordFound = response.toString().includes(monitor.keyword); + if (keywordFound !== !monitor.isInvertKeyword()) { + log.debug(this.name, `GRPC response [${response}] + ", but keyword [${monitor.keyword}] is ${keywordFound ? "present" : "not"} in [" + ${response} + "]"`); + + let truncatedResponse = (response.length > 50) ? response.toString().substring(0, 47) + "..." : response; + + throw new Error(`keyword [${monitor.keyword}] is ${keywordFound ? "present" : "not"} in [" + ${truncatedResponse} + "]`); + } + heartbeat.status = UP; + heartbeat.msg = `${response}, keyword [${monitor.keyword}] ${keywordFound ? "is" : "not"} found`; + } + + /** + * Create gRPC client + * @param {string} url grpc Url + * @param {string} protobufData grpc ProtobufData + * @param {string} serviceName grpc ServiceName + * @param {string} enableTls grpc EnableTls + * @returns {grpc.Service} grpc Service + */ + constructGrpcService(url, protobufData, serviceName, enableTls) { + const protocObject = protojs.parse(protobufData); + const protoServiceObject = protocObject.root.lookupService(serviceName); + const Client = grpc.makeGenericClientConstructor({}); + const credentials = enableTls ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(); + const client = new Client(url, credentials); + return protoServiceObject.create((method, requestData, cb) => { + const fullServiceName = method.fullName; + const serviceFQDN = fullServiceName.split("."); + const serviceMethod = serviceFQDN.pop(); + const serviceMethodClientImpl = `/${serviceFQDN.slice(1).join(".")}/${serviceMethod}`; + log.debug(this.name, `gRPC method ${serviceMethodClientImpl}`); + client.makeUnaryRequest( + serviceMethodClientImpl, + arg => arg, + arg => arg, + requestData, + cb); + }, false, false); + } + + /** + * Create gRPC client stib + * @param {grpc.Service} service grpc Url + * @param {string} method grpc Method + * @param {string} body grpc Body + * @returns {Promise} Result of gRPC query + */ + async grpcQuery(service, method, body) { + return new Promise((resolve, reject) => { + try { + service[method](JSON.parse(body), (err, response) => { + if (err) { + if (err.code !== 1) { + reject(err); + } + log.debug(this.name, `ignoring ${err.code} ${err.details}, as code=1 is considered OK`); + resolve(`${err.code} is considered OK because ${err.details}`); + } + resolve(JSON.stringify(response)); + }); + } catch (err) { + reject(err); + } + }); + } +} + +module.exports = { + GrpcKeywordMonitorType, +}; diff --git a/server/uptime-kuma-server.js b/server/uptime-kuma-server.js index 107b54671..d91f6be81 100644 --- a/server/uptime-kuma-server.js +++ b/server/uptime-kuma-server.js @@ -117,6 +117,7 @@ class UptimeKumaServer { UptimeKumaServer.monitorTypeList["smtp"] = new SMTPMonitorType(); UptimeKumaServer.monitorTypeList["group"] = new GroupMonitorType(); UptimeKumaServer.monitorTypeList["snmp"] = new SNMPMonitorType(); + UptimeKumaServer.monitorTypeList["grpc-keyword"] = new GrpcKeywordMonitorType(); UptimeKumaServer.monitorTypeList["mongodb"] = new MongodbMonitorType(); UptimeKumaServer.monitorTypeList["rabbitmq"] = new RabbitMqMonitorType(); UptimeKumaServer.monitorTypeList["port"] = new TCPMonitorType(); @@ -561,6 +562,7 @@ const { MqttMonitorType } = require("./monitor-types/mqtt"); const { SMTPMonitorType } = require("./monitor-types/smtp"); const { GroupMonitorType } = require("./monitor-types/group"); const { SNMPMonitorType } = require("./monitor-types/snmp"); +const { GrpcKeywordMonitorType } = require("./monitor-types/grpc"); const { MongodbMonitorType } = require("./monitor-types/mongodb"); const { RabbitMqMonitorType } = require("./monitor-types/rabbitmq"); const { TCPMonitorType } = require("./monitor-types/tcp.js"); diff --git a/server/util-server.js b/server/util-server.js index 6365b623c..250f897ba 100644 --- a/server/util-server.js +++ b/server/util-server.js @@ -16,8 +16,6 @@ const postgresConParse = require("pg-connection-string").parse; const mysql = require("mysql2"); const { NtlmClient } = require("./modules/axios-ntlm/lib/ntlmClient.js"); const { Settings } = require("./settings"); -const grpc = require("@grpc/grpc-js"); -const protojs = require("protobufjs"); const RadiusClient = require("./radius-client"); const oidc = require("openid-client"); const tls = require("tls"); @@ -892,64 +890,6 @@ module.exports.timeObjectToLocal = (obj, timezone = undefined) => { return timeObjectConvertTimezone(obj, timezone, false); }; -/** - * Create gRPC client stib - * @param {object} options from gRPC client - * @returns {Promise} Result of gRPC query - */ -module.exports.grpcQuery = async (options) => { - const { grpcUrl, grpcProtobufData, grpcServiceName, grpcEnableTls, grpcMethod, grpcBody } = options; - const protocObject = protojs.parse(grpcProtobufData); - const protoServiceObject = protocObject.root.lookupService(grpcServiceName); - const Client = grpc.makeGenericClientConstructor({}); - const credentials = grpcEnableTls ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(); - const client = new Client( - grpcUrl, - credentials - ); - const grpcService = protoServiceObject.create(function (method, requestData, cb) { - const fullServiceName = method.fullName; - const serviceFQDN = fullServiceName.split("."); - const serviceMethod = serviceFQDN.pop(); - const serviceMethodClientImpl = `/${serviceFQDN.slice(1).join(".")}/${serviceMethod}`; - log.debug("monitor", `gRPC method ${serviceMethodClientImpl}`); - client.makeUnaryRequest( - serviceMethodClientImpl, - arg => arg, - arg => arg, - requestData, - cb); - }, false, false); - return new Promise((resolve, _) => { - try { - return grpcService[`${grpcMethod}`](JSON.parse(grpcBody), function (err, response) { - const responseData = JSON.stringify(response); - if (err) { - return resolve({ - code: err.code, - errorMessage: err.details, - data: "" - }); - } else { - log.debug("monitor:", `gRPC response: ${JSON.stringify(response)}`); - return resolve({ - code: 1, - errorMessage: "", - data: responseData - }); - } - }); - } catch (err) { - return resolve({ - code: -1, - errorMessage: `Error ${err}. Please review your gRPC configuration option. The service name must not include package name value, and the method name must follow camelCase format`, - data: "" - }); - } - - }); -}; - /** * Returns an array of SHA256 fingerprints for all known root certificates. * @returns {Set} A set of SHA256 fingerprints. diff --git a/test/backend-test/test-grpc.js b/test/backend-test/test-grpc.js new file mode 100644 index 000000000..31b588cff --- /dev/null +++ b/test/backend-test/test-grpc.js @@ -0,0 +1,306 @@ +const { describe, test } = require("node:test"); +const assert = require("node:assert"); +const grpc = require("@grpc/grpc-js"); +const protoLoader = require("@grpc/proto-loader"); +const { GrpcKeywordMonitorType } = require("../../server/monitor-types/grpc"); +const { UP, PENDING } = require("../../src/util"); +const fs = require("fs"); +const path = require("path"); +const os = require("os"); + +const testProto = ` +syntax = "proto3"; +package test; + +service TestService { + rpc Echo (EchoRequest) returns (EchoResponse); +} + +message EchoRequest { + string message = 1; +} + +message EchoResponse { + string message = 1; +} +`; + +/** + * Create a gRPC server for testing + * @param {number} port Port to listen on + * @param {object} methodHandlers Object with method handlers + * @returns {Promise} gRPC server instance + */ +async function createTestGrpcServer(port, methodHandlers) { + // Write proto to temp file + const tmpDir = os.tmpdir(); + const protoPath = path.join(tmpDir, `test-${port}.proto`); + fs.writeFileSync(protoPath, testProto); + + // Load proto file + const packageDefinition = protoLoader.loadSync(protoPath, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true + }); + const protoDescriptor = grpc.loadPackageDefinition(packageDefinition); + const testPackage = protoDescriptor.test; + + const server = new grpc.Server(); + + // Add service implementation + server.addService(testPackage.TestService.service, { + Echo: (call, callback) => { + if (methodHandlers.Echo) { + methodHandlers.Echo(call, callback); + } else { + callback(null, { message: call.request.message }); + } + }, + }); + + return new Promise((resolve, reject) => { + server.bindAsync( + `0.0.0.0:${port}`, + grpc.ServerCredentials.createInsecure(), + (err) => { + if (err) { + reject(err); + } else { + server.start(); + // Clean up temp file + fs.unlinkSync(protoPath); + resolve(server); + } + } + ); + }); +} + +describe("GrpcKeywordMonitorType", { + skip: !!process.env.CI && (process.platform !== "linux" || process.arch !== "x64"), +}, () => { + test("gRPC keyword found in response", async () => { + const port = 50051; + const server = await createTestGrpcServer(port, { + Echo: (call, callback) => { + callback(null, { message: "Hello World with SUCCESS keyword" }); + } + }); + + const grpcMonitor = new GrpcKeywordMonitorType(); + const monitor = { + grpcUrl: `localhost:${port}`, + grpcProtobuf: testProto, + grpcServiceName: "test.TestService", + grpcMethod: "echo", + grpcBody: JSON.stringify({ message: "test" }), + keyword: "SUCCESS", + invertKeyword: false, + grpcEnableTls: false, + isInvertKeyword: () => false, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + try { + await grpcMonitor.check(monitor, heartbeat, {}); + assert.strictEqual(heartbeat.status, UP); + assert.ok(heartbeat.msg.includes("SUCCESS")); + assert.ok(heartbeat.msg.includes("is")); + } finally { + server.forceShutdown(); + } + }); + + test("gRPC keyword not found in response", async () => { + const port = 50052; + const server = await createTestGrpcServer(port, { + Echo: (call, callback) => { + callback(null, { message: "Hello World without the expected keyword" }); + } + }); + + const grpcMonitor = new GrpcKeywordMonitorType(); + const monitor = { + grpcUrl: `localhost:${port}`, + grpcProtobuf: testProto, + grpcServiceName: "test.TestService", + grpcMethod: "echo", + grpcBody: JSON.stringify({ message: "test" }), + keyword: "MISSING", + invertKeyword: false, + grpcEnableTls: false, + isInvertKeyword: () => false, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + try { + await assert.rejects( + grpcMonitor.check(monitor, heartbeat, {}), + (err) => { + assert.ok(err.message.includes("MISSING")); + assert.ok(err.message.includes("not")); + return true; + } + ); + } finally { + server.forceShutdown(); + } + }); + + test("gRPC inverted keyword - keyword present (should fail)", async () => { + const port = 50053; + const server = await createTestGrpcServer(port, { + Echo: (call, callback) => { + callback(null, { message: "Response with ERROR keyword" }); + } + }); + + const grpcMonitor = new GrpcKeywordMonitorType(); + const monitor = { + grpcUrl: `localhost:${port}`, + grpcProtobuf: testProto, + grpcServiceName: "test.TestService", + grpcMethod: "echo", + grpcBody: JSON.stringify({ message: "test" }), + keyword: "ERROR", + invertKeyword: true, + grpcEnableTls: false, + isInvertKeyword: () => true, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + try { + await assert.rejects( + grpcMonitor.check(monitor, heartbeat, {}), + (err) => { + assert.ok(err.message.includes("ERROR")); + assert.ok(err.message.includes("present")); + return true; + } + ); + } finally { + server.forceShutdown(); + } + }); + + test("gRPC inverted keyword - keyword not present (should pass)", async () => { + const port = 50054; + const server = await createTestGrpcServer(port, { + Echo: (call, callback) => { + callback(null, { message: "Response without error keyword" }); + } + }); + + const grpcMonitor = new GrpcKeywordMonitorType(); + const monitor = { + grpcUrl: `localhost:${port}`, + grpcProtobuf: testProto, + grpcServiceName: "test.TestService", + grpcMethod: "echo", + grpcBody: JSON.stringify({ message: "test" }), + keyword: "ERROR", + invertKeyword: true, + grpcEnableTls: false, + isInvertKeyword: () => true, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + try { + await grpcMonitor.check(monitor, heartbeat, {}); + assert.strictEqual(heartbeat.status, UP); + assert.ok(heartbeat.msg.includes("ERROR")); + assert.ok(heartbeat.msg.includes("not")); + } finally { + server.forceShutdown(); + } + }); + + test("gRPC connection failure", async () => { + const grpcMonitor = new GrpcKeywordMonitorType(); + const monitor = { + grpcUrl: "localhost:50099", + grpcProtobuf: testProto, + grpcServiceName: "test.TestService", + grpcMethod: "echo", + grpcBody: JSON.stringify({ message: "test" }), + keyword: "SUCCESS", + invertKeyword: false, + grpcEnableTls: false, + isInvertKeyword: () => false, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + await assert.rejects( + grpcMonitor.check(monitor, heartbeat, {}), + (err) => { + // Should fail with connection error + return true; + } + ); + }); + + test("gRPC response truncation for long messages", async () => { + const port = 50055; + const longMessage = "A".repeat(100) + " with SUCCESS keyword"; + + const server = await createTestGrpcServer(port, { + Echo: (call, callback) => { + callback(null, { message: longMessage }); + } + }); + + const grpcMonitor = new GrpcKeywordMonitorType(); + const monitor = { + grpcUrl: `localhost:${port}`, + grpcProtobuf: testProto, + grpcServiceName: "test.TestService", + grpcMethod: "echo", + grpcBody: JSON.stringify({ message: "test" }), + keyword: "MISSING", + invertKeyword: false, + grpcEnableTls: false, + isInvertKeyword: () => false, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + try { + await assert.rejects( + grpcMonitor.check(monitor, heartbeat, {}), + (err) => { + // Should truncate message to 50 characters with "..." + assert.ok(err.message.includes("...")); + return true; + } + ); + } finally { + server.forceShutdown(); + } + }); +}); diff --git a/test/manual-test-grpc/echo.proto b/test/manual-test-grpc/echo.proto new file mode 100644 index 000000000..39ae6a66a --- /dev/null +++ b/test/manual-test-grpc/echo.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package echo; +service EchoService { + rpc Echo (EchoRequest) returns (EchoResponse); +} +message EchoRequest { string message = 1; } +message EchoResponse { string message = 1; } diff --git a/test/manual-test-grpc/simple-grpc-server.js b/test/manual-test-grpc/simple-grpc-server.js new file mode 100644 index 000000000..91a401e47 --- /dev/null +++ b/test/manual-test-grpc/simple-grpc-server.js @@ -0,0 +1,22 @@ +const grpc = require("@grpc/grpc-js"); +const protoLoader = require("@grpc/proto-loader"); +const packageDef = protoLoader.loadSync("echo.proto", {}); +const grpcObject = grpc.loadPackageDefinition(packageDef); +const { echo } = grpcObject; + +/** + * Echo service implementation + * @param {object} call Call object + * @param {Function} callback Callback function + * @returns {void} + */ +function Echo(call, callback) { + callback(null, { message: call.request.message }); +} + +const server = new grpc.Server(); +server.addService(echo.EchoService.service, { Echo }); +server.bindAsync("0.0.0.0:50051", grpc.ServerCredentials.createInsecure(), () => { + console.log("gRPC server running on :50051"); + server.start(); +});