一、内存存储
咱们先来设计内存中的如何存储数据.
为了保证线程安全,统一使用 线程安全的 HashMap ConcurrentHashMap.
交换机存储结构
实现效果:通过 交换机的名字(交换机的身份标识) 即可 查询到对应的交换机对象
队列存储结构
实现效果:通过 队列的名字(队列的身份标识) 即可 查询到对应的队列对象
绑定关系存储结构
实现效果:通过 交换机的名字(交换机的身份标识) 即可 查询到所有与该交换机存在绑定关系的队列对象与绑定关系.
因为一个交换机可能绑定了多个队列,
所以以 交换机的名字为 key,value 是一个hashMap用来表示所有与该交换机存在绑定关系的队列,
这个hashMap以队列名字为key,Binding对象为 value.
所有消息信息存储结构
实现效果:通过 消息的id(消息的身份标识) 即可 查询到 所有的 消息对象
队列对应的消息信息存储结构
实现效果:通过 队列的名字(队列的身份标识) 即可 查询到该队列中所有的消息对象
被取走但未应答的消息存储结构
实现效果:通过 队列的名字(队列的身份标识) 即可 查询到该队列中所有的被取走未被应答消息对象(且可以根据消息id再查询到对应的消息)
二、编写代码
创建一个 MemoryDataCenter 来统一管理内存数据
/** * 使用这个类来统一管理内存中的所有数据 * 该类后续提供的一些方法,可能会在多线程环境下被使用,因此要注意线程安全问题 */
public class MemoryDataCenter {
// 存储 交换机信息,key 是 exchangeName,value 是 Exchange 对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
// 存储 队列信息,key 是 queueName,value 是 MSGQueue 对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
// 存储 交换机队列的绑定关系,第一个 key 是 exchangeName 第二个 key 是 queueName
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bingdingsMap = new ConcurrentHashMap<>();
// 存储 所有消息信息,key 是 messageId,value 是 Message 对象
private ConcurrentHashMap<String, Message> messagesMap = new ConcurrentHashMap<>();
// 存储 队列对应的消息信息,第一个 key 是 queueName,value 是 一个链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// 存储 被取走但未应答的消息,第一个 key 是 queueName,第二个 key 是 messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
// 添加交换机
public void insertExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交换机添加成功 exchangeName=" + exchange.getName());
}
// 根据交换机名字得到一个交换机
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}
// 销毁一个交换机
public void deleteExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 交换机删除成功 exchangeName=" + exchangeName);
}
// 添加队列
public void insertQueue(MSGQueue queue) {
queueMap.put(queue.getName(),queue);
System.out.println("[MemoryDataCenter] 新队列添加成功 queueName=" + queue.getName());
}
// 根据队列名字得到一个队列
public MSGQueue getQueue(String queueName) {
return queueMap.get(queueName);
}
// 销毁一个队列
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 队列删除成功 queueName=" + queueName);
}
// 添加一个binding
public void insertBinding(Binding binding) throws MqException {
// 思路:
// 1.先根据 exangeName 去 bingdingsMap 中查找,看该交换机是否存在绑定关系
// 如不存在,则给该交换机 创建新的 HashMap 插入到 bingdingsMap,
// 2.找到该交换机的绑定关系的 HashMap bindingMap 后,
// 去 bindingMap 中查找该交换机是否与 queueName 存在绑定关系,
// 如果有则已经存在绑定关系,无法继续绑定,抛出异常
// 如果没有则 将 queueName 作为 key 与 binding 作为 value 插入到 该交换机的绑定关系中 bindingMap
// computeIfAbsent 方法做的是:先从 bingdingsMap 中去查找是否存在,binding.getExchangeName()
// 存在则直接返回,不存在则将 第二个参数 lambda 表达式的返回结果 直接插入到调用 HashMap 中(bingdingsMap)
ConcurrentHashMap<String,Binding> bindingMap = bingdingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
// 咱们此处要先判定,该交换机与该队列是否存在绑定关系,存在则是意料之外的情况抛出异常,不存在则插入绑定.
// 如果一个线程,正在执行插入操作但未插入,然后调度到另一个线程,正在判断是否存在绑定关系,就会判断不存在,
// 就会出现交换机与队列存在两个绑定关系
// 因此我们针对 该交换机的所有绑定关系的 bindingMap 进行加锁,以保证线程安全
synchronized (bindingMap) {
if (bindingMap.get(binding.getQueueName()) != null) {
throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +
", queueName=" + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(),binding);
}
System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
// 获取binding,写两个版本:
// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding
// 2. 根据 exchangeName 获取到所有的 Binding
public Binding getBinding(String exchanName,String queueName) {
ConcurrentHashMap<String,Binding> bindingMap = bingdingsMap.get(exchanName);
if (bindingMap == null) {
return null;
}
return bindingMap.get(queueName);
}
public ConcurrentHashMap<String,Binding> getBinding(String exchanName) {
return bingdingsMap.get(exchanName);
}
// 销毁 binding
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap<String,Binding> bindingMap = bingdingsMap.get(binding.getExchangeName());
if (bindingMap == null) {
// 该交换机没有绑定任何队列,抛出异常
throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功 exchangeName=" + binding.getExchangeName());
}
// 添加消息
public void addMessage(Message message ) {
messagesMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功 messageId=" + message.getMessageId());
}
// 根据 id 查询消息
public Message getMessage(String messageId) {
return messagesMap.get(messageId);
}
// 根据 id 删除消息
public void removeMessage(String messageId) {
messagesMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息删除成功 messageId=" + messageId);
}
// 发送消息到指定队列
public void sendMessage(MSGQueue queue,Message message) {
// 把消息放到对应的队列数据结构中
// 先根据队列的名字, 找到该队列对应的消息链表
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
// 再把数据加到 messages 里面
// 如果多个线程同时向这个链表,添加消息,就可能会覆盖添加的数据,因此针对这个链表进行加锁
synchronized(messages) {
messages.add(message);
}
// 在这里把该消息也往消息中心中插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系.
// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改 basicProperties 和 body)
addMessage(message);
System.out.println("[MemoryDataCenter] 新消息被投递到队列中 messageId=" + message.getMessageId());
}
// 从队列中取消息
public Message pollMessage(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if (messages == null) {
return null;
}
synchronized (messages) {
if (messages.size() == 0) {
return null;
}
Message currentMessage = messages.remove(0);
System.out.println("[MemoryDataCenter] 消息从队列中取出 messageId=" + currentMessage.getMessageId());
return currentMessage;
}
}
// 获取指定队列中消息的个数
public int getMessageCount(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if (messages == null) {
// 队列中无消息
return 0;
}
synchronized (messages) {
return messages.size();
}
}
// 添加未确认的消息
public void addMessageWaitAck(String queueName,Message message) {
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());
messageHashMap.put(message.getMessageId(),message);
System.out.println("[MemoryDataCenter] 消息进入待确认队列 messageId=" + message.getMessageId());
}
// 删除未确认消息(消息已经确认)
public void removeMessageWaitAck(String queueName,String messageId) {
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if (messageHashMap == null) {
return;
}
messageHashMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息从待确认队列删除 messageId=" + messageId);
}
// 获取指定的未确认的消息
public Message getMessageWaitAck(String queueNmae,String messageId) {
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueNmae);
if (messageHashMap == null) {
return null;
}
return messageHashMap.get(messageId);
}
// 将硬盘中的数据 恢复到内存中
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 0.清空之前的所有数据
exchangeMap.clear();
queueMap.clear();
bingdingsMap.clear();
messagesMap.clear();
queueMessageMap.clear();
// 1.恢复所有的交换机数据
List<Exchange> exchangeList = diskDataCenter.selectAllExchanges();
for (Exchange exchange : exchangeList) {
exchangeMap.put(exchange.getName(),exchange);
}
// 2.恢复所有的队列数据
List<MSGQueue> queueList = diskDataCenter.selectAllQueues();
for (MSGQueue queue : queueList) {
queueMap.put(queue.getName(), queue);
}
// 3.恢复所有的绑定数据
List<Binding> bindingList = diskDataCenter.selectAllBindings();
for (Binding binding : bindingList) {
ConcurrentHashMap<String,Binding> bindingMap = bingdingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(),binding);
}
// 4.恢复所有的消息数据
// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息.
for (MSGQueue queue : queueList) {
LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
queueMessageMap.put(queue.getName(),messages);
for (Message message : messages) {
messagesMap.put(message.getMessageId(), message);
}
}
// 注意!! 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复. 之前考虑硬盘存储的时候, 也没设定这一块.
// 一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息" .
// 这个消息在硬盘上存储的时候, 就是当做 "未被取走"
}
}
文章评论