当前位置:网站首页>Shh! Is this really good for asynchronous events?
Shh! Is this really good for asynchronous events?
2020-11-06 01:33:42 【Yin Jihuan】
The story background
At the beginning of this year, I wrote an article 《 gather and watch : The process of internal decoupling based on event mechanism 》. This article is mainly about using ES Heterogeneous data scenarios . Program subscription Mysql Binlog Changes , Then the program uses Spring Event To distribute specific events , Because the data change of a table may need to update more than one ES Indexes .
In order to make it easier for you to understand, I copied the pictures of the previous scheme , as follows :
There is a problem with the scheme in the figure above , That's what we're going to talk about today .
The problem is when MQ Consumer After receiving the message , Just post it directly Event 了 , If it's synchronous , No problem . If a EventListener Failed to process in , So this message will not ACK.
If it's an asynchronous release Event Scene , As soon as the news is released ACK 了 . Even if someone EventListener Failed to process in ,MQ I can't feel it , There will be no redelivery of messages , That's the problem .
Solution
Scheme 1
Now that the news has ACK 了 , Then don't use MQ The retrying function of , Can users try again by themselves ?
It's certainly possible , Whether the internal processing is successful or not is certain to be known , If the processing fails, you can try again by default , Or try again with a certain strategy . I can't, but I can drop it in the library , Keep records .
The problem is that it's too boring , Every place you use it has to do it , And for the program brother who will take over your code in the future , This is likely to make the little brother's hair fall off slowly ....
It doesn't matter if you fall off , The point is that he doesn't know what to do with it , Maybe one day I'll be carrying the pot , Pathetic ....
Option two
To ensure the consistency of messages and business processing , You can't do it right away ACK operation . It's about waiting for the business to complete before deciding whether to ACK.
If there is a failure to handle, it should not ACK, So you can reuse MQ The retrial mechanism of .
Analyze it , This is a typical asynchronous to synchronous scenario . image Dubbo There's also this scene in , So we can learn from Dubbo In the realization of ideas .
Create a DefaultFuture It is used to synchronously wait to get the result of task execution . And then in MQ Where to consume DefaultFuture.
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}")
public class DataChangeConsume implements RocketMQListener<DataChangeRequest> {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private CustomApplicationContextAware customApplicationContextAware;
@Override
public void onMessage(DataChangeRequest dataChangeRequest) {
log.info("received message {} , Thread {}", dataChangeRequest, Thread.currentThread().getName());
DataChangeEvent event = new DataChangeEvent(this);
event.setChangeType(dataChangeRequest.getChangeType());
event.setTable(dataChangeRequest.getTable());
event.setMessageId(dataChangeRequest.getMessageId());
DefaultFuture defaultFuture = DefaultFuture.newFuture(dataChangeRequest, customApplicationContextAware.getTaskCount(), 6000 * 10);
applicationContext.publishEvent(event);
Boolean result = defaultFuture.get();
log.info("MessageId {} Processing results {}", dataChangeRequest.getMessageId(), result);
if (!result) {
throw new RuntimeException(" Processing failed , No news ACK, Wait for the next time to try again ");
}
}
}
newFuture() Event parameters are passed in , Timeout time , Number of tasks several parameters . The number of tasks is used to judge all EventListener Is it all done .
defaultFuture.get(); It doesn't block , Wait for all tasks to complete before returning the result , If all the business is successful , So it will return true, End of the process , Message automatically ACK.
If you return false Prove that there are processing failures or timeouts , There is no need to ACK 了 , Throw an exception and wait to try again .
public Boolean get() {
if (isDone()) {
return true;
}
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
// There is feedback from failed tasks
if (!isSuccessDone()) {
return false;
}
// All executed successfully
if (isDone()) {
return true;
}
// Overtime
if (System.currentTimeMillis() - start > timeout) {
return false;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return true;
}
isDone() Can judge whether the number of tasks with feedback results is consistent with the total number , If it is consistent, it means that the whole execution is completed .
public boolean isDone() {
return feedbackResultCount.get() == taskCount;
}
How to give feedback after the task is completed ? It's impossible for every method used to care about , So we define a facet to do this .
@Aspect
@Component
public class EventListenerAspect {
@Around(value = "@annotation(eventListener)")
public Object aroundAdvice(ProceedingJoinPoint joinpoint, EventListener eventListener) throws Throwable {
DataChangeEvent event = null;
boolean executeResult = true;
try {
event = (DataChangeEvent)joinpoint.getArgs()[0];
Object result = joinpoint.proceed();
return result;
} catch (Exception e) {
executeResult = false;
throw e;
} finally {
DefaultFuture.received(event.getMessageId(), executeResult);
}
}
}
adopt DefaultFuture.received() Feedback on execution results .
public static void received(String id, boolean result) {
DefaultFuture future = FUTURES.get(id);
if (future != null) {
// Add up the number of failed tasks
if (!result) {
future.feedbackFailResultCount.incrementAndGet();
}
// Add up the number of completed tasks
future.feedbackResultCount.incrementAndGet();
if (future.isDone()) {
FUTURES.remove(id);
future.doReceived();
}
}
}
private void doReceived() {
lock.lock();
try {
if (done != null) {
// Wake up blocked threads
done.signal();
}
} finally {
lock.unlock();
}
}
Let's summarize the whole process :
- received MQ news , Assemble into DefaultFuture, adopt get Method to get the execution result , This method blocks when the execution is not finished .
- Cut through the section and add EventListener Methods , Judge whether there is an exception to judge the execution result of the task .
- adopt DefaultFuture.received() Feedback results .
- Whether the calculation is complete in feedback , When all is done, the blocked thread is awakened .DefaultFuture.get() You can get the results .
- Is it necessary to ACK operation .
It's important to note that each EventListener The logic of internal consumption should be idempotent control .
Source code address :https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume
About author : Yin Jihuan , Simple technology enthusiasts ,《Spring Cloud Microservices - Full stack technology and case analysis 》, 《Spring Cloud Microservices introduction Actual combat and advanced 》 author , official account Ape world Originator . Personal wechat jihuan900, Welcome to hook up with .
I have compiled a complete set of learning materials , Those who are interested can search through wechat 「 Ape world 」, Reply key 「 Learning materials 」 Get what I've sorted out Spring Cloud,Spring Cloud Alibaba,Sharding-JDBC Sub database and sub table , Task scheduling framework XXL-JOB,MongoDB, Reptiles and other related information .
版权声明
本文为[Yin Jihuan]所创,转载请带上原文链接,感谢
边栏推荐
- C++ 数字、string和char*的转换
- C++学习——centos7上部署C++开发环境
- C++学习——一步步学会写Makefile
- C++学习——临时对象的产生与优化
- C++学习——对象的引用的用法
- C++编程经验(6):使用C++风格的类型转换
- Won the CKA + CKS certificate with the highest gold content in kubernetes in 31 days!
- C + + number, string and char * conversion
- C + + Learning -- capacity() and resize() in C + +
- C + + Learning -- about code performance optimization
猜你喜欢
-
C + + programming experience (6): using C + + style type conversion
-
Latest party and government work report ppt - Park ppt
-
在线身份证号码提取生日工具
-
Online ID number extraction birthday tool
-
️野指针?悬空指针?️ 一文带你搞懂!
-
Field pointer? Dangling pointer? This article will help you understand!
-
HCNA Routing&Switching之GVRP
-
GVRP of hcna Routing & Switching
-
Seq2Seq实现闲聊机器人
-
【闲聊机器人】seq2seq模型的原理
随机推荐
- LeetCode 91. 解码方法
- Seq2seq implements chat robot
- [chat robot] principle of seq2seq model
- Leetcode 91. Decoding method
- HCNA Routing&Switching之GVRP
- GVRP of hcna Routing & Switching
- HDU7016 Random Walk 2
- [Code+#1]Yazid 的新生舞会
- CF1548C The Three Little Pigs
- HDU7033 Typing Contest
- HDU7016 Random Walk 2
- [code + 1] Yazid's freshman ball
- CF1548C The Three Little Pigs
- HDU7033 Typing Contest
- Qt Creator 自动补齐变慢的解决
- HALCON 20.11:如何处理标定助手品质问题
- HALCON 20.11:标定助手使用注意事项
- Solution of QT creator's automatic replenishment slowing down
- Halcon 20.11: how to deal with the quality problem of calibration assistant
- Halcon 20.11: precautions for use of calibration assistant
- “十大科学技术问题”揭晓!|青年科学家50²论坛
- "Top ten scientific and technological issues" announced| Young scientists 50 ² forum
- 求反转链表
- Reverse linked list
- js的数据类型
- JS data type
- 记一次文件读写遇到的bug
- Remember the bug encountered in reading and writing a file
- 单例模式
- Singleton mode
- 在这个 N 多编程语言争霸的世界,C++ 究竟还有没有未来?
- In this world of N programming languages, is there a future for C + +?
- es6模板字符
- js Promise
- js 数组方法 回顾
- ES6 template characters
- js Promise
- JS array method review
- 【Golang】️走进 Go 语言️ 第一课 Hello World
- [golang] go into go language lesson 1 Hello World