Golang rxgo库的具体使用
作者:X_PENG
本文主要介绍了Golang rxgo库的具体使用,类似于JavaStream的流式编程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
关键总结
可以类比Java Stream流式编程,Observable就好比Stream。
两张重要的图告诉你如何使用rxgo:
每个方法都会返回一个channel,然后别人可以消费这个channel

如下是FlatMap的用法(可类比Java Stream的flatMap):

FlatMap就是将一个流中的每个元素都映射成另一个流,并且将所有的流都合并进行扁平化处理。
示例代码
简单示例:
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
fmt.Println(item.V)
}
}
FromChannel可以直接从一个已存在的<-chan rxgo.Item对象中创建 Observable:
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 5; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
也可以使用Create方法:
// Create creates an Observable from scratch by calling observer methods programmatically.
func Create(f []Producer, opts ...Option) Observable {
return &ObservableImpl{
iterable: newCreateIterable(f, opts...),
}
}
传入一个[]rxgo.Producer的切片,其中rxgo.Producer的类型为func(ctx context.Context, next chan<- Item)。我们可以在代码中调用rxgo.Of(value)生成数据,rxgo.Error(err)生成错误,然后发送到next通道中:
func main() {
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
next <- rxgo.Error(errors.New("unknown"))
next <- rxgo.Of(4)
next <- rxgo.Of(5)
}})
ch := observable.Observe()
for item := range ch {
if item.Error() {
fmt.Println("error:", item.E)
} else {
fmt.Println(item.V)
}
}
}
分成2个rxgo.Producer也是一样的效果:
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
next <- rxgo.Error(errors.New("unknown"))
}, func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(4)
next <- rxgo.Of(5)
}})
到此这篇关于Golang rxgo库的具体使用的文章就介绍到这了,更多相关Golang rxgo库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
