博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
3.2 消息应答
阅读量:37441 次
发布时间:2020-12-04

本文共 3157 字,大约阅读时间需要 10 分钟。

目录


概述

代码实现

生产者

public class Task02 {    private static final String TASK_QUEUE_NAME = "ack_queue";    public static void main(String[] argv) throws Exception {        try (Channel channel = RabbitMqUtils.getChannel()) {        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);        Scanner sc = new Scanner(System.in);        System.out.println("请输入信息");            while (sc.hasNext()) {                String message = sc.nextLine();                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));                System.out.println("生产者发出消息" + message);             }         }     }}

 消费者1

import com.aa.Utils.RabbitMqUtils;import com.aa.Utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class Worker01 {    public static final String TASK_QUEUE_NAME = "queueName";    public static void main(String[] args) throws Exception{        Channel channel = RabbitMqUtils.getChannel();        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            System.out.println("C1消息等待消息, 处理时间较短");            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);            SleepUtils.sleep(1);            System.out.println("收到消息 :" + message);            /*        是否批量发应答消息             1.消息标记 tag             2.是否批量应答未应答消息            */            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        };        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");        };        boolean autoAck = false;        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);    }}

消费者2

import com.aa.Utils.RabbitMqUtils;import com.aa.Utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class Worker02 {    public static final String TASK_QUEUE_NAME = "queueName";    public static void main(String[] args) throws Exception{        Channel channel = RabbitMqUtils.getChannel();        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            System.out.println("C2消息等待消息, 处理时间较长");            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);            SleepUtils.sleep(30);            System.out.println("收到消息 :" + message);            /*             1.消息标记 tag             2.是否批量应答未应答消息            */            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        };        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");        };        boolean autoAck = false;        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);    }}

睡眠工具类

public class SleepUtils {    public static void sleep(int second){        try {            Thread.sleep(1000*second);        } catch (InterruptedException _ignored) {            Thread.currentThread().interrupt();        }    }}

结果

转载地址:http://sbpowy.baihongyu.com/

你可能感兴趣的文章
MSDK手Q邀请透传参数问题:url编解码与base64编解码
查看>>
Unity中C#如何执行cmd命令(System.Diagnostics.Process的使用)
查看>>
C#用正则表达式去匹配被双引号包起来的中文
查看>>
lua table排序
查看>>
Unity发布的ios包在iphone上声音是从听筒里出来的问题
查看>>
UIScrollView复用节点示例
查看>>
Unity 5 AudioMixer
查看>>
Unity 代码混淆: CodeGuard的使用
查看>>
UGUI 列表循环使用
查看>>
使用命令行运行unity并执行某个静态函数(运用于命令行打包和批量打包)
查看>>
web.py框架
查看>>
web.py学习笔记
查看>>
python的代码缩进
查看>>
A* Pathfinding Project (Unity A*寻路插件) 使用教程
查看>>
bash学习笔记
查看>>
sqlite学习
查看>>
手把手教你实现Unity与Android的交互
查看>>
手把手教你使用Unity的Behavior Designer
查看>>
Unity3D摄像机裁剪——NGUI篇
查看>>
lua深拷贝一个table
查看>>