每一行代码都是深思熟虑……
FromSlice
FromChan
Of
Range
Subject
Timeout
Interval
Merge
Concat
Race
CombineLatest
Empty
Never
Throw
Do
Take
TakeWhile
TakeUntil
Skip
SkipWhile
SkipUntil
IgnoreElements
Share
StartWith
Zip
Filter
Distinct
DistinctUntilChanged
Debounce
DebounceTime
Throttle
ThrottleTime
First
Last
Count
Max
Min
Reduce
Map
MapTo
MergeMap
MergeMapTo
SwitchMap
SwitchMapTo
import ( . "github.com/langhuihui/RxGo/rx" ) func main(){ err := Of(1, 2, 3, 4).Take(2).Subscribe(ObserverFunc(func(event *Event) { })) }
import ( . "github.com/langhuihui/RxGo/rx" . "github.com/langhuihui/RxGo/pipe" ) func main(){ err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(ObserverFunc(func(event *Event) { })) }
管道模式相比链式模式,具有操作符可扩展性,用户可以按照规则创建属于自己的操作符
type Operator func(Observable) Observable
操作符只需要返回Operator这个类型即可,例如 实现一个happy为false就立即完成的操作符
func MyOperator(happy bool) Operator { return func(source Observable) Observable { return func (sink *Observer) error { if happy{ return source(sink) } return nil } } }
在任何时候,您都可以创建自定义的Observable,用来发送任何事件
import ( . "github.com/langhuihui/RxGo/rx" ) func MyObservable (sink *Control) error { sink.Next("hello") return nil } func main(){ ob := Observable(MyObservable) ob.Subscribe(ObserverFunc(func(event *Event) { })) }
所谓Observable
,就是一个可以被订阅,然后不断发送事件的事件源,见如下示意图
time --> (*)-------------(o)--------------(o)---------------(x)----------------|> | | | | | Start value value error Done
该示意图代表了,事件被订阅后(Start)开始不停发送事件的过程,直到发出error或者Done(完成)为止
有的Observable
并不会发出完成事件,比如Never
参考网站: rxmarbles
实现Rx的关键要素,是要问几个问题
Observable
,?(一个结构体?一个函数?一个接口?一个Channel
?)Observer
?)Observable
和操作符Observable---------Operator----------Operator-----------Observer <| <| <| 订阅/取消 订阅/取消 订阅/取消
Observable---------Operator----------Operator-----------Observer |> |> |> 完成/错误 完成/错误 完成/错误
实际情况远比这个复杂,后面会进行分析
Observable
Observable 被定义成为一个函数,该函数含有一个类型为*Observer的参数。
type Observable func(*Observer) error
任何事件源都是这样的一个函数,当调用该函数即意味着订阅了该事件源,入参为一个Observer
,具体功能见下面
如果该函数返回nil,即意味着事件流完成
否则意味着事件流异常
Observer
type Stop chan bool type Observer struct { next NextHandler //缓存当前的NextHandler,后续可以被替换 dispose Stop //取消订阅的信号,只用来close complete Stop //用于发出完成信号 err error //缓存当前的错误 }
该控制器为一个结构体,其中next记录了当前的NextHandler,
在任何时候,如果关闭了dispose这个channel,就意味着取消订阅。
//Dispose 取消订阅 func (c *Observer) Dispose() { select { case <-c.dispose: default: close(c.dispose) } } //Aborted 判断是否已经取消订阅或者已完成 func (c *Observer) Aborted() bool { select { case <-c.dispose: return true case <-c.complete: return true default: return false } }
由于Channel的close可以引发所有读取该Channel的阻塞行为唤醒,所以可以在不同层级复用该channel
并且,由于已经close的channel可以反复读取以取得是否close的状态信息,所以不需要再额外记录
Observer
对象为Observable
和事件处理逻辑共同持有,是二者沟通的桥梁
type Event struct { Data interface{} Target *Observer } NextHandler interface { OnNext(*Event) }
NextHandler
是一个接口,实现OnNext
函数,当Observable
数据推送到Observer
中时,即调用了该函数
Target
属性用于存储当前发送事件的Observer
对象,有两大重要使命
这样做的好处是可以实现不同的观察者,比如函数或者channel
type( NextFunc func(*Event) NextChan chan *Event ) func (next NextFunc) OnNext(event *Event) { next(event) } func (next NextChan) OnNext(event *Event) { next <- event }
//TakeUntil 一直获取事件直到unitl传来事件为止 func (ob Observable) TakeUntil(until Observable) Observable { return func(sink *Observer) error { go until(sink.New3(NextFunc(func(event *Event) { //获取到任何数据就让下游完成 sink.Complete() //由于复用了complete信号,所以会导致所有复用complete的事件流完成 }))) return ob(sink) } }
TakeUnitl的用途是,传入一个until事件源,当这个until事件源接受到事件时,就会导致当前的事件源"完成”。相当于某种中断信号。
看似简短的代码,确考虑各种不同的情形
几大实现细节:
本文向大家介绍golang操作elasticsearch的实现,包括了golang操作elasticsearch的实现的使用技巧和注意事项,需要的朋友参考一下 1、前提 1.1 docker 安装elasticsearch 查询elasticsearch 版本 将对应的版本拉到本地 创建一个网络 启动容器 1.2这里过后就可以去写go代码 为了直观搞了个可视化工具 ElisticHD 这里使用do
分享一波超赞的面试体验 首先自我介绍,介绍两段实习经历,所负责的业务模块 然后,本科和硕士最后成就感的事,我说打过cuba拿到过名次,读研就是自己开发过一个用于组内的项目 简单基础,sql的执行过程,一些关键字,内联函数的用法,sql的优化(这块答的不好) 然后就是b站实习做的购物车,搜索的业务,怎么使用es,分瓷器,缓存结构,怎么设计的 然后遇到过哪些困难,研究生做的课题是什么,介绍了一堆,反问
本文向大家介绍golang struct 实现 interface的方法,包括了golang struct 实现 interface的方法的使用技巧和注意事项,需要的朋友参考一下 golang中,一般strcut包含 interface类型后,struct类型都需要实现 interface导出的接口,从而成为相应的 interface接口类。 实际上,struct包含interface之后,并不需
本文向大家介绍golang实现跨域访问的方法,包括了golang实现跨域访问的方法的使用技巧和注意事项,需要的朋友参考一下 前端通过Ajax来获取服务器资源时,会存在跨域问题。因为Ajax只能同源使用(预防某些恶意行为),所以当访问不在同一个域中的资源时,就会出现跨域限制。尤其在开发和测试时,跨域问题会给前端测试带来非常不便。 不过CORS(Cross-Origin Resource Sharin
本文向大家介绍golang实现java uuid的序列化方法,包括了golang实现java uuid的序列化方法的使用技巧和注意事项,需要的朋友参考一下 目前只实现了java生成的固定的uuid:85bb94b8-fd4b-4e1c-8f49-3cedd49d8f28的序列化 java读取测试 到此这篇关于golang实现java uuid的序列化方法的文章就介绍到这了,更多相关golang实现
本文向大家介绍golang实现并发数控制的方法,包括了golang实现并发数控制的方法的使用技巧和注意事项,需要的朋友参考一下 golang并发 谈到golang这门语言,很自然的想起了他的的并发goroutine。这也是这门语言引以为豪的功能点。并发处理,在某种程度上,可以提高我们对机器的使用率,提升系统业务处理能力。但是并不是并发量越大越好,太大了,硬件环境就会吃不消,反而会影响到系统整体性能
本文向大家介绍Golang中禁止拷贝的实现代码,包括了Golang中禁止拷贝的实现代码的使用技巧和注意事项,需要的朋友参考一下 前言 Go中没有原生的禁止拷贝的方式,所以如果有的结构体,你希望使用者无法拷贝,只能指针传递保证全局唯一的话,可以这么干,定义 一个结构体叫 noCopy,要实现 sync.Locker 这个接口 方法如下: 然后把 noCopy 嵌到你自定义的结构体里,然后 go ve
问题内容: 谁能告诉我如何从字符串创建Type的新实例?反映? 有示例,但它们适用于语言[:(]的旧版本(Go 1之前的版本) 问题答案: 因此,如果我正确理解了您的问题,那么您在问的是,仅将类型名称作为字符串时,如何创建对象。因此,例如,您可能有一个字符串“ MyStruct”,并且想要创建这种类型的对象。 不幸的是,这是不容易实现的,因为Go是一种静态类型的语言,并且链接程序将消除无效代码(或