前面對於分布式事務也講了好幾篇了(可靠消息最終一致性 分布式事務 - TCC 分布式事務 - 2PC、3PC),但是還沒有實戰過。那麼本篇我們就來演示下如何在 .NET 環境下實現一個基於可靠消息的分布式事務。基於可靠消息的分布式事務流程上還是比較清晰明了的,但是要用代碼去一個個實現還是比較費事的。通過分析可以發現這個事務的關鍵點就是要在真正的業務邏輯的前面、後面插入對應的流程。很明顯這種流程是可以通過 AOP 技術來簡化操作的。於是就有了 AgileDT 。AgileDT 使用 Natasha 在啟動的時候動態生成代理類,來為你完成跟消息部分的操作,使用者只需關心核心業務邏輯就可以了。

https://github.com/kklldog/AgileDT 開源不易,大家多多

回顧

前面一篇文章(可靠消息最終一致性 )我們詳細介紹了基於可靠消息的分布式事務。為了更好的理解 AgileDT 的代碼,我們還是有必要簡單的來回顧下。



該方案總體流程上可分為以下步驟:

  1. 主動方在真正的業務開始前先向可靠消息服務發送一個“待確認”的消息
  2. 可靠消息服務收到待確認消息後持久化消息到數據庫
  3. 如果以上操作成功則主動方開始真正的業務,如果失敗則直接放弃執行業務
  4. 如果業務執行成功則發送“確認”消息給可靠消息服務,如果執行失敗則發送“取消”給可靠消息服務。
  5. 如果可靠消息服務收到“確認”消息則更新數據庫裏的消息記錄的狀態為“待發送”,如果收到的消息為“取消”則更新消息狀態為“已取消”
  6. 如果上一步更新的數據庫為“待發送”,那麼會開始往MQ投遞消息,並且更改數據庫裏的消息記錄的狀態為“已發送”
  7. 上一步往MQ投遞消息成功後,MQ會給被動方推送消息。
  8. 被動方收到消息後開始處理業務
  9. 如果業務處理成功,則被動方對MQ進行ACK回複,則這條消息會從MQ內移除掉
  10. 如果業務處理成功,則發送“已完成”消息給可靠消息服務
  11. 可靠消息服務收到“已完成”消息後更新數據庫消息記錄未“已完成”

廢話不多說了,下面讓我們演示下如何使用 AgileDT 來快速實現一個基於可靠消息的分布式事務。

以下我們還是以經典的訂單下單完成給會員贈送積分的場景來演示。

使用 AgileDT

依賴組件

  • mysql
  • rabbitmq

目前支持 mysql 數據庫,但是數據訪問組件使用的是 freesql 所以後續要實現支持別的數據庫也很簡單。目前框架使用的可靠消息服務為 rabbitmq 。

運行服務端

在服務新建一個數據庫並且新建一張錶

// crate event_message table on mysql
create table if not exists event_message
(
event_id varchar(36) not null
primary key,
biz_msg varchar(4000) null,
status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
create_time datetime(3) null,
event_name varchar(255) null
);

使用docker-compose運行服務端

version: "3"  # optional since v1.27.0
services:
agile_dt:
image: "kklldog/agile_dt"
ports:
- "5000:5000"
environment:
- db:provider=mysql
- db:conn= Database=agile_dt;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306
- mq:userName=admin
- mq:password=123456
- mq:host=192.168.0.115
- mq:port=5672

安裝客戶端

在主動方跟被動方都需要安裝AgileDT的客戶端庫

Install-Package AgileDT.Client

主動方使用方法

  1. 在業務數據庫添加事務消息錶
// crate event_message table on mysql
create table if not exists event_message
(
event_id varchar(36) not null
primary key,
biz_msg varchar(4000) null,
status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
create_time datetime(3) null,
event_name varchar(255) null
);
  1. 修改配置文件
在appsettings.json文件添加以下節點:
"agiledt": {
"server": "http://localhost:5000",
"db": {
"provider": "mysql",
"conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
//"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
},
"mq": {
"host": "192.168.0.125",
//"host": "192.168.0.115",
"userName": "admin",
"password": "123456",
"port": 5672
}
}
  1. 注入 AgileDT 客戶端服務
       public void ConfigureServices(IServiceCollection services)
{
services.AddAgileDT();
...
}
  1. 實現IEventService方法

    處理主動方業務邏輯的類需要實現IEventService接口,並且標記那個方法是真正的業務方法。AgileDT在啟動的時候會掃描這些類型,並且使用AOP技術生成代理類,在業務方法前後插入對應的邏輯來跟可靠消息服務通訊。

    這裏要注意的幾個地方:
  • 實現IEventService接口
  • 使用DtEventBizMethod注解標記業務入口方法
  • 使用DtEventName注解來標記事務的方法名稱,如果不標記則使用類名

注意:業務方法最終一定要使用事務來同步修改消息錶的status字段為done狀態,這個操作框架沒辦法幫你實現

注意:業務方法如果失敗請拋出Exception,如果不拋异常框架一律認為執行成功

 public interface IAddOrderService:IEventService
{
bool AddOrder(Order order);
} [DtEventName("orderservice.order_added")]
public class AddOrderService : IAddOrderService
{
private readonly ILogger<AddOrderService> _logger; public AddOrderService(ILogger<AddOrderService> logger)
{
_logger = logger;
} public string EventId {
get;
set;
} [DtEventBizMethod]
public virtual bool AddOrder(Order order)
{
var ret = false; //3. 寫 Order 跟 修改 event 的狀態必選寫在同一個事務內
FreeSQL.Instance.Ado.Transaction(() =>
{
order.EventId = EventId;//在訂單錶新增一個eventid字段,使order跟event_message錶關聯起來
var ret0 = FreeSQL.Instance.Insert(order).ExecuteAffrows();
var ret1 = FreeSQL.Instance.Update<OrderService.Data.entities.EventMessage>()
.Set(x => x.Status, MessageStatus.Done)
.Where(x => x.EventId == EventId)
.ExecuteAffrows(); ret = ret0 > 0 && ret1 > 0;
}); return ret; } /// <summary>
/// 構造後續業務處理需要的消息內容
/// </summary>
/// <returns></returns>
public string GetBizMsg()
{
//這裏可以構造傳遞到MQ的業務消息的內容,比如傳遞訂單編號啊 ,以便後續的被動方處理業務時候使用
var order = FreeSQL.Instance.Select<Order>().Where(x => x.EventId == EventId).First();
return order?.Id;
} }

在實現好 IAddOrderService 接口後,你可以像平常一樣使用 IAddOrderService 來注入實現類。比如在 Controller 的構造函數注入進去。因為 AgileDT 在啟動的時候會自動幫你注册。

注意:IAddOrderService 跟實現類的生命周期是 Scoped 。

被動方使用方法

  1. 在業務方數據庫建錶或者在業務錶上加字段

    對於被動方來說這裏不是必須要建一個錶。但是至少要有個地方來存儲event_id的信息,最簡單的是直接在業務主錶上加event_id字段。
  2. 修改配置文件
在appsettings.json文件添加以下節點:
"agiledt": {
"server": "http://localhost:5000",
"db": {
"provider": "mysql",
"conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
//"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
},
"mq": {
"host": "192.168.0.125",
//"host": "192.168.0.115",
"userName": "admin",
"password": "123456",
"port": 5672
}
}
  1. 注入AgileDT服務
       public void ConfigureServices(IServiceCollection services)
{
services.AddAgileDT();
...
}
  1. 實現IEventMessageHandler接口

    被動方需要接收MQ投遞過來的消息,這些處理類需要實現IEventMessageHandler接口。AgileDT啟動的時候會去掃描這些類,然後跟MQ建立綁定關系。
  • 這裏必須使用DtEventName注解標記需要處理的事件名稱
  • Reveive 方法必須是冥等的
    public interface IOrderAddedMessageHandler: IEventMessageHandler
{
} [DtEventName("orderservice.order_added")]
public class OrderAddedMessageHandler: IOrderAddedMessageHandler
{
static object _lock = new object(); public bool Receive(EventMessage message)
{
var bizMsg = message.BizMsg;
var eventId = message.EventId;
string orderId = bizMsg; lock (_lock)
{
var entity = FreeSQL.Instance.Select<PointHistory>().Where(x => x.EventId == eventId).First();
if (entity == null)
{
var ret = FreeSQL.Instance.Insert(new PointHistory
{
Id = Guid.NewGuid().ToString(),
EventId = message.EventId,
OrderId = orderId,
Points = 20,
CreateTime = DateTime.Now
}).ExecuteAffrows(); return ret > 0;
}
else
{
return true;
}
}
}
}

總結

通過以上演示,我們快速的實現了一個訂單下單會員贈送積分的服務。可以看到使用 AgileDT 可以很快速的實現一個分布式事務。特別是在實現過一個分布式事務後,後面實現起來就特別簡單,只要實現幾個接口就可以了。AgileDT 才剛剛起步,希望大家多多支持,多多 ,多多 PR 。

https://github.com/kklldog/AgileDT

.Net Core with 微服務 - 使用 AgileDT 快速實現基於可靠消息的分布式事務的更多相關文章

  1. spring cloud+dotnet core搭建微服務架構:服務發現(二)

    前言 上篇文章實際上只講了服務治理中的服務注册,服務與服務之間如何調用呢?傳統的方式,服務A調用服務B,那麼服務A訪問的是服務B的負載均衡地址,通過負載均衡來指向到服務B的真實地址,上篇文章已經說了這 ...

  2. spring cloud+.net core搭建微服務架構:服務發現(二)

    前言 上篇文章實際上只講了服務治理中的服務注册,服務與服務之間如何調用呢?傳統的方式,服務A調用服務B,那麼服務A訪問的是服務B的負載均衡地址,通過負載均衡來指向到服務B的真實地址,上篇文章已經說了這 ...

  3. .Net Core with 微服務 - 架構圖

    上一次我們簡單介紹了什麼是微服務(.NET Core with 微服務 - 什麼是微服務 ).介紹了微服務的來龍去脈,一些基礎性的概念.有大佬在評論區指出說這根本不是微服務.由於本人的能力有限,大概也 ...

  4. 手把手教你使用spring cloud+dotnet core搭建微服務架構:服務治理(-)

    背景 公司去年開始使用dotnet core開發項目.公司的總體架構采用的是微服務,那時候由於對微服務的理解並不是太深,加上各種組件的不成熟,只是把項目的各個功能通過業務層面拆分,然後通過nginx代 ...

  5. spring cloud+dotnet core搭建微服務架構:Api網關(三)

    前言 國慶假期,一直沒有時間更新. 根據群裏面的同學的提問,强烈推薦大家先熟悉下spring cloud.文章下面有純潔大神的spring cloud系列. 上一章最後說了,因為服務是不對外暴露的,所 ...

  6. spring cloud+dotnet core搭建微服務架構:配置中心(四)

    前言 我們項目中有很多需要配置的地方,最常見的就是各種服務URL地址,這些地址針對不同的運行環境還不一樣,不管和打包還是部署都麻煩,需要非常的小心.一般配置都是存儲到配置文件裏面,不管多小的配置變動, ...

  7. spring cloud+dotnet core搭建微服務架構:配置中心續(五)

    前言 上一章最後講了,更新配置以後需要重啟客戶端才能生效,這在實際的場景中是不可取的.由於目前Steeltoe配置的重載只能由客戶端發起,沒有實現處理程序偵聽服務器更改事件,所以還沒辦法實現徹底實現這 ...

  8. spring cloud+dotnet core搭建微服務架構:Api授權認證(六)

    前言 這篇文章拖太久了,因為最近實在太忙了,加上這篇文章也非常長,所以花了不少時間,給大家說句抱歉.好,進入正題.目前的項目基本都是前後端分離了,前端分Web,Ios,Android...,後端也基本 ...

  9. spring cloud+.net core搭建微服務架構:服務注册(一)

    背景 公司去年開始使用dotnet core開發項目.公司的總體架構采用的是微服務,那時候由於對微服務的理解並不是太深,加上各種組件的不成熟,只是把項目的各個功能通過業務層面拆分,然後通過nginx代 ...

  10. spring cloud+.net core搭建微服務架構:Api授權認證(六)

    前言 這篇文章拖太久了,因為最近實在太忙了,加上這篇文章也非常長,所以花了不少時間,給大家說句抱歉.好,進入正題.目前的項目基本都是前後端分離了,前端分Web,Ios,Android...,後端也基本 ...

隨機推薦

  1. 利用QMP和QEMU虛擬機交互的幾種方式

    QMP是一種基於JSON格式的傳輸協議,我們能利用它與一個QEMU虛擬機實例進行交互,例如查詢,更改虛擬機的狀態,獲取設備信息等等.下面是幾種創建QMP的方法以及對其它的一些基本命令的使用: 1.基於 ...

  2. MySQL的高級查詢

    高級查詢 1.連接查詢(對列的擴展) 第一種形式select * from Info,Nation #會形成笛卡爾積 select * from Info,Nation where Info.Nati ...

  3. C#實現局域網文件傳輸

    網絡通信一般都是通過Socket進行的,稱為進程通信機制,通常也稱作"套接字",用於描述IP地址和端口,是一個通信鏈的句柄. 先學習一下socket基本原理: socket原理: ...

  4. 使用logmnr方法找回被誤删除Oracle的數據的脚本

    俗話說,常在河邊走,哪有不濕鞋的.作為一個經常與數據庫打交道的程序員,偶爾不小心誤删除或誤操作的數據也是在所難免的.如果是Oracle數據庫,這裏給您介紹一種從日志中找回數據的辦法,下面這個地址是我以 ...

  5. Cent OS安裝TL-WN725N 2.0 USB驅動

    TP Link官方沒有提供TL-WN725N 2.0的Linux驅動下載,折騰了我半天,試了各種方法.也有一部分原因是因為這機器還不能聯網,導致有一些驅動因為缺少依賴並不成功安裝. 後來終於在gith ...

  6. PHP免費API調用,使用(CURL)

    <?phpclass GetApiModel{//獲取第三方API //獲取身份證信息 //返回json /*{ "errNum": 0, "retMsg" ...

  7. 在XAML代碼中為節點樹安裝事件監聽器

    通過以下的演示樣例代碼,能够發現,我們能為隨意的節點指定要監聽的路由事件,而這個路由事件本身和這個元素可能根本就沒有關系. <Window x:Class="Demo002.MainW ...

  8. [linux]ubuntu apt-get安裝軟件失敗

    1.首先查看 dns 配置 sudo vi /etc/resolv.conf nameserver 114.114.114.114 nameserver 8.8.8.8 2.修改 apt-get 源 ...

  9. linux 備份 文件+sql

    sql 1 2 3 rm -f /bak/bak.sql mysqldump --databases nl -uroot -p413121 > /bak/bak.sql curl http:// ...

  10. JQ 為未來元素添加事件處理器—事件委托

    隨著DOM結構的複雜化和Ajax等動態脚本技術的運用,有了較多的動態添加進來的元素,直接用JQ添加click事件會發現新添加進來的元素並不能直接選取到,在這裏就需要用到事件委托方法,JQ為事件委托提供 ...