在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息。
1.生产者随机发布消息,用rpush发布。
2.消费者用lpop订阅消费,一旦没有消息,随机休眠。
redis做消息队列的缺点:没有持久化。一旦消息没有人消费,积累到一定程度后就会丢失
package main
import (
"fmt"
"time"
"os"
"strconv"
"math/rand"
"github.com/gomodule/redigo/redis"
)
const RMQ string = "mqtest"
func producer() {
redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("hdiot"))
if err != nil {
fmt.Println(err)
return
}
defer redis_conn.Close()
rand.Seed(time.Now().UnixNano())
var i = 1
for {
_,err = redis_conn.Do("rpush", RMQ, strconv.Itoa(i))
if(err!=nil) {
fmt.Println("produce error")
continue
}
fmt.Println("produce element:%d", i)
time.Sleep(time.Duration(rand.Intn(10))*time.Second)
i++
}
}
func consumer() {
redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("hdiot"))
if err != nil {
fmt.Println(err)
return
}
defer redis_conn.Close()
rand.Seed(time.Now().UnixNano())
for {
ele,err := redis.String(redis_conn.Do("lpop", RMQ))
if(err != nil) {
fmt.Println("no msg.sleep now")
time.Sleep(time.Duration(rand.Intn(10))*time.Second)
} else {
fmt.Println("cosume element:%s", ele)
}
}
}
func main() {
list := os.Args
if(list[1] == "pro") {
go producer()
} else if (list[1] == "con") {
go consumer()
}
for {
time.Sleep(time.Duration(10000)*time.Second)
}
}
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

评论(0)