目录
  • 前言
  • 原理
  • 简单的实现
    • 生产者
    • 延迟服务
    • 消费者
  • 改进点
    • 通用的延迟服务
    • 生产者负责延迟服务
  • 总结

    前言

    延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等。

    这篇文章主要是使用Go+Kafka实现延迟消息。

    使用了sarama客户端。

    原理

    Kafka实现延迟消息分为下面三步:

    • 生产者把消息发送到延迟队列
    • 延迟服务把延迟队列里超过延迟时间的消息写入真实队列
    • 消费者消费真实队列里的消息

    简单的实现

    生产者

    生产者只是把消息发送到延迟队列

    msg := &sarama.ProducerMessage{
       Topic: kafka_delay_queue_test.DelayTopic,
       Value: sarama.ByteEncoder("test" + strconv.Itoa(i)),
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
       log.Println(err)
    }

    延迟服务

    延迟服务会订阅延迟队列的消息,并把超时消息发送到真实队列

    if err = consumerGroup.Consume(context.Background(),
       []string{kafka_delay_queue_test.DelayTopic}, consumer); err != nil {
       break
    }
    type Consumer struct {
       producer sarama.SyncProducer
       delay    time.Duration
    }
    
    func NewConsumer(producer sarama.SyncProducer, delay time.Duration) *Consumer {
       return &Consumer{
          producer: producer,
          delay:    delay,
       }
    }
    
    func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
       for message := range claim.Messages() {
          // 如果消息已经超时,把消息发送到真实队列
          now := time.Now()
          if now.Sub(message.Timestamp) >= c.delay {
             _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
                Topic: kafka_delay_queue_test.RealTopic,
                Key:   sarama.ByteEncoder(message.Key),
                Value: sarama.ByteEncoder(message.Value),
             })
             if err == nil {
                session.MarkMessage(message, "")
             }
             continue
          }
          // 否则休眠一秒
          time.Sleep(time.Second)
          return nil
       }
       return nil
    }

    消费者

    消费者只是订阅真实队列并消费消息

    if err = consumerGroup.Consume(context.Background(), 
       []string{kafka_delay_queue_test.RealTopic}, consumer); err != nil {
       break
    }
    type Consumer struct{}
    
    func NewConsumer() *Consumer {
       return &Consumer{}
    }
    
    func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
       for message := range claim.Messages() {
          fmt.Println("收到消息:", message.Value, message.Timestamp)
          session.MarkMessage(message, "")
       }
       return nil
    }

    改进点

    通用的延迟服务

    可以把延迟服务封装成一个通用的服务,这样生产者可以直接把消息发送给延迟服务,让延迟服务去处理剩下的逻辑。

    延迟服务可以提供多个延时等级,比如5s、10s、30s、1m、5m、10m、1h、2h等,类似于RocketMQ。

    生产者负责延迟服务

    也可以让生产者负责延迟服务,让生产者自己把延迟队列里面的消息发送到真实队列。

    下面是一个简单的实现:

    // KafkaDelayQueueProducer 延迟队列生产者,包含了生产者和延迟服务
    type KafkaDelayQueueProducer struct {
       producer   sarama.SyncProducer // 生产者
       delayTopic string              // 延迟服务主题
    }
    
    // NewKafkaDelayQueueProducer 创建延迟队列生产者
    // producer 生产者
    // delayServiceConsumerGroup 延迟服务消费者
    // delayTime 延迟时间
    // delayTopic 延迟服务主题
    // realTopic 真实队列主题
    func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
       delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer {
       // 启动延迟服务
       consumer := NewDelayServiceConsumer(producer, delayTime, realTopic)
       go func() {
          for {
             if err := delayServiceConsumerGroup.Consume(context.Background(),
                []string{delayTopic}, consumer); err != nil {
                break
             }
          }
       }()
       return &KafkaDelayQueueProducer{
          producer:   producer,
          delayTopic: delayTopic,
       }
    }
    
    // SendMessage 发送消息
    func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
       msg.Topic = q.delayTopic
       return q.producer.SendMessage(msg)
    }
    
    // DelayServiceConsumer 延迟服务消费者
    type DelayServiceConsumer struct {
       producer  sarama.SyncProducer
       delay     time.Duration
       realTopic string
    }
    
    func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
       realTopic string) *DelayServiceConsumer {
       return &DelayServiceConsumer{
          producer:  producer,
          delay:     delay,
          realTopic: realTopic,
       }
    }
    
    func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession,
       claim sarama.ConsumerGroupClaim) error {
       for message := range claim.Messages() {
          // 如果消息已经超时,把消息发送到真实队列
          now := time.Now()
          if now.Sub(message.Timestamp) >= c.delay {
             _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
                Topic: c.realTopic,
                Key:   sarama.ByteEncoder(message.Key),
                Value: sarama.ByteEncoder(message.Value),
             })
             if err == nil {
                session.MarkMessage(message, "")
             }
             continue
          }
          // 否则休眠一秒
          time.Sleep(time.Second)
          return nil
       }
       return nil
    }
    
    func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {
       return nil
    }
    
    func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {
       return nil
    }

    总结

    使用中间队列+轮询可以很容易的在Kafka实现延迟消息,如果需要一个通用的延迟队列也可以实现一个通用的延迟服务,也可以让消费者负责延迟服务的功能。

    完整代码:

    • 简单实现例子:https://github.com/jiaxwu/dq/blob/main/kafka_delay_queue_producer.go
    • 包含延迟服务的生产者:https://github.com/jiaxwu/dq/tree/main/kafka_delay_queue_example
    声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。