本文共 3157 字,大约阅读时间需要 10 分钟。
目录
生产者
/**
 * Producer for the manual-acknowledgement demo.
 *
 * <p>Reads lines from standard input and publishes each one, UTF-8 encoded,
 * through the default exchange to the task queue until standard input is
 * exhausted.
 */
public class Task02 {
    // Must be the same queue the consumers subscribe to: with the default
    // exchange ("") the routing key is the name of the destination queue.
    private static final String TASK_QUEUE_NAME = "queueName";

    /**
     * Declares the task queue and publishes every line typed on standard input.
     *
     * @param argv unused
     * @throws Exception if obtaining the channel, declaring the queue,
     *                   publishing, or closing the channel fails
     */
    public static void main(String[] argv) throws Exception {
        // The channel is closed automatically once standard input is exhausted.
        try (Channel channel = RabbitMqUtils.getChannel()) {
            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            // hasNextLine() pairs with nextLine(); the token-based hasNext()
            // would block on blank lines and publish them late.
            while (sc.hasNextLine()) {
                String message = sc.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
}
消费者1
import com.aa.Utils.RabbitMqUtils;
import com.aa.Utils.SleepUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

/**
 * Consumer C1 of the manual-acknowledgement demo: the "fast" worker.
 *
 * <p>For every delivery it prints a banner, decodes the body as UTF-8, calls
 * {@code SleepUtils.sleep(1)}, prints the message and then acknowledges that
 * single delivery.
 */
public class Worker01 {
    public static final String TASK_QUEUE_NAME = "queueName";

    /**
     * Subscribes to {@link #TASK_QUEUE_NAME} with automatic acknowledgement
     * turned off, so each message is acknowledged explicitly by the delivery
     * callback.
     *
     * @param args unused
     * @throws Exception if obtaining the channel or registering the consumer fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = RabbitMqUtils.getChannel();

        // Invoked when the consumer is cancelled.
        final CancelCallback onCancel =
                tag -> System.out.println(tag + "消费者取消消费接口回调逻辑");

        final DeliverCallback onDelivery = (tag, delivery) -> {
            System.out.println("C1消息等待消息, 处理时间较短");
            final String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
            SleepUtils.sleep(1);
            System.out.println("收到消息 :" + text);

            // basicAck arguments:
            //   1. the delivery tag identifying the message
            //   2. whether to acknowledge in bulk (all unacknowledged messages
            //      up to this tag) - false acknowledges only this one
            final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        };

        // Second argument is autoAck; false means manual acknowledgement.
        channel.basicConsume(TASK_QUEUE_NAME, false, onDelivery, onCancel);
    }
}
消费者2
import com.aa.Utils.RabbitMqUtils;
import com.aa.Utils.SleepUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

/**
 * Consumer C2 of the manual-acknowledgement demo: the "slow" worker.
 *
 * <p>For every delivery it prints a banner, decodes the body as UTF-8, calls
 * {@code SleepUtils.sleep(30)}, prints the message and then acknowledges that
 * single delivery.
 */
public class Worker02 {
    public static final String TASK_QUEUE_NAME = "queueName";

    /**
     * Subscribes to {@link #TASK_QUEUE_NAME} with automatic acknowledgement
     * turned off, so each message is acknowledged explicitly by the delivery
     * callback.
     *
     * @param args unused
     * @throws Exception if obtaining the channel or registering the consumer fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = RabbitMqUtils.getChannel();

        // Invoked when the consumer is cancelled.
        final CancelCallback onCancel =
                tag -> System.out.println(tag + "消费者取消消费接口回调逻辑");

        final DeliverCallback onDelivery = (tag, delivery) -> {
            System.out.println("C2消息等待消息, 处理时间较长");
            final String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
            SleepUtils.sleep(30);
            System.out.println("收到消息 :" + text);

            // basicAck arguments:
            //   1. the delivery tag identifying the message
            //   2. whether to acknowledge in bulk (all unacknowledged messages
            //      up to this tag) - false acknowledges only this one
            final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        };

        // Second argument is autoAck; false means manual acknowledgement.
        channel.basicConsume(TASK_QUEUE_NAME, false, onDelivery, onCancel);
    }
}
睡眠工具类
/**
 * Helper for pausing the current thread by a whole number of seconds.
 */
public class SleepUtils {
    /**
     * Blocks the calling thread for the given number of seconds.
     *
     * <p>If the thread is interrupted while sleeping (or already has its
     * interrupt flag set), the method returns early and leaves the interrupt
     * flag set so callers can still observe the interruption.
     *
     * @param second number of seconds to sleep
     * @throws IllegalArgumentException if {@code second} is negative
     *                                  (propagated from {@link Thread#sleep(long)})
     */
    public static void sleep(int second) {
        try {
            // Multiply as long: 1000 * second in int arithmetic overflows
            // for any value above 2_147_483 seconds.
            Thread.sleep(1000L * second);
        } catch (InterruptedException e) {
            // Restore the flag that Thread.sleep cleared when it threw.
            Thread.currentThread().interrupt();
        }
    }
}
结果
转载地址:http://sbpowy.baihongyu.com/