From 171f0e5089b46c0f37dc6d6d248aadb37f9a9005 Mon Sep 17 00:00:00 2001 From: Bosen Date: Mon, 9 Aug 2021 00:00:14 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=A4=E5=8F=91=E6=B6=88=E6=81=AF=EF=BC=9A?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=E5=8A=9F=E8=83=BD=E7=9A=84?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E7=94=B1rabbitMQ=E6=9B=BF=E6=8D=A2=E5=8E=9F?= =?UTF-8?q?=E6=9C=AC=E7=9A=84redis=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 ++ .../controller/AdminNoticeController.java | 2 +- .../jieyue/admin/listener/NoticeListener.java | 63 +++++++++++++++++++ .../admin/service/AdminNoticeService.java | 50 ++++++++++++++- .../jieyue/common/entity/SysNotice.java | 11 +++- .../jieyue/common/mapper/SysUserMapper.java | 1 + src/main/resources/application.yml | 11 ++++ 7 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/example/jieyue/admin/listener/NoticeListener.java diff --git a/pom.xml b/pom.xml index 3d89f0b..28de8c1 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,11 @@ spring-boot-starter-data-elasticsearch 2.5.2 + + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/src/main/java/com/example/jieyue/admin/controller/AdminNoticeController.java b/src/main/java/com/example/jieyue/admin/controller/AdminNoticeController.java index 94302cb..319318d 100644 --- a/src/main/java/com/example/jieyue/admin/controller/AdminNoticeController.java +++ b/src/main/java/com/example/jieyue/admin/controller/AdminNoticeController.java @@ -28,7 +28,7 @@ public class AdminNoticeController { if (isEmptyUtil.strings(title,context)){ modelAndView.addObject("msg","必填信息不能为空"); }else{ - noticeService.send(title,context,type); + noticeService.sendByRabbitMQ(title,context,type); modelAndView.addObject("msg","系统消息发送成功"); } modelAndView.setViewName("redirect:/admin/notice"); diff --git a/src/main/java/com/example/jieyue/admin/listener/NoticeListener.java b/src/main/java/com/example/jieyue/admin/listener/NoticeListener.java new file mode 100644 index 0000000..47912e3 --- /dev/null +++ b/src/main/java/com/example/jieyue/admin/listener/NoticeListener.java @@ -0,0 +1,63 @@ +package com.example.jieyue.admin.listener; + +import com.example.jieyue.common.entity.SysNotice; +import com.example.jieyue.common.mapper.SysNoticeMapper; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.RequestMapping; + +import java.io.IOException; +import java.util.Map; + +/** + *

群发消息队列监听类

+ * @author Bosen + * @date 2021/8/8 23:30 + */ +@Component +@Slf4j +public class NoticeListener { + @Autowired + SysNoticeMapper noticeMapper; + + @RabbitListener( + bindings = @QueueBinding( + exchange = @Exchange(name = "notice-exchange"), + value = @Queue(value = "notice-queue"), + key = "notice.*" + ) + ) + @RabbitHandler + public void sendNotice( + @Headers Map headers, + @Payload SysNotice notice, + Channel channel) throws IOException { + if (insertToMysql(notice) != 1) { + return; + } + // 手动确认消费 + channel.basicAck((long) headers.get(AmqpHeaders.DELIVERY_TAG), false); + log.info("Rabbit订阅:群发通知入库成功!"); + } + + /** + *

将通知消息插入进mysql

+ */ + @RequestMapping("/insert/to/mysql") + public int insertToMysql(SysNotice notice) { + + String title = notice.getTitle(); + String context = notice.getContext(); + int type = notice.getType(); + int receive = notice.getReceive(); + long createTime = notice.getCreateTime(); + + return noticeMapper.insert(title, context, type, receive, createTime); + } +} diff --git a/src/main/java/com/example/jieyue/admin/service/AdminNoticeService.java b/src/main/java/com/example/jieyue/admin/service/AdminNoticeService.java index baee366..d3b57cd 100644 --- a/src/main/java/com/example/jieyue/admin/service/AdminNoticeService.java +++ b/src/main/java/com/example/jieyue/admin/service/AdminNoticeService.java @@ -2,11 +2,14 @@ package com.example.jieyue.admin.service; import com.example.jieyue.common.entity.SysAdmin; import com.example.jieyue.common.entity.SysMt; +import com.example.jieyue.common.entity.SysNotice; import com.example.jieyue.common.entity.SysUser; import com.example.jieyue.common.mapper.SysAdminMapper; import com.example.jieyue.common.mapper.SysMtMapper; import com.example.jieyue.common.mapper.SysNoticeMapper; import com.example.jieyue.common.mapper.SysUserMapper; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @@ -14,6 +17,7 @@ import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; @Service public class AdminNoticeService { @@ -27,11 +31,13 @@ public class AdminNoticeService { SysNoticeMapper noticeMapper; @Autowired RedisTemplate redisTemplate; + @Autowired + RabbitTemplate rabbitTemplate; /* * 将要发送的消息存入redis消息队列 */ - public void send(String title,String context,int type){ + public void sendByRedis(String title,String context,int type){ Map map = new HashMap<>(); map.put("title",title); map.put("context",context); @@ -69,4 +75,46 @@ public class AdminNoticeService { break; } } + + public void sendByRabbitMQ(String title,String context,int type){ + switch (type){ + case 0: + // 获取信息 + List adminList = adminMapper.findAll(); + for (SysAdmin sysAdmin : adminList) { + addNotice(title, context, type, sysAdmin.getId()); + } + break; + case 1: + // 获取信息 + List merchantList = merchantMapper.findAll(); + for (SysMt sysMt : merchantList) { + addNotice(title, context, type, sysMt.getId()); + } + break; + case 2: + // 获取信息 + List userList = userMapper.findAll(); + for (SysUser sysUser : userList) { + addNotice(title, context, type, sysUser.getId()); + } + break; + default: + break; + } + } + + public int addNotice(String title, String context, int type, int receive) { + + // 获取当前时间戳 + long createTime = System.currentTimeMillis(); + + SysNotice notice = new SysNotice(-1, type, title, context, receive, createTime); + + // 将消息存入队列中 + rabbitTemplate.convertAndSend("notice-exchange", "notice.keyword", notice, + new CorrelationData(UUID.randomUUID().toString())); + + return 1; + } } diff --git a/src/main/java/com/example/jieyue/common/entity/SysNotice.java b/src/main/java/com/example/jieyue/common/entity/SysNotice.java index 1da8245..deb030a 100644 --- a/src/main/java/com/example/jieyue/common/entity/SysNotice.java +++ b/src/main/java/com/example/jieyue/common/entity/SysNotice.java @@ -1,6 +1,15 @@ package com.example.jieyue.common.entity; -public class SysNotice { +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@AllArgsConstructor +@NoArgsConstructor +@Data +public class SysNotice implements Serializable { private int id; private int type; private String title; diff --git a/src/main/java/com/example/jieyue/common/mapper/SysUserMapper.java b/src/main/java/com/example/jieyue/common/mapper/SysUserMapper.java index 0159d80..3278240 100644 --- a/src/main/java/com/example/jieyue/common/mapper/SysUserMapper.java +++ b/src/main/java/com/example/jieyue/common/mapper/SysUserMapper.java @@ -12,6 +12,7 @@ public interface SysUserMapper { List findLimit(int preNum,int sufNum); int userCount(); List findAll(); + List getAllId(); int insert(@Param("username") String username,@Param("password") String password, @Param("email") String email,@Param("mark") int mark); SysUser selectByEmail(String email); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 6a491f3..e59eb1d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -25,6 +25,17 @@ spring: cluster-name: elasticsearch repositories: enabled: true + rabbitmq: + addresses: ${site-url}:5672 + username: guest + password: guest + virtual-host: / + connection-timeout: 15000 + listener: + simple: + concurrency: 5 + max-concurrency: 10 + acknowledge-mode: manual mvc: static-path-pattern: /*/** mail: