Springboot Rabbitmq 使用Jackson2JsonMessageConverter 消息传递后转对象

Springboot为了应对高并发,接入了消息队列Rabbitmq,第一版验证时使用简单消费队列:

//发送端
AbstractOrder order =new Order();
rabbitmqTemplate.convertAndSend(order);

//消费端
public void recieved(AbstractOrder order){
   log.info("recieved order:"+order);
   //处理逻辑
}

第二版为了应对可能出行的处理失败,使用了Rabbitmq的Ack

下面是最终版代码:

//发送端 //把订单加入队列
    public void convertAndSendOrder(AbstractOrder order){

        rabbitmqTemplate.setMandatory(true);
        rabbitmqTemplate.setConfirmCallback(confirmCallback);
        rabbitmqTemplate.setReturnCallback(returnCallback);
        //全局唯一 不然ReturnCallback 无效
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitmqTemplate.convertAndSend("exchange-order","rkey-orderadd",order,correlationData);
    }
//消费端逻辑    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "queue-order"),
                    exchange = @Exchange(value = "exchange-order"),
                    key = "rkey-order")},containerFactory="rabbitListenerContainerFactory")
    public void recieved(Message messageorigin, Channel channel){
        log.info("recieved message:"+messageorigin);
        boolean success = false;

        Order order =null;

        try {
            Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();
            order = (Order)jackson2JsonMessageConverter.fromMessage(messageorigin);

        }catch (Exception e){
            log.info("get order fromBytes message exception"+e.getMessage());
        }
        
        //处理逻辑
        //回调成功确认消息
        if(success){
            //成功确认消息
            try{
                channel.basicAck(messageorigin.getMessageProperties().getDeliveryTag(),false);
            }catch (IOException e){
                log.info("basicAck ex:"+e.getMessage());
                try{
                    channel.basicAck(messageorigin.getMessageProperties().getDeliveryTag(),false);
                }catch (IOException ee){
                    log.info("again basicAck ex:"+ee.getMessage());

                }
            }
        }

注意:如果在received 中还像第一版直接转自定义对象,消息进程会报错

org.springframework.amqp.AmqpException: No method found for class com.xxx.Order

解决方案是使用Jackson2JsonMessageConverter 。在发送消息时,它会先将自定义的消息类序列化成json格式,再转成byte构造 Message

//发送 设置Converter
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }//消费时 指定Converter
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "queue-order"),
                    exchange = @Exchange(value = "exchange-order"),
                    key = "rkey-order")},containerFactory="rabbitListenerContainerFactory")

但是直接使用Jackson2JsonMessageConverter后,反序列化时要求发送的类和接受的类完全一样(字段,类名,包路径)。

查了下 文档 https://docs.spring.io/spring-amqp/api/org/springframework/amqp/support/converter/Jackson2JsonMessageConverter.html

这里直接使用jackson2JsonMessageConverter.fromMessage的方法从Message拿出来。

 

ACK完成