api 发消息给mq服务时候把整个链路穿起来,在go-zero中怎么做
创始人
2024-03-25 00:10:18
0

今天我想讲的是,除了 go-zero 默认在 api 的 middleware 与 rpc 的 interceptor 中帮我们集成好的链路追踪,我们想自己在某些本地方法添加链路追踪代码或者我们想在 api 发送一个消息给 mq 服务时候想把整个链路包含 mq 的 producer、consumer 穿起来,在 go-zero 中该如何做。

场景

我们先简单讲一下我们的小 demo 的场景,一个请求进来调用 api 的 Login 方法,在 Login 方法中先调用 rpc 的 GetUserByMobile 方法,之后在调用 api 本地的 local 方法,紧接着调用 rabbitmq 传递消息到 mq 服务。

go-zero 默认集成了 jaeger、zinpink,这里我们就以 jaeger 为例

我们希望看到的链路是

也就是 api 衍生出来三条子链路,api.producerMq 有一条调用 mq.Consumer 的子链路。

我们想要将一个方法添加到链路中需要两个因素,一个 traceId,一个 span,当我们在同一个 traceId 下开启 span 把相关的 span 都串联起来,如果想形成父子关系,就要把 span 之间相互串联起来,因为「微服务实践」公众号中讲解原理太多,我这里就简单提一下不涉及过多,如果不是特别熟悉原理可以看文章开头推荐的文章,这里我们只需要知道 traceId 与 spanId 关系就好。

核心业务代码

1、首先 API 中 LoginLogic 代码

type LoginLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewLoginLogic(ctx context.Context, svcCtx *svc.ServiceContext) *LoginLogic {return &LoginLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}type MsgBody struct {Carrier *propagation.HeaderCarrierMsg     string
}func (l *LoginLogic) Login(req *types.RegisterReq) (*types.AccessTokenResp, error) {resp, err := l.svcCtx.UserRpc.GetUserByMobile(l.ctx, &usercenter.GetUserByMobileReq{Mobile: req.Mobile,})if err != nil {return &types.AccessTokenResp{}, nil}l.local()tracer := otel.GetTracerProvider().Tracer(trace.TraceName)spanCtx, span := tracer.Start(l.ctx, "send_msg_mq",  oteltrace.WithSpanKind(oteltrace.SpanKindProducer))carrier := &propagation.HeaderCarrier{}otel.GetTextMapPropagator().Inject(spanCtx, carrier)producer := rabbit.NewRabbitmqPublisher(RabbitmqDNS)msg :=  &MsgBody{Carrier: carrier,Msg:     req.Mobile,}b, err := json.Marshal(msg)if err != nil{panic(err)}if err := producer.Publish(spanCtx, ExchangeName, RoutineKeys, b); err != nil {logx.Errorf("Publish Fail , msg :%s , err:%v", msg, err)}span.End()return &types.AccessTokenResp{AccessExpire: resp.User.Id,}, err
}func (l *LoginLogic) local() {tracer := otel.GetTracerProvider().Tracer(trace.TraceName)_ , span := tracer.Start(l.ctx, "local", oteltrace.WithSpanKind(oteltrace.SpanKindInternal))defer span.End()// 执行你的代码 .....
}

2、rpc 中 GetUserByMobile 的代码

func (s *Logic) GetUserByMobile(context.Context, *usercenterPb.GetUserByMobileReq) (*usercenterPb.GetUserByMobileResp, error) {vo := &usercenterPb.UserVo{Id: 1,}return &usercenterPb.GetUserByMobileResp{User: vo,}, nil
}

3、mq 中 Consumer 的代码

type MsgBody struct {Carrier *propagation.HeaderCarrierMsg     string
}func (c *consumer) Consumer(ctx context.Context, data []byte) error {var msg MsgBodyif err := json.Unmarshal(data, &msg); err != nil {logx.Errorf(" consumer err : %v", err)} else {logx.Infof("consumerOne Consumer  , msg:%+v", msg)wireContext := otel.GetTextMapPropagator().Extract(ctx, msg.Carrier)tracer := otel.GetTracerProvider().Tracer(trace.TraceName)_, span := tracer.Start(wireContext, "mq_consumer_msg", oteltrace.WithSpanKind(oteltrace.SpanKindConsumer))defer span.End()}return nil
}

代码详解

1、go-zero 默认集成

当一个请求进入 api 后,我们可以在 go-zero 源码中查看到 https://github.com/zeromicro/go-zero/blob/master/rest/engine.go#L92。go-zero 已经在 api 的 middleware 中帮我们添加了第一层 trace,当进入 Login 方法内,我们调用了 rpc 的 GetUserByMobile 方法,通过 go-zero 的源码 https://github.com/zeromicro/go-zero/blob/master/zrpc/internal/rpcserver.go#L55 可以看到在 rpc 的 interceptor 也默认帮我们添加好了,这两层都是 go-zero 默认帮我们做好的。

2、本地方法

当调用完 rpc 的 GetUserByMobile 之后,api 调用了本地的 local,如果我们想在整个链路上体现出来调用了本地 local 方法,那默认的 go-zero 是没有帮我们做的,需要我们手动来添加。

  tracer := otel.GetTracerProvider().Tracer(trace.TraceName)_ , span := tracer.Start(l.ctx, "local",  oteltrace.WithSpanKind(oteltrace.SpanKindInternal))defer span.End()// 执行你的代码 .....

我们通过上面代码拿到 tracer,ctx 之后开启一个 local 的 span,因为 start 时候会从 ctx 获取父 span 所以会将 local 方法与 Login 串联起父子调用关系,这样就将本次操作加入了这个链路

3、mq 的 producer 到 mq 的 consumer

我们在 mq 传递中如何串联起来这个链路呢?也就是形成 api.Login->api.producer->mq.Consumer

想一下原理,虽然跨越了网络,api 可以通过 header 传递,rpc 可以通过 metadata 传递,那么 mq 是不是也可以通过 headerbody 传递就可以了,按照这个想法来看下我门的代码。

  tracer := otel.GetTracerProvider().Tracer(trace.TraceName)spanCtx , span := tracer.Start(l.ctx, "send_msg_mq", oteltrace.WithSpanKind(oteltrace.SpanKindProducer))carrier := &propagation.HeaderCarrier{}otel.GetTextMapPropagator().Inject(spanCtx,carrier)producer := rabbit.NewRabbitmqPublisher(RabbitmqDNS)msg := &MsgBody{Carrier: carrier,Msg:     req.Mobile,}b , err := json.Marshal(msg)if err != nil{panic(err)}if err := producer.Publish(spanCtx, ExchangeName, RoutineKeys, b); err != nil {logx.Errorf("Publish Fail, msg :%s, err:%v", msg, err)}span.End()

首先获取到了这个全局的 tracer,然后开启一个 producer 的 span,跟 local 方法一样,我们开启 producer 的 span 时候也是通过 ctx 获取到上一级父级 span,这样就可以将 producer 的 span 与 Login 形成父子 span 调用关系,那我们想将 producer 的 span 与 mq 的 consumer 中的 span 形成调用父子关系怎么做?我们将 api.producer 的 spanCtx 注入到 carrier 中,这里我们通过 mq 的 body 将 carrier 发送给 consumer,发送完成我们 stop 我们的 producer,那么 producer 的这层链路完成了。

随后我们来看 mq-consumer 在接收到 body 消息之后怎么做的。

type MsgBody struct {Carrier *propagation.HeaderCarrierMsg     string
}func (c *consumer) Consumer(ctx context.Context, data []byte) error {var msg MsgBodyif err := json.Unmarshal(data, &msg); err != nil {logx.Errorf(" consumer err : %v", err)} else {logx.Infof("consumerOne Consumer  , msg:%+v", msg)wireContext := otel.GetTextMapPropagator().Extract(ctx, msg.Carrier)tracer := otel.GetTracerProvider().Tracer(trace.TraceName)_, span := tracer.Start(wireContext, "mq_consumer_msg", oteltrace.WithSpanKind(oteltrace.SpanKindConsumer))defer span.End()}return nil
}

consumer 接收到消息后反序列化出来 Carrier *propagation.HeaderCarrier,然后通过 otel.GetTextMapPropagator().Extract 取出来 api.producer 注入的 wireContext,在通过 tracer.StartwireContext 创建 consumer 的 span,这样 consumer 就是 api.producer 的子 span,就形成了调用链路关系,最终我们得到的关系就是

让我们来调用一下 Logic 方法,看下 jaeger 中的链路如果与我们预想的链路一致,so happy~

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...