You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
2.7 KiB

package internal
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/errors/gerror"
)
var client mqtt.Client
func NewMqttClient(registerCallback mqtt.MessageHandler, callback mqtt.MessageHandler, server string) error {
if client != nil {
if client.IsConnected() == true {
client.Disconnect(1)
}
client = nil
}
if client == nil {
opts := mqtt.NewClientOptions()
opts.AddBroker(server) // 这个中转服务器不需要任何账号密码
opts.SetClientID("go_mqtt_client1")
opts.KeepAlive = 10
// opts.SetUsername("")
// opts.SetPassword("")
opts.OnConnect = func(c mqtt.Client) {
fmt.Println("MQTT链接成功!")
//ClientSend("client", 0, false, msg)
//ClientSend("client", 0, false, `{"reqId":3,"moduleId":"webadmin","body":{"uids":"lq0001","gm":0,"moduleType":"changeGM","moduleId":"webadmin","clientId":"pomelo_cli_1669166814692","username":"monitor"}}`)
ClientSend("register", 0, false, `{"type":"client","id":1,"username":"monitor","password":"monitor","md5":false}`)
//ClientSend("monitor", 0, false, `{"type":"client","moduleId":"webadmin","body":{"uids":"","gm":1,"moduleType":"changeGM","moduleId":"webadmin"}}`)
}
opts.OnConnectionLost = func(c mqtt.Client, err error) {
//NewClient()
}
client = mqtt.NewClient(opts)
}
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
return gerror.New("GM工具无法连接游戏服务器")
}
client.Subscribe("register", 0, registerCallback)
client.Subscribe("client", 0, callback)
return nil
}
func ClientSend(topic string, qos byte, retained bool, payload interface{}) error {
token := client.Publish(topic, qos, retained, payload)
if token != nil {
token.Wait()
if token.Error() != nil {
fmt.Println(topic+"失败!", token.Error())
return token.Error()
}
}
fmt.Println("消息发布成功!" + topic)
return nil
}
// 订阅消息
func ClientSubscribe(topic string, qos byte, callback mqtt.MessageHandler, err func(error)) {
if token := client.Subscribe(topic, qos, func(c mqtt.Client, msg mqtt.Message) {
callback(c, msg)
}); token.Wait() && token.Error() != nil {
err(token.Error())
}
}
func errorcallback(err error) {
fmt.Println("err:", err.Error())
}
func registerCallback(c mqtt.Client, msg mqtt.Message) {
fmt.Println("registerCallback:", string(msg.Payload()))
ClientSend("client", 0, false, `{"reqId":3,"moduleId":"webadmin","body":{"uids":"lq0001","gm":1,"moduleType":"changeGM"}}`)
ClientSend("client", 0, false, `{"reqId":3,"moduleId":"onlineUser","body":{}}`)
}
func clientCallback(c mqtt.Client, msg mqtt.Message) {
fmt.Println("clientCallback:", string(msg.Payload()))
}