博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot中rabbitmq的配置和使用【进阶一】
阅读量:6268 次
发布时间:2019-06-22

本文共 8021 字,大约阅读时间需要 26 分钟。

hot3.png

1、yml配置

     

alimq:    ProducerId: PRODUCER(mq中定义)    ConsumerId: CONSUMER(mq中定义)    AccessKey:      SecretKey:     ONSAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet    SendMsgTimeoutMillis: 3000    topic: TOPIC    #mq开关 0-不启动消费  1-启动消费    mqflag: 1    tag: ZC_xxx(mq中定义)

2、ali生产者和消费者配置

package common.config;import com.aliyun.openservices.ons.api.PropertyKeyConst;import lombok.Data;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import java.util.Properties;/** * @Description: * @Auther: liuyue * @Date:  */@Configuration@Datapublic class AliMQConfig {    @Value("${alimq.topic}")    private String topic;    @Value("${alimq.ProducerId}")    private String producerId;    @Value("${alimq.ConsumerId}")    private String consumerId;    @Value("${alimq.AccessKey}")    private String accesskey;    @Value("${alimq.SecretKey}")    private String secretkey;    @Value("${alimq.ONSAddr}")    private String onsaddr;    @Value("${alimq.tag}")    private String subExpression;    //提供消费者的配置    public Properties getConsumerProperties() {        Properties consumerProperties = new Properties();        consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerId);        consumerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey);        consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey);        consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr);        return consumerProperties;    }    //提供生产者的配置    public Properties getProducerProperties() {        Properties producerProperties = new Properties();        producerProperties.setProperty(PropertyKeyConst.ProducerId, producerId);        producerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey);        producerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey);        producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr);        return producerProperties;    }}

3、消费者监听器

package common.config;import config.alimq.MQMsgConsumer;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * @Description: * @Auther: liuyue * @Date: 2018/11/14 19:31 */@Component@Slf4jpublic class ListenerConfig implements CommandLineRunner {    @Resource    MQMsgConsumer mqConsumer;    @Value("${alimq.mqflag}")    private String mqflag;    @Override    public void run(String... strings) throws Exception {        if("0".equals(mqflag)){            log.info("alimq没有开启消费");        }else{            log.info("=======alimq开始消费=========");            mqConsumer.start();            mqConsumer.onMessage();        }    }}

4、消费者类

package config.alimq;import com.alibaba.fastjson.JSON;import com.aliyun.openservices.ons.api.*;import config.SpringContextHolder;import config.AliMQConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@SuppressWarnings("all")@Slf4j@Componentpublic class MQMsgConsumer implements InitializingBean, DisposableBean {    @Autowired    AliMQConfig busMqConfig;    private Consumer busConsumer;    @Autowired    IPanoramaProService panoramaProServiceImpl;    @Override    public void afterPropertiesSet() throws Exception {        log.info("消费者初始化");        busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties());        // busConsumer.start();        log.info("消费者初始化完成");    }    public void start() {        busConsumer.start();    }    public void onMessage() {        busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() {            @Override            public Action consume(Message message, ConsumeContext context) {                // System.out.println(JSON.toJSONString(message));                System.out.println("Receive: " + message);                System.out.println(new String(message.getBody()));                Action consumer = consumer(message, context);                return consumer;            }        });    }    @Override    public void destroy() throws Exception {        busConsumer.shutdown();        log.info("消费停止");    }    //执行mq消费    public Action consumer(Message message, ConsumeContext context) {        //更新审核时间         if("ZC_xxx".equals(message.getTag()))        {            boolean status = synchroProjectPlanStatus(message, context);            if (!status) {                return Action.CommitMessage;            }        }        return Action.ReconsumeLater;    }    /**     * 更新审核时间      * @author liu     * @since 2018年11月2日 下午2:10:34     * @param message     * @param context     * @return     */    private boolean synchroProjectPlanStatus( Message message, ConsumeContext context ){        boolean bl = false;        byte[] msgBody = message.getBody();        if( null != msgBody && msgBody.length > 0 ){            try {                String msgBodyStr = new String(msgBody, "UTF-8");                log.info(" THE MQ message body value: " + msgBodyStr);                //JSONObject msgJson = JSONObject.parseObject(msgBodyStr);                                if( null != msgBodyStr ){                    //转化为对象                    ProjectPlanParas projectPlanParas =                            JSON.parseObject(msgBodyStr, ProjectPlanParas.class);                    log.info(" THE ProjectPlanParas   value: " + projectPlanParas.getZutuanCode());                    log.info(" THE ProjectPlanParas   value: " + projectPlanParas.getFinishDate());                    //执行更新的操作                    bl = panoramaProServiceImpl.synchroProjectPlanStatus(projectPlanParas);                    log.info(" THE MQ synchroProjectPlanStatus status : " + bl);                }            } catch (UnsupportedEncodingException e) {                log.info(" THE MQ message UnsupportedEncodingException : " + e);                e.printStackTrace();            }                    }        return bl;    }    }

5、生产者类

package config.alimq;import com.aliyun.openservices.ons.api.*;import com.config.AliMQConfig;import lombok.extern.slf4j.Slf4j;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;import java.util.UUID;@Component@Slf4jpublic class MQMsgProducer implements InitializingBean, DisposableBean {    @Autowired    AliMQConfig busMqConfig;    private Producer producer;    @Override    public void afterPropertiesSet() throws Exception {        log.info("生产者初始化");        producer = ONSFactory.createProducer(busMqConfig.getProducerProperties());        producer.start();    }    public void sentMessage(Message message) {        producer.sendAsync(message, new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                log.info(sendResult.getTopic() + "-----" + sendResult.getMessageId());            }            @Override            public void onException(OnExceptionContext context) {                log.error(context.getTopic() + "-----" + context.getMessageId() + ":error=" + context.getException());            }        });    }    @Override    public void destroy() throws Exception {        producer.shutdown();    }}

6、生产者调用类,推送消息,业务代码片段

@Resource    MQMsgProducer mqProducer;    //修改成,使用alimq更新年景计划的时间 edit by liuy at 20181102日     Message msg = new Message(aliMQConfig.getTopic(),                "ZC_xxx", json.getBytes("UTF-8"));     mqProducer.sentMessage(msg);

    至此,全部过程结束。
 

转载于:https://my.oschina.net/maojindaoGG/blog/3018480

你可能感兴趣的文章
leetcode 题解 || Valid Parentheses 问题
查看>>
将图片转成base64字符串并在JSP页面显示的Java代码
查看>>
什么是WeakHashMap--转
查看>>
js 面试题
查看>>
第二十二节,三元运算
查看>>
Yacc 与 Lex 快速入门
查看>>
Unity中HDR外发光的使用
查看>>
Flume负载均衡配置
查看>>
Ajax详解
查看>>
Ubuntu C/C++开发环境的安装和配置
查看>>
百世汇通快递地区选择插件,单独剥离
查看>>
Linux系统调用---同步IO: sync、fsync与fdatasync【转】
查看>>
【MyBatis学习06】输入映射和输出映射
查看>>
[LeetCode] Decode String 解码字符串
查看>>
数字逻辑的一些基本运算和概念
查看>>
ant重新编译打包hadoop-core-1.2.1.jar时遇到的错
查看>>
【★★★★★】提高PHP代码质量的36个技巧
查看>>
3 weekend110的配置hadoop(格式化) + 一些问题解决 + 未免密码配置
查看>>
JavaScript Creating 对象
查看>>
Java compiler level does not match the version of the installed Java project facet.(转)
查看>>