1、yml配置
alimq: ProducerId: PRODUCER(mq中定义) ConsumerId: CONSUMER(mq中定义) AccessKey: SecretKey: ONSAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet SendMsgTimeoutMillis: 3000 topic: TOPIC #mq开关 0-不启动消费 1-启动消费 mqflag: 1 tag: ZC_xxx(mq中定义)2、ali生产者和消费者配置
package common.config;import com.aliyun.openservices.ons.api.PropertyKeyConst;import lombok.Data;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import java.util.Properties;/** * @Description: * @Auther: liuyue * @Date: */@Configuration@Datapublic class AliMQConfig { @Value("${alimq.topic}") private String topic; @Value("${alimq.ProducerId}") private String producerId; @Value("${alimq.ConsumerId}") private String consumerId; @Value("${alimq.AccessKey}") private String accesskey; @Value("${alimq.SecretKey}") private String secretkey; @Value("${alimq.ONSAddr}") private String onsaddr; @Value("${alimq.tag}") private String subExpression; //提供消费者的配置 public Properties getConsumerProperties() { Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerId); consumerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey); consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey); consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr); return consumerProperties; } //提供生产者的配置 public Properties getProducerProperties() { Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.ProducerId, producerId); producerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey); producerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey); producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr); return producerProperties; }}
3、消费者监听器
package common.config;import config.alimq.MQMsgConsumer;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * @Description: * @Auther: liuyue * @Date: 2018/11/14 19:31 */@Component@Slf4jpublic class ListenerConfig implements CommandLineRunner { @Resource MQMsgConsumer mqConsumer; @Value("${alimq.mqflag}") private String mqflag; @Override public void run(String... strings) throws Exception { if("0".equals(mqflag)){ log.info("alimq没有开启消费"); }else{ log.info("=======alimq开始消费========="); mqConsumer.start(); mqConsumer.onMessage(); } }}
4、消费者类
package config.alimq;import com.alibaba.fastjson.JSON;import com.aliyun.openservices.ons.api.*;import config.SpringContextHolder;import config.AliMQConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@SuppressWarnings("all")@Slf4j@Componentpublic class MQMsgConsumer implements InitializingBean, DisposableBean { @Autowired AliMQConfig busMqConfig; private Consumer busConsumer; @Autowired IPanoramaProService panoramaProServiceImpl; @Override public void afterPropertiesSet() throws Exception { log.info("消费者初始化"); busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties()); // busConsumer.start(); log.info("消费者初始化完成"); } public void start() { busConsumer.start(); } public void onMessage() { busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { // System.out.println(JSON.toJSONString(message)); System.out.println("Receive: " + message); System.out.println(new String(message.getBody())); Action consumer = consumer(message, context); return consumer; } }); } @Override public void destroy() throws Exception { busConsumer.shutdown(); log.info("消费停止"); } //执行mq消费 public Action consumer(Message message, ConsumeContext context) { //更新审核时间 if("ZC_xxx".equals(message.getTag())) { boolean status = synchroProjectPlanStatus(message, context); if (!status) { return Action.CommitMessage; } } return Action.ReconsumeLater; } /** * 更新审核时间 * @author liu * @since 2018年11月2日 下午2:10:34 * @param message * @param context * @return */ private boolean synchroProjectPlanStatus( Message message, ConsumeContext context ){ boolean bl = false; byte[] msgBody = message.getBody(); if( null != msgBody && msgBody.length > 0 ){ try { String msgBodyStr = new String(msgBody, "UTF-8"); log.info(" THE MQ message body value: " + msgBodyStr); //JSONObject msgJson = JSONObject.parseObject(msgBodyStr); if( null != msgBodyStr ){ //转化为对象 ProjectPlanParas projectPlanParas = JSON.parseObject(msgBodyStr, ProjectPlanParas.class); log.info(" THE ProjectPlanParas value: " + projectPlanParas.getZutuanCode()); log.info(" THE ProjectPlanParas value: " + projectPlanParas.getFinishDate()); //执行更新的操作 bl = panoramaProServiceImpl.synchroProjectPlanStatus(projectPlanParas); log.info(" THE MQ synchroProjectPlanStatus status : " + bl); } } catch (UnsupportedEncodingException e) { log.info(" THE MQ message UnsupportedEncodingException : " + e); e.printStackTrace(); } } return bl; } }
5、生产者类
package config.alimq;import com.aliyun.openservices.ons.api.*;import com.config.AliMQConfig;import lombok.extern.slf4j.Slf4j;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;import java.util.UUID;@Component@Slf4jpublic class MQMsgProducer implements InitializingBean, DisposableBean { @Autowired AliMQConfig busMqConfig; private Producer producer; @Override public void afterPropertiesSet() throws Exception { log.info("生产者初始化"); producer = ONSFactory.createProducer(busMqConfig.getProducerProperties()); producer.start(); } public void sentMessage(Message message) { producer.sendAsync(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info(sendResult.getTopic() + "-----" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { log.error(context.getTopic() + "-----" + context.getMessageId() + ":error=" + context.getException()); } }); } @Override public void destroy() throws Exception { producer.shutdown(); }}
6、生产者调用类,推送消息,业务代码片段
@Resource MQMsgProducer mqProducer; //修改成,使用alimq更新年景计划的时间 edit by liuy at 20181102日 Message msg = new Message(aliMQConfig.getTopic(), "ZC_xxx", json.getBytes("UTF-8")); mqProducer.sentMessage(msg);至此,全部过程结束。