
Details of How Dapr Implements Distributed Stateful Services

2020-11-06 01:15:08 egmkang

Dapr is a cross-language, event-driven runtime designed for cloud environments that makes it easy to build microservice systems. Blah blah blah; if you're interested, go read up on it.

Dapr supports both stateful and stateless microservices. Most people build stateless services (microservices), but in some domains going stateless just doesn't work well because the cost is too high. Stateful services fit specific scenarios that demand low cost together with tight latency and high throughput. Enough talk; let's look directly at how Dapr implements stateful services.

 

First, let's look at what makes a service stateful:

1. Stable routing

    A request meant for server A must never be sent to server B; otherwise the service is effectively stateless.

2. State

    The state lives on the server itself, not in remote storage. This is the clear difference from stateless services: a stateless service needs something like Redis to speed up state access, while a stateful service doesn't.

3. Single-threaded processing

    State is usually complex, and running parallel computation over something complex is hard. Of course, if the logic of A and B is unrelated, A and B can run in parallel, but A's own logic must execute serially.
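Points 2 and 3 together are essentially the actor model. Here is a minimal Go sketch, assuming a made-up Counter actor (this is not Dapr code): the state is an ordinary in-memory field, and a single goroutine drains the actor's mailbox, so one actor's messages run serially while different actors can run in parallel.

```go
package main

import "fmt"

// request is one message in the actor's mailbox.
type request struct {
	delta int
	reply chan int
}

// Counter is a hypothetical stateful actor: its state lives in process memory
// (point 2) and is touched only by its own goroutine (point 3).
type Counter struct {
	mailbox chan request
	count   int // owned exclusively by run
}

// NewCounter creates the actor and starts its single processing goroutine.
func NewCounter() *Counter {
	c := &Counter{mailbox: make(chan request)}
	go c.run()
	return c
}

// run handles messages strictly one at a time, so count needs no lock.
func (c *Counter) run() {
	for req := range c.mailbox {
		c.count += req.delta
		req.reply <- c.count
	}
}

// Add sends a message to the actor and waits for the updated value.
func (c *Counter) Add(delta int) int {
	reply := make(chan int)
	c.mailbox <- request{delta: delta, reply: reply}
	return <-reply
}

func main() {
	a, b := NewCounter(), NewCounter() // two independent actors, free to run in parallel
	a.Add(2)
	b.Add(10)
	fmt.Println(a.Add(3), b.Add(1)) // prints 5 11
}
```

Concurrent callers of the same actor never need a mutex: the mailbox channel serializes them.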

 

For a stateful service framework like Dapr, implementing points 2 and 3 is actually easy, and some of it is even left to the user. So point 1 is the key: deciding which server the current message (request) should be sent to matters most, and it even determines what kind of system it is.

Deciding the destination address of a request is called Placement in distributed systems, and sometimes Naming. TiDB has a server called PlacementDriver (PD for short) that does the same job.

OK, let's study how Dapr's Placement is implemented.

 

There is a dedicated Placement process (heh), in the cmd/placement directory, and it is in charge of this:

func main() {
	log.Infof("starting Dapr Placement Service -- version %s -- commit %s", version.Version(), version.Commit())

	cfg := newConfig()

	// Apply options to all loggers.
	if err := logger.ApplyOptionsToLoggers(&cfg.loggerOptions); err != nil {
		log.Fatal(err)
	}
	log.Infof("log level set to: %s", cfg.loggerOptions.OutputLevel)

	// Initialize dapr metrics for placement.
	if err := cfg.metricsExporter.Init(); err != nil {
		log.Fatal(err)
	}

	if err := monitoring.InitMetrics(); err != nil {
		log.Fatal(err)
	}

	// Start Raft cluster.
	raftServer := raft.New(cfg.raftID, cfg.raftInMemEnabled, cfg.raftBootStrap, cfg.raftPeers)
	if raftServer == nil {
		log.Fatal("failed to create raft server.")
	}

	if err := raftServer.StartRaft(nil); err != nil {
		log.Fatalf("failed to start Raft Server: %v", err)
	}

	// Start Placement gRPC server.
	hashing.SetReplicationFactor(cfg.replicationFactor)
	apiServer := placement.NewPlacementService(raftServer)
	// ... (rest of main omitted)
}

You can see that main starts a Raft server, which generally means some of the system's capabilities need strong consistency.

For Raft, Dapr uses the Raft library from HashiCorp (the one Consul is built on) instead of etcd's. In my view etcd's Raft isn't usable as a library here: what you get is a server (including etcd embed), you can't customize the protocol inside it, and you can only access it through the etcd client. etcd is quite unfriendly in this respect.

 

If you build placement on a Raft library, you can define your own protocol. Look for the Apply-related functions: the Raft state machine is only responsible for keeping the log consistent, log entries are messages, processing those messages produces the state, and Apply is where the user does that processing. Luckily I did the MIT 6.824 labs before, so I know a little about this.

// Apply log is invoked once a log entry is committed.
func (c *FSM) Apply(log *raft.Log) interface{} {
	buf := log.Data
	cmdType := CommandType(buf[0])

	if log.Index < c.state.Index {
		logging.Warnf("old: %d, new index: %d. skip apply", c.state.Index, log.Index)
		return nil
	}

	var err error
	var updated bool
	switch cmdType {
	case MemberUpsert:
		updated, err = c.upsertMember(buf[1:])
	case MemberRemove:
		updated, err = c.removeMember(buf[1:])
	default:
		err = errors.New("unimplemented command")
	}

	if err != nil {
		return err
	}

	return updated
}

The Raft-related code is under the pkg/placement/raft folder, and fsm.go contains the message-handling function above.

As you can see, message handling is very simple: there are only two messages, MemberUpsert and MemberRemove. The only state kept in the FSM is:

// DaprHostMemberState is the state to store Dapr runtime host and
// consistent hashing tables.
type DaprHostMemberState struct {
	// Index is the index number of raft log.
	Index uint64
	// Members includes Dapr runtime hosts.
	Members map[string]*DaprHostMember

	// TableGeneration is the generation of hashingTableMap.
	// This is increased whenever hashingTableMap is updated.
	TableGeneration uint64

	// hashingTableMap is the map for storing consistent hashing data
	// per Actor types.
	hashingTableMap map[string]*hashing.Consistent
}

Clearly the only really useful information here is DaprHostMember, and a DaprHostMember is a node in the cluster.
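To make "log entries are messages, Apply turns them into state" concrete, here is a self-contained sketch shaped like the Apply function above. Everything in it is hypothetical: LogEntry stands in for *raft.Log (so there is no HashiCorp dependency), Member is a cut-down DaprHostMember, and the JSON payload encoding is my assumption rather than Dapr's wire format. Only the one-byte command prefix and the stale-index check are taken from the code above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// CommandType is the one-byte prefix of every log entry, as in fsm.go.
// The numeric values below are assumptions.
type CommandType uint8

const (
	MemberUpsert CommandType = iota
	MemberRemove
)

// Member is a cut-down, hypothetical stand-in for DaprHostMember.
type Member struct {
	Name     string   `json:"name"`
	AppID    string   `json:"appId"`
	Entities []string `json:"entities"` // actor types this host can serve
}

// LogEntry stands in for *raft.Log: a committed index plus opaque bytes.
type LogEntry struct {
	Index uint64
	Data  []byte
}

// FSM holds the replicated membership plus the last applied index.
type FSM struct {
	Index   uint64
	Members map[string]Member
}

// Encode builds a log payload: one command byte followed by a JSON body.
func Encode(t CommandType, m Member) ([]byte, error) {
	body, err := json.Marshal(m)
	if err != nil {
		return nil, err
	}
	return append([]byte{byte(t)}, body...), nil
}

// Apply runs once per committed entry, in log order, on every replica,
// so all replicas end up with the same Members map.
func (f *FSM) Apply(e LogEntry) error {
	if len(e.Data) == 0 {
		return errors.New("empty log entry")
	}
	if e.Index < f.Index {
		return nil // older than what is already applied: skip, like fsm.go
	}
	var m Member
	if err := json.Unmarshal(e.Data[1:], &m); err != nil {
		return err
	}
	switch CommandType(e.Data[0]) {
	case MemberUpsert:
		f.Members[m.Name] = m
	case MemberRemove:
		delete(f.Members, m.Name)
	default:
		return errors.New("unimplemented command")
	}
	f.Index = e.Index
	return nil
}

func main() {
	f := &FSM{Members: map[string]Member{}}
	up, _ := Encode(MemberUpsert, Member{Name: "10.0.0.1:50002", AppID: "app1", Entities: []string{"Counter"}})
	rm, _ := Encode(MemberRemove, Member{Name: "10.0.0.1:50002"})
	_ = f.Apply(LogEntry{Index: 1, Data: up})
	_ = f.Apply(LogEntry{Index: 2, Data: rm})
	_ = f.Apply(LogEntry{Index: 1, Data: up}) // stale entry: ignored
	fmt.Println(len(f.Members), f.Index)      // prints 0 2
}
```

Raft only guarantees that every replica sees the same entries in the same order; the deterministic switch in Apply is what turns that into identical membership on every Placement node.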

 

From this we can conclude that Dapr uses the Raft protocol to maintain a strongly consistent membership, and nothing else. According to a friend, this is somewhat similar to Orleans, except that Orleans is an AP system.

 

Next, following the consistent-hash lookup, we find:

func (a *actorsRuntime) lookupActorAddress(actorType, actorID string) (string, string) {
	if a.placementTables == nil {
		return "", ""
	}

	t := a.placementTables.Entries[actorType]
	if t == nil {
		return "", ""
	}
	host, err := t.GetHost(actorID)
	if err != nil || host == nil {
		return "", ""
	}
	return host.Name, host.AppID
}

ActorType selects the consistent hash table for that actor type, and ActorID is the key looked up in it to find the host; GetHost is the consistent hash table's lookup.
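Below is a minimal consistent hash ring with virtual nodes, plus a lookup shaped like lookupActorAddress (one ring per actor type, keyed by actor ID). It is a sketch under my own assumptions: the SHA-256-based position function, the "host#i" virtual-node naming and the 100 virtual nodes per host are illustrative choices, not necessarily what Dapr's hashing package does. The virtual-node count presumably plays the role of the value passed to hashing.SetReplicationFactor.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
	"strconv"
)

// hash32 maps a string onto the ring using the first 4 bytes of its SHA-256.
// This hash choice is an assumption, not necessarily Dapr's.
func hash32(s string) uint32 {
	sum := sha256.Sum256([]byte(s))
	return binary.BigEndian.Uint32(sum[:4])
}

// Ring is a minimal consistent-hash ring with virtual nodes.
type Ring struct {
	replicas int               // virtual nodes per host
	points   []uint32          // sorted virtual-node positions
	owner    map[uint32]string // position -> host
}

// NewRing creates an empty ring with the given number of virtual nodes per host.
func NewRing(replicas int) *Ring {
	return &Ring{replicas: replicas, owner: map[uint32]string{}}
}

// Add places host on the ring at `replicas` pseudo-random positions.
func (r *Ring) Add(host string) {
	for i := 0; i < r.replicas; i++ {
		p := hash32(host + "#" + strconv.Itoa(i))
		if _, taken := r.owner[p]; taken {
			continue // rare collision: keep the existing owner
		}
		r.owner[p] = host
		r.points = append(r.points, p)
	}
	sort.Slice(r.points, func(a, b int) bool { return r.points[a] < r.points[b] })
}

// Get returns the owner of key: the first position clockwise from hash32(key).
func (r *Ring) Get(key string) (string, bool) {
	if len(r.points) == 0 {
		return "", false
	}
	k := hash32(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= k })
	if i == len(r.points) {
		i = 0 // wrap around past the largest position
	}
	return r.owner[r.points[i]], true
}

// lookup mirrors the shape of lookupActorAddress: one ring per actor type,
// keyed by actor ID; an empty string means "no host found".
func lookup(tables map[string]*Ring, actorType, actorID string) string {
	r := tables[actorType]
	if r == nil {
		return ""
	}
	host, _ := r.Get(actorID)
	return host
}

func main() {
	ring := NewRing(100)
	for _, h := range []string{"10.0.0.1:50002", "10.0.0.2:50002", "10.0.0.3:50002"} {
		ring.Add(h)
	}
	tables := map[string]*Ring{"Counter": ring}
	fmt.Println(lookup(tables, "Counter", "user-42"))       // one of the three hosts, same on every run
	fmt.Println(lookup(tables, "Unknown", "user-42") == "") // prints true
}
```

Because every host builds the ring from the same membership, every host computes the same owner for a given actor ID without asking anyone.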

The implementation of the actor RPC Call:

func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	if a.placementBlock {
		<-a.placementSignal
	}

	actor := req.Actor()
	targetActorAddress, appID := a.lookupActorAddress(actor.GetActorType(), actor.GetActorId())
	if targetActorAddress == "" {
		return nil, errors.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId())
	}

	var resp *invokev1.InvokeMethodResponse
	var err error

	if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) {
		resp, err = a.callLocalActor(ctx, req)
	} else {
		resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, targetActorAddress, appID, req)
	}

	if err != nil {
		return nil, err
	}
	return resp, nil
}

It finds the host through the lookupActorAddress function we just saw, then checks whether that host is the current one; if not, the request is sent to the remote host. The local-host optimization isn't worth much, though: a distributed system usually has many hosts, so the chance that the target is local is low (roughly 1/N with N hosts, if actors are spread evenly).
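The control flow of Call boils down to three branches: no address means an error, a local address takes the in-process fast path, and anything else is a remote call wrapped in retries. Here is a sketch of that shape, assuming a fixed sleep between attempts. The attempt count, backoff and the ErrUnavailable sentinel are hypothetical, and Dapr's real retry helper may decide differently which errors are worth retrying.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrUnavailable is a hypothetical error a remote host might return.
var ErrUnavailable = errors.New("actor host unavailable")

// Hypothetical retry settings; Dapr's real defaults are not reproduced here.
const (
	remoteAttempts = 3
	remoteBackoff  = 10 * time.Millisecond
)

// callWithRetry calls fn up to attempts times, sleeping a fixed backoff
// between failures, and returns the last error if every attempt fails.
func callWithRetry(attempts int, backoff time.Duration, fn func() (string, error)) (string, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		resp, err := fn()
		if err == nil {
			return resp, nil
		}
		lastErr = err
		if i < attempts-1 {
			time.Sleep(backoff)
		}
	}
	return "", lastErr
}

// route mirrors the branches of Call: missing address, local fast path, remote with retries.
func route(target, self string, local, remote func() (string, error)) (string, error) {
	if target == "" {
		return "", errors.New("no host found for actor")
	}
	if target == self {
		return local()
	}
	return callWithRetry(remoteAttempts, remoteBackoff, remote)
}

func main() {
	calls := 0
	flaky := func() (string, error) { // fails twice, then succeeds
		calls++
		if calls < 3 {
			return "", ErrUnavailable
		}
		return "remote ok", nil
	}
	local := func() (string, error) { return "local ok", nil }

	resp, err := route("10.0.0.2:50002", "10.0.0.1:50002", local, flaky)
	fmt.Println(resp, err, calls) // prints: remote ok <nil> 3
}
```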

 

From this we can roughly see the whole picture, i.e. the details of how Dapr implements distributed stateful services:

1. Membership is maintained with the Consul Raft library

2. The hosts in the cluster communicate with the Placement component to obtain the membership

3. The actor-lookup algorithm runs inside each host, not in the Placement component: the host uses the ActorType to find the hosts that can serve that type, builds a consistent hash table from them, looks up the target host in that table, and forwards the request

 

Following the references to the host's consistent hash table, I found the place where its contents are changed:

func (a *actorsRuntime) updatePlacements(in *placementv1pb.PlacementTables) {
	a.placementTableLock.Lock()
	defer a.placementTableLock.Unlock()

	if in.Version != a.placementTables.Version {
		for k, v := range in.Entries {
			loadMap := map[string]*hashing.Host{}
			for lk, lv := range v.LoadMap {
				loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port)
			}
			c := hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap)
			a.placementTables.Entries[k] = c
		}

		a.placementTables.Version = in.Version
		a.drainRebalancedActors()

		log.Infof("placement tables updated, version: %s", in.GetVersion())

		a.evaluateReminders()
	}
}

From these few lines you can see that whenever the version differs, everything is replaced, and then actors are rebalanced (rehashed): a.drainRebalancedActors drains the local actors that no longer belong on this host.

If you've studied data structures, you have certainly met the HashTable. A hash table must rehash when it grows: you build a bigger table and reinsert every element, and the new positions end up very different from the old ones. Consistent hashing avoids rehashing everything: only part of the content is rehashed, so much less content is invalidated.
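A quick experiment makes "only part of the content is rehashed" measurable: place 10,000 actor IDs on 3 hosts, add a 4th, and count how many IDs change owner under naive hash mod N versus a consistent hash ring. The ring, hash function and virtual-node count are my own illustrative choices, not Dapr's code.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
	"strconv"
)

// h32 hashes a string to 32 bits (first 4 bytes of SHA-256).
func h32(s string) uint32 {
	sum := sha256.Sum256([]byte(s))
	return binary.BigEndian.Uint32(sum[:4])
}

// ring is a compact consistent-hash ring with vnodes positions per host.
type ring struct {
	points []uint32
	owner  map[uint32]string
}

func newRing(hosts []string, vnodes int) *ring {
	r := &ring{owner: map[uint32]string{}}
	for _, host := range hosts {
		for i := 0; i < vnodes; i++ {
			p := h32(host + "#" + strconv.Itoa(i))
			if _, dup := r.owner[p]; !dup { // keep the first owner on a collision
				r.owner[p] = host
				r.points = append(r.points, p)
			}
		}
	}
	sort.Slice(r.points, func(a, b int) bool { return r.points[a] < r.points[b] })
	return r
}

// get returns the owner of the first position clockwise from the key's hash.
func (r *ring) get(key string) string {
	k := h32(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= k })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

// modulo returns the naive placement: hash(key) mod number of hosts.
func modulo(hosts []string) func(string) string {
	return func(k string) string { return hosts[h32(k)%uint32(len(hosts))] }
}

// moved counts keys whose owner differs between two placements.
func moved(keys []string, before, after func(string) string) int {
	n := 0
	for _, k := range keys {
		if before(k) != after(k) {
			n++
		}
	}
	return n
}

// makeKeys builds n sample actor IDs.
func makeKeys(n int) []string {
	keys := make([]string, n)
	for i := range keys {
		keys[i] = "actor-" + strconv.Itoa(i)
	}
	return keys
}

func main() {
	three := []string{"h1", "h2", "h3"}
	four := []string{"h1", "h2", "h3", "h4"}
	keys := makeKeys(10000)
	fmt.Println("modulo moved:", moved(keys, modulo(three), modulo(four)))
	fmt.Println("ring moved:  ", moved(keys, newRing(three, 100).get, newRing(four, 100).get))
}
```

Under mod N, a key keeps its host only when hash mod 3 equals hash mod 4, which holds for 3 of every 12 residues, so about 75% of keys move. With the ring, only keys whose successor position now belongs to h4 move (roughly the new host's quarter share), and every moved key moves to h4.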

However, there is always a "but": it would be fine if all nodes rehashed at the same moment, but how can a distributed system make every node rehash at the same time? Obviously it can't. So the actor address directory Dapr maintains is only eventually consistent; in other words, for a short time there can be multiple actors with the same ID, which still leads to inconsistency.
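That transient split can be simulated with two hosts holding different table versions. Assumptions: the Table and Host types are hypothetical, placement uses plain hash mod N for brevity (Dapr uses a consistent hash ring, which shrinks the set of affected IDs but does not eliminate it), and Update applies any table whose version differs, mirroring the != check in updatePlacements.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Table is a hypothetical versioned placement table.
type Table struct {
	Version int
	Hosts   []string
}

// Owner picks a host by hash mod host count (a simplification, not Dapr's ring).
func (t Table) Owner(actorID string) string {
	h := fnv.New32a()
	h.Write([]byte(actorID))
	return t.Hosts[h.Sum32()%uint32(len(t.Hosts))]
}

// Host is one sidecar holding its own copy of the table.
type Host struct {
	Name  string
	Table Table
}

// Update replaces the table whenever the version differs, like updatePlacements.
func (h *Host) Update(t Table) {
	if t.Version != h.Table.Version {
		h.Table = t
	}
}

// Disagreements lists the actor IDs for which two hosts name different owners.
func Disagreements(a, b *Host, ids []string) []string {
	var out []string
	for _, id := range ids {
		if a.Table.Owner(id) != b.Table.Owner(id) {
			out = append(out, id)
		}
	}
	return out
}

// actorIDs builds n sample actor IDs.
func actorIDs(n int) []string {
	ids := make([]string, n)
	for i := range ids {
		ids[i] = fmt.Sprintf("actor-%d", i)
	}
	return ids
}

func main() {
	v1 := Table{Version: 1, Hosts: []string{"h1", "h2", "h3"}}
	v2 := Table{Version: 2, Hosts: []string{"h1", "h2", "h3", "h4"}}
	a := &Host{Name: "h1", Table: v1}
	b := &Host{Name: "h2", Table: v1}
	ids := actorIDs(100)

	a.Update(v2) // h1 has received the new table, h2 has not yet
	fmt.Println("split while the update is in flight:", len(Disagreements(a, b, ids)))
	b.Update(v2) // h2 catches up and the views converge
	fmt.Println("split after convergence:", len(Disagreements(a, b, ids))) // count is 0
}
```

While the split lasts, a call routed through h1 and a call routed through h2 can activate the same actor ID on two different hosts.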

 

Looking at dapr/proto/placement/v1/placement.proto confirmed my guess:

// Placement service is used to report Dapr runtime host status.
service Placement {
  rpc ReportDaprStatus(stream Host) returns (stream PlacementOrder) {}
}

message PlacementOrder {
  PlacementTables tables = 1;
  string operation = 2;
}

When a host starts, it subscribes to cluster changes from placement over a gRPC stream. It's done the lazy way: the whole placement table (effectively the whole membership) is sent every time, not a diff.

 

To sum up, the source analysis above shows that Dapr's membership is a CP system, but actor placement is not: it is an eventually consistent AP system. TiDB's PD, by contrast, is a CP system, built on etcd embed. I hope this helps you a little.

What helped me most was seeing how Dapr uses the Consul Raft library.

 

References:

1. Dapr

2. Etcd Embed

3. Consul Raft

 

Copyright notice
This article was written by [egmkang]; if you repost it, please include a link to the original. Thanks.