群发消息:消息队列功能的实现由rabbitMQ替换原本的redis队列
parent
6d4722122c
commit
171f0e5089
5
pom.xml
5
pom.xml
|
@ -153,6 +153,11 @@
|
|||
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
|
||||
<version>2.5.2</version>
|
||||
</dependency>
|
||||
<!-- rabbitMQ -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -28,7 +28,7 @@ public class AdminNoticeController {
|
|||
if (isEmptyUtil.strings(title,context)){
|
||||
modelAndView.addObject("msg","必填信息不能为空");
|
||||
}else{
|
||||
noticeService.send(title,context,type);
|
||||
noticeService.sendByRabbitMQ(title,context,type);
|
||||
modelAndView.addObject("msg","系统消息发送成功");
|
||||
}
|
||||
modelAndView.setViewName("redirect:/admin/notice");
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
package com.example.jieyue.admin.listener;
|
||||
|
||||
import com.example.jieyue.common.entity.SysNotice;
|
||||
import com.example.jieyue.common.mapper.SysNoticeMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.*;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.handler.annotation.Headers;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* <p>群发消息队列监听类</p>
|
||||
* @author Bosen
|
||||
* @date 2021/8/8 23:30
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class NoticeListener {
|
||||
@Autowired
|
||||
SysNoticeMapper noticeMapper;
|
||||
|
||||
@RabbitListener(
|
||||
bindings = @QueueBinding(
|
||||
exchange = @Exchange(name = "notice-exchange"),
|
||||
value = @Queue(value = "notice-queue"),
|
||||
key = "notice.*"
|
||||
)
|
||||
)
|
||||
@RabbitHandler
|
||||
public void sendNotice(
|
||||
@Headers Map<String, Object> headers,
|
||||
@Payload SysNotice notice,
|
||||
Channel channel) throws IOException {
|
||||
if (insertToMysql(notice) != 1) {
|
||||
return;
|
||||
}
|
||||
// 手动确认消费
|
||||
channel.basicAck((long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
|
||||
log.info("Rabbit订阅:群发通知入库成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>将通知消息插入进mysql</p>
|
||||
*/
|
||||
@RequestMapping("/insert/to/mysql")
|
||||
public int insertToMysql(SysNotice notice) {
|
||||
|
||||
String title = notice.getTitle();
|
||||
String context = notice.getContext();
|
||||
int type = notice.getType();
|
||||
int receive = notice.getReceive();
|
||||
long createTime = notice.getCreateTime();
|
||||
|
||||
return noticeMapper.insert(title, context, type, receive, createTime);
|
||||
}
|
||||
}
|
|
@ -2,11 +2,14 @@ package com.example.jieyue.admin.service;
|
|||
|
||||
import com.example.jieyue.common.entity.SysAdmin;
|
||||
import com.example.jieyue.common.entity.SysMt;
|
||||
import com.example.jieyue.common.entity.SysNotice;
|
||||
import com.example.jieyue.common.entity.SysUser;
|
||||
import com.example.jieyue.common.mapper.SysAdminMapper;
|
||||
import com.example.jieyue.common.mapper.SysMtMapper;
|
||||
import com.example.jieyue.common.mapper.SysNoticeMapper;
|
||||
import com.example.jieyue.common.mapper.SysUserMapper;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -14,6 +17,7 @@ import org.springframework.stereotype.Service;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
public class AdminNoticeService {
|
||||
|
@ -27,11 +31,13 @@ public class AdminNoticeService {
|
|||
SysNoticeMapper noticeMapper;
|
||||
@Autowired
|
||||
RedisTemplate redisTemplate;
|
||||
@Autowired
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
/*
|
||||
* 将要发送的消息存入redis消息队列
|
||||
*/
|
||||
public void send(String title,String context,int type){
|
||||
public void sendByRedis(String title,String context,int type){
|
||||
Map<String,String> map = new HashMap<>();
|
||||
map.put("title",title);
|
||||
map.put("context",context);
|
||||
|
@ -69,4 +75,46 @@ public class AdminNoticeService {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public void sendByRabbitMQ(String title,String context,int type){
|
||||
switch (type){
|
||||
case 0:
|
||||
// 获取信息
|
||||
List<SysAdmin> adminList = adminMapper.findAll();
|
||||
for (SysAdmin sysAdmin : adminList) {
|
||||
addNotice(title, context, type, sysAdmin.getId());
|
||||
}
|
||||
break;
|
||||
case 1:
|
||||
// 获取信息
|
||||
List<SysMt> merchantList = merchantMapper.findAll();
|
||||
for (SysMt sysMt : merchantList) {
|
||||
addNotice(title, context, type, sysMt.getId());
|
||||
}
|
||||
break;
|
||||
case 2:
|
||||
// 获取信息
|
||||
List<SysUser> userList = userMapper.findAll();
|
||||
for (SysUser sysUser : userList) {
|
||||
addNotice(title, context, type, sysUser.getId());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public int addNotice(String title, String context, int type, int receive) {
|
||||
|
||||
// 获取当前时间戳
|
||||
long createTime = System.currentTimeMillis();
|
||||
|
||||
SysNotice notice = new SysNotice(-1, type, title, context, receive, createTime);
|
||||
|
||||
// 将消息存入队列中
|
||||
rabbitTemplate.convertAndSend("notice-exchange", "notice.keyword", notice,
|
||||
new CorrelationData(UUID.randomUUID().toString()));
|
||||
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,15 @@
|
|||
package com.example.jieyue.common.entity;
|
||||
|
||||
public class SysNotice {
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public class SysNotice implements Serializable {
|
||||
private int id;
|
||||
private int type;
|
||||
private String title;
|
||||
|
|
|
@ -12,6 +12,7 @@ public interface SysUserMapper {
|
|||
List<SysUser> findLimit(int preNum,int sufNum);
|
||||
int userCount();
|
||||
List<SysUser> findAll();
|
||||
List<Integer> getAllId();
|
||||
int insert(@Param("username") String username,@Param("password") String password,
|
||||
@Param("email") String email,@Param("mark") int mark);
|
||||
SysUser selectByEmail(String email);
|
||||
|
|
|
@ -25,6 +25,17 @@ spring:
|
|||
cluster-name: elasticsearch
|
||||
repositories:
|
||||
enabled: true
|
||||
rabbitmq:
|
||||
addresses: ${site-url}:5672
|
||||
username: guest
|
||||
password: guest
|
||||
virtual-host: /
|
||||
connection-timeout: 15000
|
||||
listener:
|
||||
simple:
|
||||
concurrency: 5
|
||||
max-concurrency: 10
|
||||
acknowledge-mode: manual
|
||||
mvc:
|
||||
static-path-pattern: /*/**
|
||||
mail:
|
||||
|
|
Loading…
Reference in New Issue