grpc server 所有配置项目
ServerOption 默认 server 配置
var defaultServerOptions = serverOptions{ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize, connectionTimeout: 120 * time.Second, writeBufferSize: defaultWriteBufSize, readBufferSize: defaultReadBufSize, }
grpc ServerOption 接口,通过实现这个接口来修改grpc的配置参数
// ServerOption 接口 ,可以理解为一个包装类,用来包装 serverOptions
type ServerOption interface {
apply(*serverOptions)
}
funcServerOption 用来生成一个包装函数
// funcServerOption 是一个包装函数 ,它实现了ServerOption可以修改serverOptions内容.
type funcServerOption struct {
f func(*serverOptions)
}
func (fdo *funcServerOption) apply(do *serverOptions) {
fdo.f(do)
}
func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: f,
}
}
func WriteBufferSize(s int) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.writeBufferSize = s
})
}
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
if kp.Time > 0 && kp.Time < time.Second {
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
kp.Time = time.Second
}
return newFuncServerOption(func(o *serverOptions) {
o.keepaliveParams = kp
})
}
func CustomCodec(codec Codec) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.codec = codec
})
}
func MaxRecvMsgSize(m int) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.maxReceiveMessageSize = m
})
}
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
})
}
grpcServer := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_logrus.UnaryServerInterceptor(entry),
grpc_ctxtags.UnaryServerInterceptor(
grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor),
grpc_ctxtags.WithFieldExtractor(func(fullMethod string, req interface{}) map[string]interface{} {
return map[string]interface{}{ "requestData": req}
}),
),
))
下面分析 ctx_logrus 具体代码实现
ctx_logrus 暴露了三个比较实用的方法 AddFields, Extract ,ToContext,下面看看他们的具体实现
package ctxlogrus
import (
"github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
type ctxLoggerMarker struct{}
type ctxLogger struct {
logger *logrus.Entry
fields logrus.Fields
}
var (
ctxLoggerKey = &ctxLoggerMarker{}
)
//添加logrus.Entry到context, 这个操作添加的logrus.Entry在后面AddFields和Extract都会使用到
func ToContext(ctx context.Context, entry *logrus.Entry) context.Context {
l := &ctxLogger{
logger: entry,
fields: logrus.Fields{},
}
return context.WithValue(ctx, ctxLoggerKey, l)
}
//添加日志字段到日志中间件(ctx_logrus),后面记录日志追踪的request_id 就是使用这个方法
func AddFields(ctx context.Context, fields logrus.Fields) {
l, ok := ctx.Value(ctxLoggerKey).(*ctxLogger)
if !ok || l == nil {
return
}
for k, v := range fields {
l.fields[k] = v
}
}
//导出ctx_logrus日志库和grpc_ctxtags中间件在中记录的日志内容
func Extract(ctx context.Context) *logrus.Entry {
l, ok := ctx.Value(ctxLoggerKey).(*ctxLogger)
if !ok || l == nil {
return logrus.NewEntry(nullLogger)
}
fields := logrus.Fields{}
// Add grpc_ctxtags tags metadata until now.
tags := grpc_ctxtags.Extract(ctx)
for k, v := range tags.Values() {
fields[k] = v
}
// Add logrus fields added until now.
for k, v := range l.fields {
fields[k] = v
}
return l.logger.WithFields(fields)
}
grpc_logrus.UnaryServerInterceptor() 如何将 ctxlogrus context 导入到 context
func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString string, start time.Time) context.Context { service := path.Dir(fullMethodString)[1:] method := path.Base(fullMethodString) callLog := entry.WithFields( logrus.Fields{ SystemField: "grpc", KindField: "server", "grpc.service": service, "grpc.method": method, "grpc.start_time": start.Format(time.RFC3339), }) if d, ok := ctx.Deadline(); ok { callLog = callLog.WithFields( logrus.Fields{ "grpc.request.deadline": d.Format(time.RFC3339), }) } callLog = callLog.WithFields(ctx_logrus.Extract(ctx).Data) return ctxlogrus.ToContext(ctx, callLog) }
代码实例 在request中间中将requestid 记录到日志中去
//request中间件 将request_id 记录到请求日志中日志中 func request(ctx context.Context) (context.Context, error) { md, _ := metadata.FromIncomingContext(ctx) var requestId string if val, ok := md[constants.RequestId]; ok { requestId = val[0] } else { return ctx, status.Errorf(codes.Unauthenticated, "no metadata %s", constants.RequestId) } ctx = context.WithValue(ctx, constants.RequestId, requestId) //将请求的request_id 记录到日志中间件ctxlogrus中 //这样在本次请求的其他的日志都可以获取到RequestId,方便日志跟踪 ctxlogrus.AddFields(ctx, logrus.Fields{ constants.RequestId: requestId, }) return ctx, nil } func Request() grpc.UnaryServerInterceptor { interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { ctx, err = request(ctx) if err != nil { return } return handler(ctx, req) } return interceptor }
// 在请求的其他地方可以使用ctxlogrus.Extract(ctx)导出*logrus.Entry来记录日志
// ctx保存了request_id 下面的日志会输出request_id
ctxlogrus.Extract(ctx).Errorf("数据库错误: %s", err.Error())