Fabric-sdk-go在Fabric 2.4以上版本中并不适用,所以这里使用fabric-gateway-go来构fabric应用。
另外,对于fabric还处在学习阶段,所以有些内容不会太深入。
1. Fabric-Gateway介绍
Fabric-Gateway
是Hyperledger Fabric v2.4及更高版本中引入的一项服务,它位于Fabric Peer节点中,通过一个简单的gRPC接口服务于客户端应用。Fabric-Gateway
将交易提交逻辑从客户端应用程序中剥离出来,可以减轻开发者的负担,并优化网络流量和安全性。
Fabric-Gateway
支持的语言有:JavaScript/TypeScript、Java、Go及Python等。这里仅以Fabric-Gateway-Go
为例来说明。
2. Fabric-Gateway简介
需要说明一点,Fabric-Gateway
与Fabric-SDK
相比,不能部署链码,所以在使用Fabric-Gateway
开发区块链应用之前,需要先自行完成通道及链码的部署。具体通道及链码的部署过程这里不再赘述。
本篇博客主要介绍Fabric-Gateway
的两个功能: 提交交易以及事件监听。
2.1 交易
Hyperledger Fabric中的交易(Transaction)是指在区块链上执行的操作,通过交易可以实现对区块链上数据的创建、读取、更新和删除操作。在Hyperledger Fabric中,交易的执行过程主要包括以下步骤:
- 提交交易提案:客户端应用程序创建一个交易提案,指定要调用的链码函数和相关参数。交易提案被发送到一个或多个背书节点。
- 链码执行(模拟):背书节点接收到交易提案后,调用指定的链码函数,执行链码代码。链码在模拟执行过程中不会真正修改账本,而是生成一个读写集,记录链码尝试读取和修改的状态。
- 交易背书:背书节点将链码执行的结果(读写集)和节点的签名返回给客户端。客户端应用程序收集足够数量的背书签名,验证其有效性。
- 提交交易:客户端应用程序将已背书的交易提交给Orderer节点。排序节点将交易按照时间顺序打包成区块,并广播给所有Peer节点。
- 交易验证和提交:Peer节点接收到区块后,验证交易的有效性(如读写集是否冲突)。验证通过的交易将被提交到账本,并更新世界状态数据库。
2.2 事件监听
2.2.1 事件监听类型
目前Fabric-Gataway-Go
中的监听事件主要有以下几种:
- 链码事件(Chaincode Events):该事件由链码在执行过程中触发,用于通知外部系统特定的业务事件。
- 区块事件(Block Events): 区块事件对于监控区块链网络的整体状态和活动非常有用。当新的区块被添加到账本时触发。区块事件提供了整个区块的详细信息,包括所有交易的细节。
- 过滤区块事件(Filtered Block Events):该类事件是区块事件的一种简化版本,只包含区块和交易的元数据,不包含交易的详细读写集。过滤区块事件减少了数据量,适合需要监控但不需要完整交易详细信息的应用场景。
2.2 事件监听作用
在Hyperledger Fabric中,事件监听的作用主要有以下几个:
- 实时通知和响应: 应用程序可以在链码事件或区块事件发生时,立即收到通知并执行相应操作。
- 数据同步: 事件监听可以用于将区块链上的数据同步到外部数据库或系统中,确保数据的一致性和实时性。
- 监控和审计:区块事件监听可以用于实时监控区块链网络的活动,记录每个区块和交易的详细信息,便于事后审计和追踪。
3. 具体实现
3.1 准备工作
这里依然选择使用GoLand来编写代码,所以这里需要完成的前期准备工作主要包括:
- GoLand连接虚拟机系统(hyperledger fabric网络所在的系统);
- 配置peer节点的host文件。
以上两点具体实现过程可以参考:https://blog.csdn.net/yeshang_lady/article/details/134921528
3.2 具体代码
&esmp; 这里为了方便,使用如下链码文件bussiness_CC.go
。其具体代码如下:
package main
import (
"encoding/json"
"fmt"
"github.com/hyperledger/fabric-contract-api-go/contractapi"
"log"
)
type SmartContract struct {
contractapi.Contract
}
type Asset struct {
AppraisedValue int `json:"AppraisedValue"`
Color string `json:"Color"`
ID string `json:"ID"`
Owner string `json:"Owner"`
Size int `json:"Size"`
}
func (s *SmartContract) InitLedger(ctx contractapi.TransactionContextInterface) error {
assets := []Asset{
{
ID: "asset1", Color: "blue", Size: 5, Owner: "Tomoko", AppraisedValue: 300},
{
ID: "asset2", Color: "red", Size: 5, Owner: "Brad", AppraisedValue: 400},
{
ID: "asset3", Color: "green", Size: 10, Owner: "Jin Soo", AppraisedValue: 500},
{
ID: "asset4", Color: "yellow", Size: 10, Owner: "Max", AppraisedValue: 600},
{
ID: "asset5", Color: "black", Size: 15, Owner: "Adriana", AppraisedValue: 700},
{
ID: "asset6", Color: "white", Size: 15, Owner: "Michel", AppraisedValue: 800},
}
for _, asset := range assets {
assetJSON, err := json.Marshal(asset)
if err != nil {
return err
}
err = ctx.GetStub().PutState(asset.ID, assetJSON)
if err != nil {
return fmt.Errorf("failed to put to world state. %v", err)
}
}
// 触发链码事件
ctx.GetStub().SetEvent("InitLedger", []byte("success"))
return nil
}
func (s *SmartContract) GetAllAssets(ctx contractapi.TransactionContextInterface) ([]*Asset, error) {
resultsIterator, err := ctx.GetStub().GetStateByRange("", "")
if err != nil {
return nil, err
}
defer resultsIterator.Close()
var assets []*Asset
for resultsIterator.HasNext() {
queryResponse, err := resultsIterator.Next()
if err != nil {
return nil, err
}
var asset Asset
err = json.Unmarshal(queryResponse.Value, &asset)
if err != nil {
return nil, err
}
assets = append(assets, &asset)
}
return assets, nil
}
func (s *SmartContract) AssetExists(ctx contractapi.TransactionContextInterface, id string) (bool, error) {
assetJSON, err := ctx.GetStub().GetState(id)
if err != nil {
return false, fmt.Errorf("failed to read from world state: %v", err)
}
return assetJSON != nil, nil
}
func (s *SmartContract) CreateAsset(ctx contractapi.TransactionContextInterface, id string, color string, size int, owner string, appraisedValue int) error {
exists, err := s.AssetExists(ctx, id)
if err != nil {
return err
}
if exists {
return fmt.Errorf("the asset %s already exists", id)
}
asset := Asset{
ID: id,
Color: color,
Size: size,
Owner: owner,
AppraisedValue: appraisedValue,
}
assetJSON, err := json.Marshal(asset)
if err != nil {
return err
}
return ctx.GetStub().PutState(id, assetJSON)
}
func main() {
assetChaincode, err := contractapi.NewChaincode(&SmartContract{
})
if err != nil {
log.Panicf("Error creating asset-transfer-basic chaincode: %v", err)
}
if err := assetChaincode.Start(); err != nil {
log.Panicf("Error starting asset-transfer-basic chaincode: %v", err)
}
}
客户端代码如下:
package main
import (
"bytes"
"context"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"github.com/hyperledger/fabric-gateway/pkg/client"
"github.com/hyperledger/fabric-gateway/pkg/identity"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"os"
"path"
"strconv"
"time"
)
var mspID = "Org1MSP"
var cryptoPath = os.Getenv("GOPATH") + "/src/DataCleansing/fixtures/organizations/peerOrganizations/org1.datacleansing.com"
var certPath = cryptoPath + "/users/[email protected]/msp/signcerts"
var keyPath = cryptoPath + "/users/[email protected]/msp/keystore"
var tlsCertPath = cryptoPath + "/peers/peer0.org1.datacleansing.com/tls/ca.crt"
var peerEndpoint = "peer0.org1.datacleansing.com:7051"
var gatewayPeer = "peer0.org1.datacleansing.com"
const (
chaincodeName = "business"
channelName = "buschannel"
)
type Asset struct {
AppraisedValue int `json:"AppraisedValue"`
Color string `json:"Color"`
ID string `json:"ID"`
Owner string `json:"Owner"`
Size int `json:"Size"`
}
func main() {
clientConnection := newGrpcConnection()
defer clientConnection.Close()
id := newIdentity()
sign := newSign()
gw, err := client.Connect(
id,
client.WithSign(sign),
client.WithClientConnection(clientConnection),
client.WithEvaluateTimeout(5*time.Second),
client.WithEndorseTimeout(15*time.Second),
client.WithSubmitTimeout(5*time.Second),
client.WithCommitStatusTimeout(1*time.Minute),
)
if err != nil {
panic(err)
}
defer gw.Close()
network := gw.GetNetwork(channelName)
contract := network.GetContract(chaincodeName)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startChaincodeEventListening(ctx, network)
firstBlockNumber := initLedger(contract)
createAsset(contract)
getAllAssets(contract)
replayChaincodeEvents(ctx, network, firstBlockNumber)
}
func newGrpcConnection() *grpc.ClientConn {
certificatePEM, err := os.ReadFile(tlsCertPath)
if err != nil {
panic(fmt.Errorf("failed to read TLS certifcate file: %w", err))
}
certificate, err := identity.CertificateFromPEM(certificatePEM)
if err != nil {
panic(err)
}
certPool := x509.NewCertPool()
certPool.AddCert(certificate)
transportCredentials := credentials.NewClientTLSFromCert(certPool, gatewayPeer)
connection, err := grpc.NewClient(peerEndpoint, grpc.WithTransportCredentials(transportCredentials))
if err != nil {
panic(fmt.Errorf("failed to create gRPC connection: %w", err))
}
return connection
}
func newIdentity() *identity.X509Identity {
certificatePEM, err := readFirstFile(certPath)
if err != nil {
panic(fmt.Errorf("failed to read certificate file: %w", err))
}
certificate, err := identity.CertificateFromPEM(certificatePEM)
if err != nil {
panic(err)
}
id, err := identity.NewX509Identity(mspID, certificate)
if err != nil {
panic(err)
}
return id
}
func newSign() identity.Sign {
privateKeyPEM, err := readFirstFile(keyPath)
if err != nil {
panic(fmt.Errorf("failed to read private key file: %w", err))
}
privateKey, err := identity.PrivateKeyFromPEM(privateKeyPEM)
if err != nil {
panic(err)
}
sign, err := identity.NewPrivateKeySign(privateKey)
if err != nil {
panic(err)
}
return sign
}
func startChaincodeEventListening(ctx context.Context, network *client.Network) {
fmt.Println("\n*** Start chaincode event listening")
events, err := network.ChaincodeEvents(ctx, chaincodeName)
if err != nil {
panic(fmt.Errorf("failed to start chaincode event listening: %w", err))
}
go func() {
for event := range events {
fmt.Println(event.Payload)
data := event.Payload
fmt.Printf("\n<-- Chaincode event received: %s - %s\n", event.EventName, data)
}
}()
}
func readFirstFile(dirPath string) ([]byte, error) {
dir, err := os.Open(dirPath)
if err != nil {
return nil, err
}
fileNames, err := dir.Readdirnames(1)
if err != nil {
return nil, err
}
return os.ReadFile(path.Join(dirPath, fileNames[0]))
}
func initLedger(contract *client.Contract) uint64 {
fmt.Printf("\n--> Submit Transaction: InitLedger, function creates the initial set of assets on the ledger \n")
_, commit, err := contract.SubmitAsync("InitLedger")
if err != nil {
panic(fmt.Errorf("failed to submit transaction: %w", err))
}
status, err := commit.Status()
if err != nil {
panic(fmt.Errorf("failed to get transaction commit status: %w", err))
}
if !status.Successful {
panic(fmt.Errorf("failed to commit transaction with status code %v", status.Code))
}
fmt.Println("\n*** InitLedger committed successfully")
return status.BlockNumber
}
func getAllAssets(contract *client.Contract) {
fmt.Println("\n--> Evaluate Transaction: GetAllAssets, function returns all the current assets on the ledger")
evaluateResult, err := contract.EvaluateTransaction("GetAllAssets")
if err != nil {
panic(fmt.Errorf("failed to evaluate transaction: %w", err))
}
result := formatJSON(evaluateResult)
fmt.Printf("*** Result:%s\n", result)
}
func createAsset(contract *client.Contract) error {
_, err := contract.SubmitTransaction("CreateAsset", "asset7", "blue", strconv.Itoa(5), "Tomoko", strconv.Itoa(300))
if err != nil {
fmt.Printf("*** Error: %v\n", err)
return err
}
fmt.Printf("*** Transaction committed successfully\n")
return nil
}
func formatJSON(data []byte) string {
var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, data, "", " "); err != nil {
panic(fmt.Errorf("failed to parse JSON: %w", err))
}
return prettyJSON.String()
}
func replayChaincodeEvents(ctx context.Context, network *client.Network, startBlock uint64) {
fmt.Println("\n*** Start chaincode event replay")
events, err := network.ChaincodeEvents(ctx, chaincodeName, client.WithStartBlock(startBlock))
if err != nil {
panic(fmt.Errorf("failed to start chaincode event listening: %w", err))
}
for {
select {
case <-time.After(10 * time.Second):
panic(errors.New("timeout waiting for event replay"))
case event := <-events:
data := event.Payload
fmt.Printf("\n<-- Chaincode event replayed: %s - %s\n", event.EventName, data)
if event.EventName == "InitLedger" {
// Reached the last submitted transaction so return to stop listening for events
return
}
}
}
}
代码执行结果如下:
*** Start chaincode event listening
--> Submit Transaction: InitLedger, function creates the initial set of assets on the ledger
*** InitLedger committed successfully
[115 117 99 99 101 115 115]
<-- Chaincode event received: InitLedger - success
*** Transaction committed successfully
--> Evaluate Transaction: GetAllAssets, function returns all the current assets on the ledger
*** Result:[
{
"AppraisedValue": 300,
"Color": "blue",
"ID": "asset1",
"Owner": "Tomoko",
"Size": 5
},
{
"AppraisedValue": 400,
"Color": "red",
"ID": "asset2",
"Owner": "Brad",
"Size": 5
},
{
"AppraisedValue": 500,
"Color": "green",
"ID": "asset3",
"Owner": "Jin Soo",
"Size": 10
},
{
"AppraisedValue": 600,
"Color": "yellow",
"ID": "asset4",
"Owner": "Max",
"Size": 10
},
{
"AppraisedValue": 700,
"Color": "black",
"ID": "asset5",
"Owner": "Adriana",
"Size": 15
},
{
"AppraisedValue": 800,
"Color": "white",
"ID": "asset6",
"Owner": "Michel",
"Size": 15
},
{
"AppraisedValue": 300,
"Color": "blue",
"ID": "asset7",
"Owner": "Tomoko",
"Size": 5
}
]
*** Start chaincode event replay
<-- Chaincode event replayed: InitLedger - success
文章评论