GRPC元数据
在gRPC中,元数据(metadata)是用于在gRPC请求和响应中传递附加信息的一种机制。元数据是以键值对(key-value pairs)的形式组织的信息,它可以包含请求的上下文信息、安全凭证、消息传输相关的信息等。元数据在gRPC通信中非常重要,它可以用于各种用途,例如认证、授权、日志记录等。
gRPC的元数据可以分为两种类型:
-
Call Metadata(调用级元数据):这种类型的元数据与特定的gRPC调用相关联,包含了与该调用相关的信息。例如,客户端可以将认证令牌放入元数据中,以便服务器端进行身份验证。服务器端也可以在元数据中包含一些信息,以便客户端了解如何处理响应。
在gRPC中,元数据可以在
ClientContext
(客户端上下文)和ServerContext
(服务器端上下文)中设置和获取。客户端可以使用ClientContext
类的方法来设置和获取元数据,而服务器端可以使用ServerContext
类的方法来处理元数据。 -
Channel Metadata(通道级元数据):这种类型的元数据与整个gRPC通道(Channel)相关联,它包含了通道的配置信息。通道级元数据通常由客户端提供,用于在整个通道上设置一些特定的配置。例如,客户端可以在通道级别上设置认证凭证,以便所有的调用都可以使用相同的认证信息。
在gRPC中,元数据通常以key-value
的形式表示,其中key
是字符串,表示元数据的名称,而value
可以是字符串、二进制数据等。在gRPC的HTTP/2协议中,元数据被编码为HTTP/2的Header Frames,并在请求和响应中传递。
以下是一些常见的gRPC元数据的示例:
authority
:用于指定请求的目标地址(authority)。authorization
:用于携带认证信息,例如Bearer令牌。content-type
:指定消息的类型(例如,application/grpc
表示gRPC消息)。grpc-timeout
:指定请求的超时时间。grpc-status
:指定响应的状态码(例如,OK
表示成功)。
客户端和服务器端可以根据需求自定义元数据,以便在gRPC通信中传递必要的信息。在gRPC API中,通常提供了方法来设置和获取元数据,以方便开发者使用。
在客户端使用元数据:
在客户端,您可以使用ClientContext
类的方法来设置和获取元数据。以下是一些常见的操作示例:
设置元数据:
#include <grpc++/grpc++.h>
grpc::ClientContext context;
// 设置元数据:键值对形式
context.AddMetadata("authorization", "Bearer YourAccessToken");
context.AddMetadata("custom-key", "custom-value");
// 发起gRPC调用...
获取元数据:
#include <grpc++/grpc++.h>
grpc::ClientContext context;
// 发起gRPC调用...
// 获取响应中的元数据
const auto& metadata = context.GetServerTrailingMetadata();
// 使用metadata中的信息...
在服务器端使用元数据:
在服务器端,您可以使用ServerContext
类的方法来处理和获取元数据。以下是一些常见的操作示例:
处理客户端发送的元数据
#include <grpc++/grpc++.h>
void YourRpcMethod(grpc::ServerContext* context, RequestType* request,
ResponseType* response) {
// 获取客户端发送的元数据
const auto& metadata = context->client_metadata();
// 使用metadata中的信息...
// 处理gRPC请求...
}
设置服务器端响应的元数据:
#include <grpc++/grpc++.h>
void YourRpcMethod(grpc::ServerContext* context, RequestType* request,
ResponseType* response) {
// 处理gRPC请求...
// 设置服务器端响应的元数据
context->AddTrailingMetadata("custom-key", "custom-value");
// 发送响应...
}
在这些示例中,AddMetadata
和 AddTrailingMetadata
方法用于设置元数据,而 client_metadata()
方法和 GetServerTrailingMetadata()
方法用于获取客户端发送的元数据和服务器端响应的元数据,供开发者在gRPC通信中传递信息使用。这样,您就可以方便地使用元数据在gRPC的C++应用程序中传递附加信
配置生成proto文件
创建文件helloword.proto
syntax = "proto3";
package helloword;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply){}
rpc SayHelloStreamReply (HelloRequest) returns (HelloReply){}
rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
然后执行:
protoc -I . --cpp_out=. ./helloword.proto #生成helloword.pb.cc文件
protoc -I . --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ./helloword.proto #生成helloword.grpc.pb.cc文件
-I :(-IPATH)指定要在其中搜索导入(import)的目录。可指定多次,目录将按顺序搜索。如果没有给出,则使用当前工作目录。
--cpp_out = . : 以c++语言格式输出,等号后面为输出文件存放的路径
--grpc_out = . :输出grpc框架接口文件。等号后面为输出文件存放的路径
--plugin=`which grpc_cpp_plugin` :指定一个protobuf插件(grpc_cpp_plugin)来生成grpc代码。
hello.proto : 核心文件,可以是路径helloword.proto ,或者绝对路径。
helloword.pb.cc/hello_pb.h:主要是对参数(MsgRequest,MsgResponse)的属性设置、参数类的重定向、参数成员的设置、获取等操作。
helloword.grpc.pb.cc/helloword.grpc.pb.h :该文件生成了proto方法(GetMsg)的属性操作接口,通过该存根实现服务器与客户端的通讯
四种不同编码方式
生成proto文件
建立route_guide.proto文件
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
option objc_class_prefix = "RTG";
package routeguide;
// Interface exported by the server.
service RouteGuide {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// A feature with an empty name is returned if there's no feature at the given
// position.
rpc GetFeature(Point) returns (Feature) {}
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
// The name of the feature.
string name = 1;
// The point where the feature is detected.
Point location = 2;
}
// A RouteNote is a message sent while at a given point.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
服务端代码
#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <memory>
#include <string>
#include "helper.h"
#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#ifdef BAZEL_BUILD
#include "examples/protos/route_guide.grpc.pb.h"
#else
#include "route_guide.grpc.pb.h"
#endif
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerWriter;
using grpc::Status;
using routeguide::Feature;
using routeguide::Point;
using routeguide::Rectangle;
using routeguide::RouteGuide;
using routeguide::RouteNote;
using routeguide::RouteSummary;
using std::chrono::system_clock;
float ConvertToRadians(float num) { return num * 3.1415926 / 180; }
// The formula is based on http://mathforum.org/library/drmath/view/51879.html
float GetDistance(const Point& start, const Point& end) {
const float kCoordFactor = 10000000.0;
float lat_1 = start.latitude() / kCoordFactor;
float lat_2 = end.latitude() / kCoordFactor;
float lon_1 = start.longitude() / kCoordFactor;
float lon_2 = end.longitude() / kCoordFactor;
float lat_rad_1 = ConvertToRadians(lat_1);
float lat_rad_2 = ConvertToRadians(lat_2);
float delta_lat_rad = ConvertToRadians(lat_2 - lat_1);
float delta_lon_rad = ConvertToRadians(lon_2 - lon_1);
float a = pow(sin(delta_lat_rad / 2), 2) +
cos(lat_rad_1) * cos(lat_rad_2) * pow(sin(delta_lon_rad / 2), 2);
float c = 2 * atan2(sqrt(a), sqrt(1 - a));
int R = 6371000; // metres
return R * c;
}
std::string GetFeatureName(const Point& point,
const std::vector<Feature>& feature_list) {
for (const Feature& f : feature_list) {
if (f.location().latitude() == point.latitude() &&
f.location().longitude() == point.longitude()) {
return f.name();
}
}
return "";
}
class RouteGuideImpl final : public RouteGuide::Service {
public:
explicit RouteGuideImpl(const std::string& db) {
routeguide::ParseDb(db, &feature_list_);
}
// 客户端使用存根向服务器发送请求 并等待响应返回
Status GetFeature(ServerContext* context, const Point* point,
Feature* feature) override {
feature->set_name(GetFeatureName(*point, feature_list_));
feature->mutable_location()->CopyFrom(*point);
return Status::OK;
}
// 服务器端流式RPC,其中客户端向服务器发送请求 并获取一个流来读取一系列消息
Status ListFeatures(ServerContext* context,
const routeguide::Rectangle* ,
ServerWriter<Feature>* writer) override {
auto lo = rectangle->lo();
auto hi = rectangle->hi();
long left = (std::min)(lo.longitude(), hi.longitude());
long right = (std::max)(lo.longitude(), hi.longitude());
long top = (std::max)(lo.latitude(), hi.latitude());
long bottom = (std::min)(lo.latitude(), hi.latitude());
for (const Feature& f : feature_list_) {
if (f.location().longitude() >= left &&
f.location().longitude() <= right &&
f.location().latitude() >= bottom &&
f.location().latitude() <= top) {
writer->Write(f);
}
}
return Status::OK;
}
// 客户端流式RPC,其中客户端写入一系列消息
// 并再次使用所提供的流将它们发送到服务器。一旦客户端 已完成写入消息,它等待服务器读取所有消息
Status RecordRoute(ServerContext* context, ServerReader<Point>* reader,
RouteSummary* summary) override {
Point point;
int point_count = 0;
int feature_count = 0;
float distance = 0.0;
Point previous;
system_clock::time_point start_time = system_clock::now();
while (reader->Read(&point)) {
point_count++;
if (!GetFeatureName(point, feature_list_).empty()) {
feature_count++;
}
if (point_count != 1) {
distance += GetDistance(previous, point);
}
previous = point;
}
system_clock::time_point end_time = system_clock::now();
summary->set_point_count(point_count);
summary->set_feature_count(feature_count);
summary->set_distance(static_cast<long>(distance));
auto secs =
std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time);
summary->set_elapsed_time(secs.count());
return Status::OK;
}
// 双向流式RPC,其中双方都发送一系列消息 使用读写流。
// 这两个流独立运行,因此客户端 服务器可以按照他们喜欢的顺序进行读写:
// 例如, 服务器可以等待接收所有客户端消息,然后再写入其 响应,
// 或者它可以交替地读取消息然后写入消息,
// 或者 读和写的一些其他组合。消息在每个 流保存。您可以通过将stream 关键字在请求和响应之前
Status RouteChat(ServerContext* context,
ServerReaderWriter<RouteNote, RouteNote>* stream) override {
RouteNote note;
while (stream->Read(¬e)) {
std::unique_lock<std::mutex> lock(mu_);
for (const RouteNote& n : received_notes_) {
if (n.location().latitude() == note.location().latitude() &&
n.location().longitude() == note.location().longitude()) {
stream->Write(n);
}
}
received_notes_.push_back(note);
}
return Status::OK;
}
private:
std::vector<Feature> feature_list_;
std::mutex mu_;
std::vector<RouteNote> received_notes_;
};
void RunServer(const std::string& db_path) {
std::string server_address("0.0.0.0:50051");
RouteGuideImpl service(db_path);
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main(int argc, char** argv) {
// Expect only arg: --db_path=path/to/route_guide_db.json.
std::string db = routeguide::GetDbFileContent(argc, argv);
RunServer(db);
return 0;
}
客户端代码
#include <chrono>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include "helper.h"
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#ifdef BAZEL_BUILD
#include "examples/protos/route_guide.grpc.pb.h"
#else
#include "route_guide.grpc.pb.h"
#endif
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using grpc::ClientReaderWriter;
using grpc::ClientWriter;
using grpc::Status;
using routeguide::Feature;
using routeguide::Point;
using routeguide::Rectangle;
using routeguide::RouteGuide;
using routeguide::RouteNote;
using routeguide::RouteSummary;
Point MakePoint(long latitude, long longitude) {
Point p;
p.set_latitude(latitude);
p.set_longitude(longitude);
return p;
}
Feature MakeFeature(const std::string& name, long latitude, long longitude) {
Feature f;
f.set_name(name);
f.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
return f;
}
RouteNote MakeRouteNote(const std::string& message, long latitude,
long longitude) {
RouteNote n;
n.set_message(message);
n.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
return n;
}
class RouteGuideClient {
public:
RouteGuideClient(std::shared_ptr<Channel> channel, const std::string& db)
: stub_(RouteGuide::NewStub(channel)) {
routeguide::ParseDb(db, &feature_list_);
}
void GetFeature() {
Point point;
Feature feature;
point = MakePoint(409146138, -746188906);
GetOneFeature(point, &feature);
point = MakePoint(0, 0);
GetOneFeature(point, &feature);
}
// 服务端流服务
void ListFeatures() {
routeguide::Rectangle rect;
Feature feature;
ClientContext context;
rect.mutable_lo()->set_latitude(400000000);
rect.mutable_lo()->set_longitude(-750000000);
rect.mutable_hi()->set_latitude(420000000);
rect.mutable_hi()->set_longitude(-730000000);
std::cout << "Looking for features between 40, -75 and 42, -73"
<< std::endl;
std::unique_ptr<ClientReader<Feature> > reader(
stub_->ListFeatures(&context, rect));
while (reader->Read(&feature)) {
std::cout << "Found feature called " << feature.name() << " at "
<< feature.location().latitude() / kCoordFactor_ << ", "
<< feature.location().longitude() / kCoordFactor_ << std::endl;
}
Status status = reader->Finish();
if (status.ok()) {
std::cout << "ListFeatures rpc succeeded." << std::endl;
} else {
std::cout << "ListFeatures rpc failed." << std::endl;
}
}
void RecordRoute() {
Point point;
RouteSummary stats;
ClientContext context;
const int kPoints = 10;
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
std::default_random_engine generator(seed);
std::uniform_int_distribution<int> feature_distribution(
0, feature_list_.size() - 1);
std::uniform_int_distribution<int> delay_distribution(500, 1500);
std::unique_ptr<ClientWriter<Point> > writer(
stub_->RecordRoute(&context, &stats));
for (int i = 0; i < kPoints; i++) {
const Feature& f = feature_list_[feature_distribution(generator)];
std::cout << "Visiting point " << f.location().latitude() / kCoordFactor_
<< ", " << f.location().longitude() / kCoordFactor_
<< std::endl;
if (!writer->Write(f.location())) {
// Broken stream.
break;
}
std::this_thread::sleep_for(
std::chrono::milliseconds(delay_distribution(generator)));
}
writer->WritesDone();
Status status = writer->Finish();
if (status.ok()) {
std::cout << "Finished trip with " << stats.point_count() << " points\n"
<< "Passed " << stats.feature_count() << " features\n"
<< "Travelled " << stats.distance() << " meters\n"
<< "It took " << stats.elapsed_time() << " seconds"
<< std::endl;
} else {
std::cout << "RecordRoute rpc failed." << std::endl;
}
}
void RouteChat() {
ClientContext context;
std::shared_ptr<ClientReaderWriter<RouteNote, RouteNote> > stream(
stub_->RouteChat(&context));
std::thread writer([stream]() {
std::vector<RouteNote> notes{MakeRouteNote("First message", 0, 0),
MakeRouteNote("Second message", 0, 1),
MakeRouteNote("Third message", 1, 0),
MakeRouteNote("Fourth message", 0, 0)};
for (const RouteNote& note : notes) {
std::cout << "Sending message " << note.message() << " at "
<< note.location().latitude() << ", "
<< note.location().longitude() << std::endl;
stream->Write(note);
}
stream->WritesDone();
});
RouteNote server_note;
while (stream->Read(&server_note)) {
std::cout << "Got message " << server_note.message() << " at "
<< server_note.location().latitude() << ", "
<< server_note.location().longitude() << std::endl;
}
writer.join();
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "RouteChat rpc failed." << std::endl;
}
}
private:
// 普通方式的调用
bool GetOneFeature(const Point& point, Feature* feature) {
ClientContext context;
Status status = stub_->GetFeature(&context, point, feature);
if (!status.ok()) {
std::cout << "GetFeature rpc failed." << std::endl;
return false;
}
if (!feature->has_location()) {
std::cout << "Server returns incomplete feature." << std::endl;
return false;
}
if (feature->name().empty()) {
std::cout << "Found no feature at "
<< feature->location().latitude() / kCoordFactor_ << ", "
<< feature->location().longitude() / kCoordFactor_ << std::endl;
} else {
std::cout << "Found feature called " << feature->name() << " at "
<< feature->location().latitude() / kCoordFactor_ << ", "
<< feature->location().longitude() / kCoordFactor_ << std::endl;
}
return true;
}
const float kCoordFactor_ = 10000000.0;
std::unique_ptr<RouteGuide::Stub> stub_;
std::vector<Feature> feature_list_;
};
int main(int argc, char** argv) {
// Expect only arg: --db_path=path/to/route_guide_db.json.
std::string db = routeguide::GetDbFileContent(argc, argv);
RouteGuideClient guide(
grpc::CreateChannel("localhost:50051",
grpc::InsecureChannelCredentials()),
db);
std::cout << "-------------- GetFeature --------------" << std::endl;
guide.GetFeature();
std::cout << "-------------- ListFeatures --------------" << std::endl;
guide.ListFeatures();
std::cout << "-------------- RecordRoute --------------" << std::endl;
guide.RecordRoute();
std::cout << "-------------- RouteChat --------------" << std::endl;
guide.RouteChat();
return 0;
}
异步的方式
服务端代码
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_format.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// 在关闭服务器之后总是要关闭完成队列
cq_->Shutdown();
}
// 没有服务器关闭的逻辑
void Run(uint16_t port) {
std::string server_address = absl::StrFormat("0.0.0.0:%d", port);
ServerBuilder builder;
// 在给定地址上进行监听,不适用任何的身份的验证
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// 注册一个服务
builder.RegisterService(&service_);
// 获取用于Grpc异步通信的完成队列
cq_ = builder.AddCompletionQueue();
// 最后组装服务器
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// 进入到服务主循环
HandleRpcs();
}
private:
// 包含为处理请求所需的状态和逻辑的类
class CallData {
public:
// 包含为处理请求的所需的状态和逻辑的类
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// 进行服务的调用
status_ = PROCESS;
// 作为初始 CREATE 状态的一部分,我们*请求*系统开始处理 SayHello 请求。
// 在此请求中,"this" 作为唯一标识请求的标签(以便不同的 CallData 实例可以同时为不同的请求提供服务),在这种情况下,是该 CallData 实例的内存地址。
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
} else if (status_ == PROCESS) {
// 在处理当前 CallData 的请求的同时,生成一个新的 CallData 实例以为新客户端提供服务。
// 该实例将在其 FINISH 状态下自动释放内存。
new CallData(service_, cq_);
// 实际处理逻辑。
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// 完成处理!使用该实例的内存地址作为事件的唯一标识标签,告诉 gRPC 运行时我们已经完成。
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
// 一旦进入 FINISH 状态,释放自身(CallData)。
delete this;
}
}
private:
// 与 gRPC 运行时进行异步服务器通信的方法。
Greeter::AsyncService* service_;
// 用于异步服务器通知的生产者-消费者队列。
ServerCompletionQueue* cq_;
// 用于调整 rpc 的上下文,允许调整诸如使用压缩、身份验证等方面的参数,也可以将元数据发送回客户端。
ServerContext ctx_;
// 客户端发送的请求。
HelloRequest request_;
// 服务器发送给客户端的响应。
HelloReply reply_;
// 与客户端进行通信的方式。
ServerAsyncResponseWriter<HelloReply> responder_;
// 实现一个状态机,具有以下的状态
enum CallStatus { CREATE, PROCESS, FINISH };
// 当前的状态
CallStatus status_;
};
// 如果需要,可以在多个线程中运行此函数。
void HandleRpcs() {
// 生成一个新的 CallData 实例以为新客户端提供服务。
new CallData(&service_, cq_.get());
// 唯一标识请求的指针。
void* tag;
bool ok;
while (true) {
// 阻塞等待从完成队列读取下一个事件。该事件由其标签唯一标识,此处是 CallData 实例的内存地址。
// 应始终检查 Next 的返回值。该返回值告诉我们是否有任何事件或者 cq_ 是否正在关闭。
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
ServerImpl server;
server.Run(absl::GetFlag(FLAGS_port));
return 0;
}
客户端代码
#include <iostream>
#include <memory>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// 组装客户端数据,发送请求,并从服务器获取响应
std::string SayHello(const std::string& user) {
// 发送到服务端的数据
HelloRequest request;
request.set_name(user);
// 从服务端返回的代码
HelloReply reply;
// 客户端上下文信息
ClientContext context;
// 用来与Grpc进行同行时的异步通信生产者和消费者
CompletionQueue cq;
// 保存对象grpc状态
Status status;
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq));
// 请求,在RPC完成后,将"reply"更新为服务器的响应;
// 将"status"更新为操作是否成功的指示。为请求添加标签1。
rpc->Finish(&reply, &status, (void*)1);
void* got_tag;
bool ok = false;
// 阻塞,等待完成队列"cq"中的下一个结果。
// Next的返回值应该始终被检查。返回值告诉我们是否有任何事件或者cq_是否正在关闭。
GPR_ASSERT(cq.Next(&got_tag, &ok));
// 确保"cq"中的结果与我们之前的请求所带的标签相符。
GPR_ASSERT(got_tag == (void*)1);
// 且请求成功完成。注意,"ok"仅与Finish()引入的请求更新相关。
GPR_ASSERT(ok);
// 根据实际RPC的状态采取行动。
if (status.ok()) {
return reply.message();
} else {
return "RPC failed";
}
}
private:
// 通过传入的Channel创建stub,存储在这里,这是我们对服务器提供的服务的视图。
std::unique_ptr<Greeter::Stub> stub_;
};
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
// 实例化客户端。需要一个Channel,
// 用于创建实际的RPC。该Channel模拟了与通过"--target="参数指定的端点的连接。
std::string target_str = absl::GetFlag(FLAGS_target);
// We indicate that the channel isn't authenticated (use of
// InsecureChannelCredentials()).
GreeterClient greeter(
grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));
std::string user("world");
std::string reply = greeter.SayHello(user); // The actual RPC call!
std::cout << "Greeter received: " << reply << std::endl;
return 0;
}
GRpc byte的使用
syntax = "proto3";
package image;
service ImageService
{ rpc GetImageData () returns (ImageResponse);
}
message ImageResponse {
bytes image_data = 1;
}
cv::Mat image = cv::imread("path/to/your/image.jpg");
std::vector<uchar> buffer;
cv::imencode(".jpg", image, buffer);
// 将字节流赋值给 gRPC 响应
response->set_image_data(buffer.data(), buffer.size());
文章评论