在梳理变量机制中,发现与context、对象复用等机制都有一定关联,而context是贯穿始终的关键,所以本来只想记录变量机制,结果对context、对象复用封装都进行了review:
(review 相关经验:
)
mosnCtx: 是对context 库的wrapper,操作统一使用mosnCtx 库,而不使用标准的context库,为什么要加一个wrapper 呢?
context 两大功能:cancel (通知)、WithValue (request 级别的上下文);
缺点:WithValue 易被滥用,会返回一个新的context,后续查找时造成链表递归查询,效率低下
mosnCtx 封装WithValue的目的是:减少返回新的context、改进Value查询:将value 存储控制在本层,由于标准的context 只支持key,value,如果存储多个只能套娃,查询的时候也要遍历。所以改进的方式是:在本层使用数组(带 index 的 数组效率优于map),查询的时候也不套娃。所以使用的时候要注意这点:
实现了一个 context 接口
type valueCtx struct {
context.Context
builtin [ContextKeyEnd]interface{}
}
// 这里定义了 context 内置数组 只能存储下面 index 的value 上下文
// Context key types(built-in)
const (
ContextKeyStreamID ContextKey = iota
ContextKeyConnection
ContextKeyConnectionID
ContextKeyConnectionPoolIndex
ContextKeyListenerPort
ContextKeyListenerName
ContextKeyListenerType
ContextKeyListenerStatsNameSpace
ContextKeyNetworkFilterChainFactories
ContextKeyBufferPoolCtx
ContextKeyAccessLogs
ContextOriRemoteAddr
ContextKeyAcceptChan
ContextKeyAcceptBuffer
ContextKeyConnectionFd
ContextSubProtocol
ContextKeyTraceSpanKey
ContextKeyActiveSpan
ContextKeyTraceId
ContextKeyVariables
ContextKeyProxyGeneralConfig
ContextKeyDownStreamProtocol
ContextKeyConfigDownStreamProtocol
ContextKeyConfigUpStreamProtocol
ContextKeyDownStreamHeaders
ContextKeyDownStreamRespHeaders
ContextKeyEnd
)
查询值 WithValue:往内置数组存储 value, 尽量返回一个 valueCtx 类型的context,所以内容一定是存储在数组中的。
WithValue 封装这里强调 不能和 context.WithValue 混用:为什么呢,一旦 使用了 context.WithValue 会创建一个新的context,初始的context 的数组丢失了,而mosnCtx 在做值查询Value 的时候是(特意)没有做递归查询的,那么初始的数组内容会丢失。
// WithValue add the given key-value pair into the existed value context, or create a new value context which contains the pair.
// This Function should not be used along with the official context.WithValue !!
// The following context topology will leads to existed pair {'foo':'bar'} NOT FOUND, because recursive lookup for
// key-type=ContextKey is not supported by mosn.valueCtx.
//
// topology: context.Background -> mosn.valueCtx{'foo':'bar'} -> context.valueCtx -> mosn.valueCtx{'hmm':'haa'}
func WithValue(parent context.Context, key ContextKey, value interface{}) context.Context {
if mosnCtx, ok := parent.(*valueCtx); ok {
mosnCtx.builtin[key] = value
return mosnCtx
}
// create new valueCtx
mosnCtx := &valueCtx{Context: parent}
mosnCtx.builtin[key] = value
return mosnCtx
}
获取值:如果是valueCtx 则从数组获取 否则 兼容原生的语义
没有做递归查询 除了效率问题外,还有一点很重要 避免值查询滥用,使用者需清晰知道生命周期和边界 以减少 错误的产生 和代码逻辑的混乱。
封装的同时还保留了原有context的语义,但是我们尽量不要混用。
// ContextKey type
type ContextKey int
// 封装 value 是为了兼容原生Context
// 原生的value key必须时string 类型的 ,所以不用担心获取int 类型数组和原生的int 冲突问题
func (c *valueCtx) Value(key interface{}) interface{} {
if contextKey, ok := key.(ContextKey); ok {
return c.builtin[contextKey]
}
return c.Context.Value(key)
}
func Get(ctx context.Context, key ContextKey) interface{} {
if mosnCtx, ok := ctx.(*valueCtx); ok {
return mosnCtx.builtin[key]
}
return ctx.Value(key)
}
以上context 看似够用了,发现mosnCtx 还额外封装了一个Clone: 如果是原生的Context 就返回原生的 否则返回一个新的valueCtx,注意这里是数组拷贝:会返回一个新的数组 而不是指向 旧数组
// Clone copy the origin mosn value context(if it is), and return new one
func Clone(parent context.Context) context.Context {
if mosnCtx, ok := parent.(*valueCtx); ok {
clone := &valueCtx{Context: mosnCtx}
// array copy assign
clone.builtin = mosnCtx.builtin
return clone
}
return parent
}
我们查看 调用点:发现只有内存复用机制有使用到,下面先分析内存复用机制,再回过来分析 context 其他细节:
func (cm *ContextManager) Next() {
// buffer context
cm.curr = buffer.NewBufferPoolContext(mosnctx.Clone(cm.base))
// variable context
cm.curr = variable.NewVariableContext(cm.curr)
}
内存对象复用主要分为:通用的内存对象 管理(分配 和 重置) 以及 对象结构体引用(mosn 结合context 特有的管理)。
BufferPoolCtx (接口)是通用对象的管理方法,比如http/buffer.go 、 proxy/buffer.go 中的对象 proxyBuffer 和 httpBuffer 都是由 该接口 申请释放。httpBuffer、proxyBuffer 等对象 都继承了 tempBufferCtx 对象需要实现自己的 BufferPoolCtx方法。
const maxBufferPool = 16
var (
index int32
bPool = bufferPoolArray[:]
vPool = new(valuePool)
bufferPoolArray [maxBufferPool]bufferPool
nullBufferValue [maxBufferPool]interface{}
)
// bufferPool is buffer pool
type bufferPool struct {
ctx types.BufferPoolCtx
sync.Pool
}
type valuePool struct {
sync.Pool
}
被分配的对象 都继承了 tempBufferCtx
// BufferPoolCtx is the bufferpool's context
type BufferPoolCtx interface {
// Index returns the bufferpool's Index
Index() int
// New returns the buffer
New() interface{}
Reset 如果直接放弃了原先的成员指针,交由gc 处理了,否则如果需要复用成员指针指向的内存 则对成员指针指向的内容进行reset,内存指针继续保留
// Reset resets the buffer
Reset(interface{})
}
type TempBufferCtx struct {
index int
}
下面看一下 buffer/buffer.go 对外 方法:
RegisterBuffer 在 http/buffer.go 、proxy/buffer.go 中 init即会注册变量,为什么会有一个index 机制呢?因为要区分 不同对象 ,所以 每种对象都有自己一个单独的 sync.pool 大池子。
func RegisterBuffer(poolCtx types.BufferPoolCtx) {
// frist index is 1
i := atomic.AddInt32(&index, 1)
if i >= maxBufferPool {
panic("bufferSize over full")
}
bPool[i].ctx = poolCtx
setIndex(poolCtx, int(i))
}
setIndex 这个函数也很有意思。将接口转换成具体的对象,并设置具体对象的值。所以所有的内存对象都需要继承 tempBufferCtx 结构体
可以对比下 将setIndex 变成接口的方法,index 变成每个对象的成员变量的写法
// ifaceWords is interface internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
// setIdex sets index, poolCtx must embedded TempBufferCtx
func setIndex(poolCtx types.BufferPoolCtx, i int) {
p := (*ifaceWords)(unsafe.Pointer(&poolCtx))
temp := (*TempBufferCtx)(p.data)
temp.index = i
}
bufferPool 是某一类对象的 sync pool 池子 ,返回 bufferPoolCtx 接口 new 的对象
// Take returns a buffer from buffer pool
func (p *bufferPool) take() (value interface{}) {
value = p.Get()
if value == nil {
value = p.ctx.New()
}
return
}
// Give returns a buffer to buffer pool
func (p *bufferPool) give(value interface{}) {
p.ctx.Reset(value)
p.Put(value)
}
以上已经够用,但是mosn 又单独封装了一个bufferValue 的 概念, bufferPool 的 take 和 give 并没有对外暴露, 对外暴露的是bufferValue的接口:
// bufferValue is buffer pool's Value
type bufferValue struct {
value [maxBufferPool]interface{}
transmit [maxBufferPool]interface{}
}
// newBufferValue returns bufferValue
func newBufferValue() (value *bufferValue) {
v := vPool.Get()
if v == nil {
value = new(bufferValue)
} else {
value = v.(*bufferValue)
}
return
}
Take 仅在find中调用?
// Take returns buffer from buffer pools
func (bv *bufferValue) Take(poolCtx types.BufferPoolCtx) (value interface{}) {
i := poolCtx.Index()
value = bPool[i].take()
bv.value[i] = value
return
}
// Give returns buffer to buffer pools
func (bv *bufferValue) Give() {
if index <= 0 {
return
}
// first index is 1
for i := 1; i <= int(index); i++ {
value := bv.value[i]
if value != nil {
bPool[i].give(value)
}
value = bv.transmit[i]
if value != nil {
bPool[i].give(value)
}
}
bv.value = nullBufferValue
bv.transmit = nullBufferValue
// Give bufferValue to Pool
vPool.Put(bv)
}
我们 看看 bufferValue 的对外方法:
NewBufferPoolContext (ctx context.Context) 将某个 context 的内置数组 types.ContextKeyBufferPoolCtx 下标 设置为newBufferValue bufferValue 新建结构体。
// NewBufferPoolContext returns a context with bufferValue
func NewBufferPoolContext(ctx context.Context) context.Context {
return mosnctx.WithValue(ctx, types.ContextKeyBufferPoolCtx, newBufferValue())
}
PoolContext(ctx context.Context) 从某个 context 中获取 bufferValue 对象,如果内置数组有 则返回内置数组中的bufferValue 指针,否则 新建一个
// PoolContext returns bufferValue by context
func PoolContext(ctx context.Context) *bufferValue {
if ctx != nil {
if val := mosnctx.Get(ctx, types.ContextKeyBufferPoolCtx); val != nil {
return val.(*bufferValue)
}
}
return newBufferValue()
}
PoolContext 经常与Find 一起使用:从上一步的 bufferValue 对象中,找到value 数组中的对象index 下标
// Find returns buffer from bufferValue
func (bv *bufferValue) Find(poolCtx types.BufferPoolCtx, x interface{}) interface{} {
i := poolCtx.Index()
if i <= 0 || i > int(index) {
panic("buffer should call buffer.RegisterBuffer()")
}
if bv.value[i] != nil {
return bv.value[i]
}
return bv.Take(poolCtx)
}
func proxyBuffersByContext(ctx context.Context) *proxyBuffers {
poolCtx := buffer.PoolContext(ctx)
return poolCtx.Find(&ins, nil).(*proxyBuffers)
}
从调用代码可以看出,bufferValue 经常被用来 结合context 入参 实现某个具体对象的获取。在变量机制中经常被用到。 一个完整的流程是:在 contextManager 中 为context 内置数组 types.ContextKeyBufferPoolCtx 分配一个 bufferValue对象,但是此时bufferValue 对象中val 是一个 各注册对象的数组,此时还没有具体分配具体对象。
func NewContextManager(base context.Context) *ContextManager {
return &ContextManager{
base: base,
}
}
func (cm *ContextManager) Next() {
// buffer context
cm.curr = buffer.NewBufferPoolContext(mosnctx.Clone(cm.base))
// variable context
cm.curr = variable.NewVariableContext(cm.curr)
}
通过 proxyBuffersByContext 此时在 bufferValue的 value 数组中填充一个具体的对象,并开始初始化一个对象
newActiveStream
proxyBuffers := proxyBuffersByContext(ctx)
stream := &proxyBuffers.stream
stream.ID = atomic.AddUint32(&currProxyID, 1)
stream.proxy = proxy
stream.requestInfo = &proxyBuffers.info
后续再次调用 proxyBuffersByContext 时 获取到的对象是已经初始化过内容的对象指针了。
以上如何理解bufferValue 呢?bufferValue 总是伴随context一起,如果没有bufferValue , 我们只能实现一个通用的对象池 获取一个对象壳子 以及归还一个对象壳子,有了bufferValue 以及 context, 不仅能够对壳子进行复用,还能获取到具体的对象指针:
我们再来看看:context 中的clone, 如果不clone 的话,cm.base中的 context 内置数组 中的 bufferValue 将会被覆盖。bufferValue 中的对象都是请求生命周期的,如果之前的context 已经new过一次,新的context 再new一次,则旧的context的bufferValue 下的 value 数组中的对象都会丢失。clone 之后不仅可以让原生context 中的bufferValue 对象不会丢失,也可以实现对象继承 或者覆盖(不太可能)
cm.curr = buffer.NewBufferPoolContext(mosnctx.Clone(cm.base))
除非一个context 走天下 且对象不会再分配,可以只new一次吧,否则一个新的context 最好new 一个新的 bufferValue
还有一个疑点是: bufferValue 中的 transmit 域:我们可以看到 transmit 域仅仅在释放的时候有使用到:把前一个context的value 拷贝到本 context 再一起归还给内存池。对于本身有clone的应该是没有必要的,但是对于那些不是clone的context 由于最后归还的context 只有一个,如果没有transmit 可能造成内存没有归还内存池?
// TransmitBufferPoolContext copy a context
func TransmitBufferPoolContext(dst context.Context, src context.Context) {
sValue := PoolContext(src)
if sValue.value == nullBufferValue {
return
}
dValue := PoolContext(dst)
dValue.transmit = sValue.value
sValue.value = nullBufferValue
}
最后bufferValue 本身也是在vpool 池子里复用的。
变量注册、变量赋值、变量获取、变量使用
变量必须在启动时先注册 后,才有后续的 赋值 和 获取等;
变量对象的描述:采用接口的模式 抽象 变量的设置 获取;
// Variable provides a flexible and convenient way to pass information
type Variable interface {
// variable name
Name() string
// variable data, which is useful for getter/setter
Data() interface{}
// variable flags
Flags() uint32
// value getter
Getter() GetterFunc
// value setter
Setter() SetterFunc
}
// Indexer indicates that variable needs to be cached by using pre-allocated IndexedValue
type Indexer interface {
// variable index
GetIndex() uint32
// set index to variable
SetIndex(index uint32)
}
type GetterFunc func(ctx context.Context, value *IndexedValue, data interface{}) (string, error)
// SetterFunc used to set the value of variable
type SetterFunc func(ctx context.Context, variableValue *IndexedValue, value string) error
具体的变量结构体:有BasicVariable 、IndexedVariable, 在 mosn 里面还有prefix_ | protocol variable 的 概念,后两者都是由 BasicVariable 承载;下面我们一步一步拆解这四个涵义,并比对异同点
// variable.Variable
type BasicVariable struct {
getter GetterFunc
setter SetterFunc
name string
data interface{}
flags uint32
}
func (bv *BasicVariable) Name() string {
return bv.name
}
func (bv *BasicVariable) Data() interface{} {
return bv.data
}
func (bv *BasicVariable) Flags() uint32 {
return bv.flags
}
func (bv *BasicVariable) Setter() SetterFunc {
return bv.setter
}
func (bv *BasicVariable) Getter() GetterFunc {
return bv.getter
}
// variable.Variable
// variable.VariableIndexer
type IndexedVariable struct {
BasicVariable
index uint32
}
func (iv *IndexedVariable) SetIndex(index uint32) {
iv.index = index
}
func (iv *IndexedVariable) GetIndex() uint32 {
return iv.index
}
// BasicSetter used for variable value setting only, and would not affect any real data structure, like headers.
func BasicSetter(ctx context.Context, variableValue *IndexedValue, value string) error {
variableValue.data = value
variableValue.Valid = true
return nil
}
我们先看看工厂中的全局存储结构体:factory.go
mux sync.RWMutex
variables = make(map[string]Variable, 32) // all built-in variable definitions
prefixVariables = make(map[string]Variable, 32) // all prefix getter definitions
indexedVariables = make([]Variable, 0, 32) // indexed variables
func RegisterVariable(variable Variable) error {
mux.Lock()
defer mux.Unlock()
name := variable.Name()
// check conflict
if _, ok := variables[name]; ok {
return errors.New(errVariableDuplicated + name)
}
// register
variables[name] = variable
// check index
if indexer, ok := variable.(Indexer); ok {
index := len(indexedVariables)
indexer.SetIndex(uint32(index))
indexedVariables = append(indexedVariables, variable)
}
return nil
}
对象的操作api在 api.go:
func GetVariableValue(ctx context.Context, name string) (string, error)
func SetVariableValue(ctx context.Context, name, value string) error
我们以proxy/var.go 为例:
BasicVariable :
初始化:
builtinVariables = []variable.Variable{
variable.NewBasicVariable(types.VarStartTime, nil, startTimeGetter, nil, 0),
variable.NewBasicVariable(types.VarRequestReceivedDuration, nil, receivedDurationGetter, nil, 0),
variable.NewBasicVariable(types.VarResponseReceivedDuration, nil, responseReceivedDurationGetter, nil, 0),
注册:
for idx := range builtinVariables {
variable.RegisterVariable(builtinVariables[idx])
}
我们发现BasicVariable 没有设置 setter 函数,只有getter 函数: 通过context 获取出具体请求相关的结构体(见上节内存复用分析),然后 从请求中直接获取相关的信息,所以没有setter 只有getter
// StartTimeGetter
// get request's arriving time
func startTimeGetter(ctx context.Context, value *variable.IndexedValue, data interface{}) (string, error) {
proxyBuffers := proxyBuffersByContext(ctx)
info := proxyBuffers.info
return info.StartTime().Format("2006/01/02 15:04:05.000"), nil
}
非index variable 直接从全局variables 接口map 获取变量接口的getter 方法进行调用
func GetVariableValue(ctx context.Context, name string) (string, error) {
// 1. find built-in variables
if variable, ok := variables[name]; ok {
// 1.1 check indexed value
if indexer, ok := variable.(Indexer); ok {
return getFlushedVariableValue(ctx, indexer.GetIndex())
}
// 1.2 use variable.Getter() to get value
getter := variable.Getter()
if getter == nil {
return "", errors.New(errGetterNotFound + name)
}
return getter(ctx, nil, variable.Data())
}
IndexedVariable
从初始化可以看出,对比 BasicVariable 普遍没有设置 variable.Getter(), 且需要显式调用api
func GetVariableValue(ctx context.Context, name string) (string, error)
func SetVariableValue(ctx context.Context, name, value string) error
还有多了一个 BasicSetter
// BasicSetter used for variable value setting only, and would not affect any real data structure, like headers.
func BasicSetter(ctx context.Context, variableValue *IndexedValue, value string) error {
variableValue.data = value
variableValue.Valid = true
return nil
}
type IndexedValue struct {
Valid bool
NotFound bool
noCacheable bool
//escape bool
data string
}
variable.NewIndexedVariable(types.VarProxyTryTimeout, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarProxyGlobalTimeout, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarProxyHijackStatus, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarProxyGzipSwitch, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarProxyIsDirectResponse, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarHeaderStatus, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarHeaderRPCMethod, nil, nil, variable.BasicSetter, 0),
variable.NewIndexedVariable(types.VarHeaderRPCService, nil, nil, variable.BasicSetter, 0),
下面 我们重点 捋一下 api:
注册变量时, 除了 把 indexVariable 放到全局的variable map 以外,还要放入一个variable 数组
// register
variables[name] = variable
// check index
if indexer, ok := variable.(Indexer); ok {
index := len(indexedVariables)
indexer.SetIndex(uint32(index))
indexedVariables = append(indexedVariables, variable)
}
先找到注册的变量对象,查询是否有实现index 接口。调用
setFlushedVariableValue(ctx context.Context, index uint32, value string)
func SetVariableValue(ctx context.Context, name, value string) error {
// find built-in & indexed variables, prefix and non-indexed are not supported
if variable, ok := variables[name]; ok {
// 1.1 check indexed value
if indexer, ok := variable.(Indexer); ok {
return setFlushedVariableValue(ctx, indexer.GetIndex(), value)
}
}
}
从context的 内置数组 下标 types.ContextKeyVariables 获取IndexedValue 数组,context 中的 IndexedValue 数组包含了所有indexed Variables 变量具体描述。
在contextManager 的Next 中new,在内存复用时已经clone过一次
func NewVariableContext(ctx context.Context) context.Context {
// TODO: sync.Pool reuse
values := make([]IndexedValue, len(indexedVariables)) // TODO: pre-alloc buffer for runtime variable
return mosnctx.WithValue(ctx, types.ContextKeyVariables, values)
}
此时 variable.Setter() 就是 basicSetter,将indexedValue 的data 设置为 value,并置为valid
func setFlushedVariableValue(ctx context.Context, index uint32, value string) error {
if variables := ctx.Value(types.ContextKeyVariables); variables != nil {
if values, ok := variables.([]IndexedValue); ok {
variable := indexedVariables[index]
variableValue := &values[index]
// should check variable.Flags
if (variable.Flags() & MOSN_VAR_FLAG_NOCACHEABLE) == MOSN_VAR_FLAG_NOCACHEABLE {
variableValue.noCacheable = true
}
setter := variable.Setter()
if setter == nil {
return errors.New(errSetterNotFound + variable.Name())
}
return setter(ctx, variableValue, value)
}
}
}
func GetVariableValue(ctx context.Context, name string) (string, error) {
// 1. find built-in variables
if variable, ok := variables[name]; ok {
// 1.1 check indexed value
if indexer, ok := variable.(Indexer); ok {
return getFlushedVariableValue(ctx, indexer.GetIndex())
}
从contex 的内置数组中,获取 index 对应的 indexdValue 如果是可缓存的 直接获取 data值
// TODO: provide direct access to this function, so the cost of variable name finding could be optimized
func getFlushedVariableValue(ctx context.Context, index uint32) (string, error) {
if variables := ctx.Value(types.ContextKeyVariables); variables != nil {
if values, ok := variables.([]IndexedValue); ok {
value := &values[index]
if value.Valid || value.NotFound {
if !value.noCacheable {
return value.data, nil
}
// clear flags
//value.Valid = false
//value.NotFound = false
}
return getIndexedVariableValue(ctx, value, index)
}
}
}
从getter 函数获取值,并赋值到context的index 下标
func getIndexedVariableValue(ctx context.Context, value *IndexedValue, index uint32) (string, error) {
variable := indexedVariables[index]
//if value.NotFound || value.Valid {
// return value.data, nil
//}
getter := variable.Getter()
if getter == nil {
return "", errors.New(errGetterNotFound + variable.Name())
}
vdata, err := getter(ctx, value, variable.Data())
if err != nil {
value.Valid = false
value.NotFound = true
return vdata, err
}
value.data = vdata
if (variable.Flags() & MOSN_VAR_FLAG_NOCACHEABLE) == MOSN_VAR_FLAG_NOCACHEABLE {
value.noCacheable = true
}
return value.data, nil
}
以上 indexed value 相当于contex 内置数组 对应下标的一个缓存,是对无法直接从context 转换对象 中getter 信息场景 的一个补充,
prefixVariables
VarPrefixReqHeader string = "request_header_"
// RespHeaderPrefix is the prefix of response header's formatter
VarPrefixRespHeader string = "response_header_"
prefixVariables = []variable.Variable{
variable.NewBasicVariable(types.VarPrefixReqHeader, nil, requestHeaderMapGetter, nil, 0),
variable.NewBasicVariable(types.VarPrefixRespHeader, nil, responseHeaderMapGetter, nil, 0),
}
变量注册 :
prefixVariables = make(map[string]Variable, 32) // all
func RegisterPrefixVariable(prefix string, variable Variable) error {
mux.Lock()
defer mux.Unlock()
// check conflict
if _, ok := prefixVariables[prefix]; ok {
return errors.New(errPrefixDuplicated + prefix)
}
// register
prefixVariables[prefix] = variable
return nil
}
变量获取: 遍历prefix map 调用getter
// 2. find prefix variables
for prefix, variable := range prefixVariables {
if strings.HasPrefix(name, prefix) {
getter := variable.Getter()
if getter == nil {
return "", errors.New(errGetterNotFound + name)
}
return getter(ctx, nil, name)
}
}
prefix 变量不支持 setter,看看getter 函数:
func requestHeaderMapGetter(ctx context.Context, value *variable.IndexedValue, data interface{}) (string, error) {
proxyBuffers := proxyBuffersByContext(ctx)
headers := proxyBuffers.stream.downstreamReqHeaders
headerName := data.(string)
headerValue, ok := headers.Get(headerName[reqHeaderIndex:])
if !ok {
return variable.ValueNotFound, nil
}
return string(headerValue), nil
}
func responseHeaderMapGetter(ctx context.Context, value *variable.IndexedValue, data interface{}) (string, error) {
proxyBuffers := proxyBuffersByContext(ctx)
headers := proxyBuffers.request.upstreamRespHeaders
headerName := data.(string)
headerValue, ok := headers.Get(headerName[respHeaderIndex:])
if !ok {
return variable.ValueNotFound, nil
}
return string(headerValue), nil
}
ProtocolVariables
// register protocol resource
variable.RegisterProtocolResource(protocol.HTTP1, api.SCHEME, types.VarProtocolRequestScheme)
variable.RegisterProtocolResource(protocol.HTTP1, api.PATH, types.VarProtocolRequestPath)
variable.RegisterProtocolResource(protocol.HTTP1, api.URI, types.VarProtocolRequestUri)
variable.RegisterProtocolResource(protocol.HTTP1, api.ARG, types.VarProtocolRequestArg)
variable.RegisterProtocolResource(protocol.HTTP1, api.COOKIE, types.VarProtocolCookie)
variable.RegisterProtocolResource(protocol.HTTP1, api.HEADER, types.VarProtocolRequestHeader)
var (
errUnregisterProtocolResource = "unregister Protocol resource, Protocol: "
protocolVar map[string]string
)
func init() {
protocolVar = make(map[string]string)
}
// RegisterProtocolResource registers the resource as ProtocolResourceName
// forexample protocolVar[Http1+api.URI] = http_request_uri var
func RegisterProtocolResource(protocol api.ProtocolName, resource api.ProtocolResourceName, varname string) error {
pr := convert(protocol, resource)
protocolVar[pr] = fmt.Sprintf("%s_%s", protocol, varname)
}
// GetProtocolResource get URI,PATH,ARG var depends on ProtocolResourceName
func GetProtocolResource(ctx context.Context, name api.ProtocolResourceName, data ...interface{}) (string, error) {
p, ok := mosnctx.Get(ctx, types.ContextKeyDownStreamProtocol).(api.ProtocolName)
if v, ok := protocolVar[convert(p, name)]; ok {
// apend data behind if data exists
if len(data) == 1 {
v = fmt.Sprintf("%s%s", v, data[0])
}
return GetVariableValue(ctx, v)
} else {
return "", errors.New(errUnregisterProtocolResource + string(p))
}
}
func convert(p api.ProtocolName, name api.ProtocolResourceName) string {
return string(p) + string(name)
}
NewActiveStream:
ctx = mosnctx.WithValue(ctx, types.ContextKeyDownStreamProtocol, proto)
contextManager
针对 内存复用 变量 进行上下文 环境管理;
stream 中的contextManager
// contextManager
type ContextManager struct {
base context.Context
curr context.Context
}
func (cm *ContextManager) Get() context.Context {
return cm.curr
}
func (cm *ContextManager) Next() {
// buffer context
cm.curr = buffer.NewBufferPoolContext(mosnctx.Clone(cm.base))
// variable context
cm.curr = variable.NewVariableContext(cm.curr)
}
func (cm *ContextManager) InjectTrace(ctx context.Context, span api.Span) context.Context {
if span != nil {
return mosnctx.WithValue(ctx, mosnctx.ContextKeyTraceId, span.TraceId())
}
// generate traceId
return mosnctx.WithValue(ctx, mosnctx.ContextKeyTraceId, trace.IdGen().GenerateTraceId())
}
func NewContextManager(base context.Context) *ContextManager {
return &ContextManager{
base: base,
}
}
先看看访问日志中对变量的使用 以及 变量包:
const DefaultAccessLogFormat = "%start_time% %request_received_duration% %response_received_duration% %bytes_sent%" + " " +
"%bytes_received% %protocol% %response_code% %duration% %response_flag% %response_code% %upstream_local_address%" + " " +
"%downstream_local_address% %downstream_remote_address% %upstream_host%"
%start_time% %ID% %topic% %dst_psm% %dst_address% %proto% %http_req_method% %http_req_uri% %http_req_arg_xxx% %http_req_hdr_xxx% %body_bytes_sent% %body_sent% %http_resp_code%
%http_resp_hdr_xxx% %body_bytes_recieved% %http_resp_hdr_time %http_resp_all_time
backend
传统的方式 是 定义一个结构体:然后将各种info在适当的时机赋值,最后打日志的时候 使用 switch case 根据每个logformat 获取对应info值;
而mosn 里面没有使用上述方式,好处 是: