|
@@ -13,6 +13,12 @@ const ACTION_STATE = {
|
|
|
close: 'OFF'
|
|
close: 'OFF'
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+function logMqtt(direction, message, details = {}) {
|
|
|
|
|
+ const timestamp = dayjs().format('YYYY-MM-DD HH:mm:ss');
|
|
|
|
|
+ const suffix = Object.keys(details).length ? ` ${JSON.stringify(details)}` : '';
|
|
|
|
|
+ console.log(`[${timestamp}] [MQTT:${direction}] ${message}${suffix}`);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
class MqttService {
|
|
class MqttService {
|
|
|
constructor() {
|
|
constructor() {
|
|
|
this.client = null;
|
|
this.client = null;
|
|
@@ -41,22 +47,29 @@ class MqttService {
|
|
|
connectedAt: dayjs().format('YYYY-MM-DD HH:mm:ss')
|
|
connectedAt: dayjs().format('YYYY-MM-DD HH:mm:ss')
|
|
|
};
|
|
};
|
|
|
this.client.subscribe([latestTopicConfig.statusTopic, latestTopicConfig.telemetryTopic]);
|
|
this.client.subscribe([latestTopicConfig.statusTopic, latestTopicConfig.telemetryTopic]);
|
|
|
|
|
+ logMqtt('CONNECT', 'connected', {
|
|
|
|
|
+ url: latestTopicConfig.mqttUrl,
|
|
|
|
|
+ topics: [latestTopicConfig.statusTopic, latestTopicConfig.telemetryTopic]
|
|
|
|
|
+ });
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
this.client.on('reconnect', () => {
|
|
this.client.on('reconnect', () => {
|
|
|
this.status.message = '重连中';
|
|
this.status.message = '重连中';
|
|
|
this.status.connected = false;
|
|
this.status.connected = false;
|
|
|
|
|
+ logMqtt('RECONNECT', 'reconnecting');
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
this.client.on('close', () => {
|
|
this.client.on('close', () => {
|
|
|
this.status.connected = false;
|
|
this.status.connected = false;
|
|
|
this.status.message = '连接已断开';
|
|
this.status.message = '连接已断开';
|
|
|
|
|
+ logMqtt('CLOSE', 'connection closed');
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
this.client.on('error', (error) => {
|
|
this.client.on('error', (error) => {
|
|
|
this.status.connected = false;
|
|
this.status.connected = false;
|
|
|
this.status.message = '连接错误';
|
|
this.status.message = '连接错误';
|
|
|
this.status.lastError = error.message;
|
|
this.status.lastError = error.message;
|
|
|
|
|
+ logMqtt('ERROR', error.message);
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
this.client.on('message', (topic, payloadBuffer) => {
|
|
this.client.on('message', (topic, payloadBuffer) => {
|
|
@@ -117,8 +130,10 @@ class MqttService {
|
|
|
|
|
|
|
|
const publishAttempt = () => {
|
|
const publishAttempt = () => {
|
|
|
attempts += 1;
|
|
attempts += 1;
|
|
|
|
|
+ logMqtt('SEND', 'publish command', { topic, payload, channel, action, attempt: attempts });
|
|
|
this.client.publish(topic, payload, { qos: 0 }, (error) => {
|
|
this.client.publish(topic, payload, { qos: 0 }, (error) => {
|
|
|
if (error) {
|
|
if (error) {
|
|
|
|
|
+ logMqtt('ERROR', 'publish failed', { topic, payload, error: error.message });
|
|
|
cleanup();
|
|
cleanup();
|
|
|
resolve({ ok: false, message: error.message });
|
|
resolve({ ok: false, message: error.message });
|
|
|
}
|
|
}
|
|
@@ -129,6 +144,7 @@ class MqttService {
|
|
|
publishAttempt();
|
|
publishAttempt();
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
+ logMqtt('TIMEOUT', 'wait response timeout', { topic, payload, channel, action, attempts });
|
|
|
cleanup();
|
|
cleanup();
|
|
|
resolve({ ok: false, message: '等待设备回执超时。' });
|
|
resolve({ ok: false, message: '等待设备回执超时。' });
|
|
|
}, 5000);
|
|
}, 5000);
|
|
@@ -160,6 +176,7 @@ class MqttService {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
handleMessage(topic, payload) {
|
|
handleMessage(topic, payload) {
|
|
|
|
|
+ logMqtt('RECV', 'message received', { topic, payload });
|
|
|
const now = dayjs().toISOString();
|
|
const now = dayjs().toISOString();
|
|
|
db.prepare('INSERT INTO mqtt_messages (created_at, topic, payload) VALUES (?, ?, ?)').run(now, topic, payload);
|
|
db.prepare('INSERT INTO mqtt_messages (created_at, topic, payload) VALUES (?, ?, ?)').run(now, topic, payload);
|
|
|
|
|
|
|
@@ -222,6 +239,7 @@ class MqttService {
|
|
|
if (source === 'telemetry' && pending.action !== 'query') continue;
|
|
if (source === 'telemetry' && pending.action !== 'query') continue;
|
|
|
if (this.matchesPending(parsed, pending)) {
|
|
if (this.matchesPending(parsed, pending)) {
|
|
|
const prefix = source === 'telemetry' ? '已收到设备状态上报' : '设备已回执';
|
|
const prefix = source === 'telemetry' ? '已收到设备状态上报' : '设备已回执';
|
|
|
|
|
+ logMqtt('MATCH', prefix, { source, payload });
|
|
|
pending.resolve(`${prefix}:${payload}`);
|
|
pending.resolve(`${prefix}:${payload}`);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|