Golang学习实战笔记-基础 > golang实战之grpc微服务编程
grpc server 配置源码分析

grpc 配置代码分析

grpc server 所有配置项目

ServerOption 默认 server 配置

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

grpc ServerOption 接口,通过实现这个接口来修改grpc的配置参数

// ServerOption 接口 ,可以理解为一个包装类,用来包装 serverOptions
type ServerOption interface {
apply(*serverOptions)
}

funcServerOption 用来生成一个包装函数

// funcServerOption 是一个包装函数 ,它实现了ServerOption可以修改serverOptions内容.
type funcServerOption struct {
f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: f,
}
}

golang grpc官方包定义的funcServerOption 包装函数实例

修改grpc server发送缓存区大小

func WriteBufferSize(s int) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.writeBufferSize = s
})
}

修改连接保活选项

func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
if kp.Time > 0 && kp.Time < time.Second {
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
kp.Time = time.Second
}

return newFuncServerOption(func(o *serverOptions) {
o.keepaliveParams = kp
})
}

设置自己的解码器

func CustomCodec(codec Codec) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.codec = codec
})
}

设置最大的接受消息大小

func MaxRecvMsgSize(m int) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.maxReceiveMessageSize = m
})
}

拦截器(中间件),只能配置一个拦截器, 多个拦截器可以在这个拦截器内部实现.

func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
})
}


## 配置日志中间件

grpcServer := grpc.NewServer(
    grpc_middleware.WithUnaryServerChain(
        grpc_logrus.UnaryServerInterceptor(entry),
        grpc_ctxtags.UnaryServerInterceptor(
            grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor),
            grpc_ctxtags.WithFieldExtractor(func(fullMethod string, req interface{}) map[string]interface{} {
                return map[string]interface{}{ "requestData": req}
            }),
        ),
))

下面分析 ctx_logrus 具体代码实现

ctx_logrus 暴露了三个比较实用的方法 AddFields, Extract ,ToContext,下面看看他们的具体实现

package ctxlogrus
import (
"github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
type ctxLoggerMarker struct{}
type ctxLogger struct {
logger *logrus.Entry
fields logrus.Fields
}
var (
   ctxLoggerKey = &ctxLoggerMarker{}
)
//添加logrus.Entry到context, 这个操作添加的logrus.Entry在后面AddFields和Extract都会使用到
func ToContext(ctx context.Context, entry *logrus.Entry) context.Context {
   l := &ctxLogger{
   logger: entry,
   fields: logrus.Fields{},
}
return context.WithValue(ctx, ctxLoggerKey, l)
}

//添加日志字段到日志中间件(ctx_logrus),后面记录日志追踪的request_id 就是使用这个方法
func AddFields(ctx context.Context, fields logrus.Fields) {
   l, ok := ctx.Value(ctxLoggerKey).(*ctxLogger)
   if !ok || l == nil {
       return
   }
   for k, v := range fields {
   l.fields[k] = v
}
}

//导出ctx_logrus日志库和grpc_ctxtags中间件在中记录的日志内容
func Extract(ctx context.Context) *logrus.Entry {
  l, ok := ctx.Value(ctxLoggerKey).(*ctxLogger)
  if !ok || l == nil {
   return logrus.NewEntry(nullLogger)
   }
   fields := logrus.Fields{}
   // Add grpc_ctxtags tags metadata until now.
   tags := grpc_ctxtags.Extract(ctx)
   for k, v := range tags.Values() {
       fields[k] = v
   }
   // Add logrus fields added until now.
   for k, v := range l.fields {
       fields[k] = v
   }
   return l.logger.WithFields(fields)
}

grpc_logrus.UnaryServerInterceptor() 如何将 ctxlogrus context 导入到 context

func newLoggerForCall(ctx context.Context, entry *logrus.Entry, 
        fullMethodString string, start time.Time) context.Context {
	service := path.Dir(fullMethodString)[1:]
	method := path.Base(fullMethodString)
	callLog := entry.WithFields(
		logrus.Fields{
			SystemField:       "grpc",
			KindField:         "server",
			"grpc.service":    service,
			"grpc.method":     method,
			"grpc.start_time": start.Format(time.RFC3339),
		})

	if d, ok := ctx.Deadline(); ok {
		callLog = callLog.WithFields(
			logrus.Fields{
				"grpc.request.deadline": d.Format(time.RFC3339),
			})
	}

	callLog = callLog.WithFields(ctx_logrus.Extract(ctx).Data)
	return ctxlogrus.ToContext(ctx, callLog)
}


代码实例 在request中间中将requestid 记录到日志中去

//request中间件 将request_id 记录到请求日志中日志中
func request(ctx context.Context) (context.Context, error) {
	md, _ := metadata.FromIncomingContext(ctx)
	var requestId string
	if val, ok := md[constants.RequestId]; ok {
		requestId = val[0]
	} else {
		return ctx, status.Errorf(codes.Unauthenticated, "no metadata %s", constants.RequestId)
	}

	ctx = context.WithValue(ctx, constants.RequestId, requestId)
        //将请求的request_id 记录到日志中间件ctxlogrus中
        //这样在本次请求的其他的日志都可以获取到RequestId,方便日志跟踪
	ctxlogrus.AddFields(ctx, logrus.Fields{
		constants.RequestId: requestId,
	})
	return ctx, nil
}

func Request() grpc.UnaryServerInterceptor {
	interceptor := func(ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (resp interface{}, err error) {
		ctx, err = request(ctx)
		if err != nil {
			return
		}
		return handler(ctx, req)
	}
	return interceptor
}


// 在请求的其他地方可以使用ctxlogrus.Extract(ctx)导出*logrus.Entry来记录日志

// ctx保存了request_id 下面的日志会输出request_id

ctxlogrus.Extract(ctx).Errorf("数据库错误: %s", err.Error())