java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Flink DataStream框架

Flink DataStream基础框架源码分析

作者:xiangel

这篇文章主要为大家介绍了Flink DataStream基础框架源码分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

希望通过对Flink底层源码的学习来更深入了解Flink的相关实现逻辑。这里新开一个Flink源码解析的系列来深入介绍底层源码逻辑。说明:这里默认相关读者具备Flink相关基础知识和开发经验,所以不会过多介绍相关的基础概念相关内容,Flink使用的版本为1.15.2。初步确定按如下几个大的方面来介绍

计算模型

部署&调度

存储体系

底层支撑

概览

本篇是第一篇,介绍计算模型的基础DataStream的相关内容,这一篇只介绍DataStream的基础内容,如如何实现相关的操作,数据结构等,不会涉及到窗口、事件事件和状态等信息

DataStream是对数据流的一个抽象,其提供了丰富的操作算子(例如过滤、map、join、聚合、定义窗口等)来对数据流进行处理,下图描述了Flink中源数据通过DataStream的转换最后输出的整个过程。

通过上图可以来构想下,一般一个DataStream具有如下主要属性

属性说明
上游依赖标识上游依赖信息,这样能把整个处理流程串联起来
并行度处理逻辑的并行度信息,这样可以提高处理的速度
输入格式指定输入数据的格式,如InnerType { public int id; public String text; }
输出格式指定输出数据的格式
处理逻辑上游datastream转换到目前的datastream的具体逻辑操作,如map的具体逻辑信息。

最终整个数据流会生成一个DAG图(有向无环图),通过这个DAG图就可以生成对应的任务来运行了。下面来具体分析DataStream的实现和生成DAG图(Flink中叫Graph)

深入DataStream

首先我们通过下图来看看,DataStream中的一些主要的辅助类,DataStream类本身主要逻辑是对各类转换关系和sink的操作,而前面说到的一些主要属性信息都是通过辅助类来处理的。

Transformation:本身主要管理了输出格式、上游依赖、并行度、id编号等信息以及StreamOperator的工厂类(StreamOperatorFactory)

StreamOperator:主要是各类操作的具体处理逻辑

Function:用户自定义函数的接口,如DataStream中map处理时需要传入的MapFunction就是Function的子接口

DataStream

属性和方法

DataStream的属性比较简单,就2个,1个是实行的环境信息,另一个是Transformation。
DataStream中的方法主要分为以下几类

除了转换操作外,其他几类的逻辑都比较直观和简单,这里重点介绍下转换操作的处理,转换操作这里分为3类,1.返回是一个DataStream。如map、filter、union、keyBy等;2.返回的是一个Streams,即输入是多个DataStream,这类的操作主要是多流关联的操作,如join、coGroup。这些Streams的类中实现了一些方法,来返回一个DataStream; 3.window类,返回的是AllWindowedStream类型,同样这些类中也是有方法,来返回一个DataStream。

说明:如上的各个分类都是个人基于理解上做的各个分类处理,非官方定义

类体系

DataStream的类图关系比较简单,就如下这几个类,具体每个子类的信息见下表

子类名说明
SingleOutputStreamOperator只有1个输出的DataStream
IterativeStream迭代的DataStream,具体使用场景后面分析
DataStreamSource最开始的DataStream,里面有source的信息
KeyedStream有一个Key信息的DataStream

Transformation

属性和方法

DataStream类本身主要是提供了给外部的编程接口的支持,而对Streaming flow算子节点本身的一些属性和操作则由Transformation来负责

从上图可以看出其主要属性有节点id,名称,并行度,输出类型还有一些与资源相关的内容,还有一个是上游的输入Transformation,由于这个因不同的Transformation会有不同的数据个数,所以这个信息是放在各个子类中的。如ReduceTransformation是有一个input属性来记录上游依赖,而如TwoInputTransformation则是有2个属性input1和input2来记录上游依赖,另外如SourceTransformation,这个是源头的Transformation,是没有上游依赖的Transformation,所以没有属性来记录,但是有个Source属性来记录Source输入
具体对数据的操作处理,在Transformation里面有个StreamOperatorFactory属性,其中的StreamOperator实现了各种的处理算子。注意这里不是所有的Transformation都包含StreamOperatorFactory,如SourceTransformation中就没有,这个具体大家可以看看相关的代码。

Transformation的方法基本上是对上述属性的get和set操作,这里重点要说明一下的是PhysicalTransformation(下面类体系来介绍)中的setChainingStrategy方法,这里的ChainingStrategy是一个枚举类,主要是控制多个连续的算子是否可以进行链式处理,这个具体的我们在下面介绍StreamGraph时再介绍

类体系

Transformation的大多数类均为PhysicalTransformation的子类,PhysicalTransformation为有物理操作的,重点是这类的子类是支持Chaining操作的。我们先来看看其重要子类

子类名说明
SourceTransformation连接Source的Transformation,是整个streaming flow的最开始的转换处理
SinkTransformation输出的转换处理,是整个streaming flow的最后一个
OneInputTransformation只有1个输入的转换处理,如map、filter这类的处理
TwoInputTransformation有2个输入的转换处理

StreamOperator

属性和方法

StreamOperator负责对数据进行处理的具体逻辑,如map处理的StreamMap,由于各个Operator的处理方式的不同,这里主要以AbstractStreamOperator来介绍一些主要的属性,如output的数据,StreamConfig,StreamingRuntimeContext等。

下面我们重点介绍下相关的方法

StreamOperator接口有定义了重要的3个方法(这里只介绍与数据基础处理相关的部分)

方法说明
open()数据处理的前处理,如算子的初始化操作等
finish()数据处理的后处理,如缓存数据的flush操作等
close()该方法在算子生命周期的最后调用,不管是算子运行成功还是失败或者取消,主要是对算子使用到的资源的各种释放处理

另外关注的对数据进行实际处理的方法,

接口方法说明
OneInputStreamOperatorprocessElement()对数据元素进行处理,实际该接口在OneInputStreamOperator的父接口Input中定义
TwoInputStreamOperatorprocessElement1()对input1的数据元素进行处理
processElement2()对input2的数据元素进行处理

类体系

StreamOpterator的子类非常多,包括测试类的一起有287个,这些大致可以归属到如下3个子类中,

类名说明
OneInputStreamOperator只有1个输入的源
TwoInputStreamOperator有2个输入源
AbstractStreamOperator

Function

Function是针对所有的用户自定义的函数,各子类主要是实现对应的,这里定义了种类丰富的各类Function的子接口类来适配各种不同的加工场景,具体的就看源码了,这里就不详细介绍了

本节的最后,我们通过一个例子来看看这几个类是怎么组合的。如下是一个常见的对DataStream进行map处理的操作

      text
        .flatMap(new Tokenizer)

处理后对应的DataStream的结构如下图

DataStream生成提交执行的Graph

前面分析了DataStream,是单个节点的,接下来看看整个streaming flow在flink中是怎么转换为可以执行的逻辑的。一般整个数据流我们叫做DAG,那在Flink中叫PipeLine,其实现类是StreamGraph。这里先介绍2个概念

StreamGraph

下面我们来看看StreamGraph的主要属性和方法,以及如何从DataStream转换为StreamGraph的。

属性和方法

重要属性如下(这里只介绍与生成图相关的属性,还有一些如状态,存储类的后面介绍)

属性说明
Map<Integer, StreamNode> streamNodesStreamNode数据,kv格式,key为Transformation的id
Set sourcesStreamGraph的所有source集合,存储的是Transformation的id
Set sinksStreamGraph的sink集合

说明:StreamGraph只记录了StreamNode的信息,StreamEdge的信息是记录在StreamNode中的。如下2个属性记录了StreamNode的输入Edge和输出Edge

    //StreamNode.java
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();

主要方法

方法说明
addSource()添加source节点
addSink()添加sink节点
addOperator()添加算子节点
addVirtualSideOutputNode()添加一个虚拟的siteOutput节点

StreamGraph生成

下面我们来看看DataStream是如何生成StreamGraph的。通过前面对DataStream的分析可知,DataStream的前后依赖关系是通过Transformation来存储的,这里StreamExecutionEnvironment有个transformations记录了所有的Transformation

    //StreamExecutionEnvironment.java
    List<Transformation<?>> transformations

这里的数据是在DataStream进行转换处理生成了新的Transformation,同时会把该实例添加到transformations里面,使用的是如下方法

    getExecutionEnvironment().addOperator(resultTransform);

而具体转换为StreamGraph是通过StreamExecutionEnvironment的 getStreamGraph()方法。最终转换的逻辑是通过StreamGraphGenerator类来实现。

这里要介绍一个新的类体系TransformationTranslator,有各种的子类来转换对应类型的Transformation。这里有定义了2个方法分别支持转换Streaming和Batch。

 //TransformationTranslator.java
 Collection<Integer> translateForBatch(final T transformation, final Context context);
 Collection<Integer> translateForStreaming(final T transformation, final Context context);

对应的映射关系存储在StreamGraphGenerator类的translatorMap中。

//StreamGraphGenerator.java 
private static final Map<
                    Class<? extends Transformation>,
                    TransformationTranslator<?, ? extends Transformation>>
            translatorMap;
static {
        @SuppressWarnings("rawtypes")
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                tmp = new HashMap<>();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    ...

下面我们通过OneInputTransformationTranslator为例来看看是如何进行转换的。具体逻辑如下

 //调用addOperator添加StreamNode
 streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                inputType,
                transformation.getOutputType(),
                transformation.getName());
//获取上游依赖的transformations,然后添加边
        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
            streamGraph.addEdge(inputId, transformationId, 0);
        }

除了添加节点和边外,还有一些如设置节点的并行度等操作,这块大家可以去看看具体的代码。
这样当把所有的Transformation都转换完,这样StreamGraph就生成好了。

JobGraph

有了StreamGraph,为什么还需要一个JobGraph呢,这个和Spark中的Stage类似,如果有多个算子能够合并到一起处理,那这样性能可以提高很多。所以这里 根据一定的规则进行,先我们介绍相关的类

属性和方法

JobGraph的属性主要是通过Map<JobVertexID, JobVertex> taskVertices记录了JobVertex的信息。
另外这个JobGraph是提交到集群去执行的,所以会有一些执行相关的信息,相关的如下:

    private JobID jobID;
    private final String jobName;
    private SerializedValue<ExecutionConfig> serializedExecutionConfig;
    /** Set of JAR files required to run this job. */
    private final List<Path> userJars = new ArrayList<Path>();
     /** Set of custom files required to run this job. */
    private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
            new HashMap<>();
    /** Set of blob keys identifying the JAR files required to run this job. */
    private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
    /** List of classpaths required to run this job. */
    private List<URL> classpaths = Collections.emptyList();

而相关的方法主要是

方法说明
addVertex()添加顶点
getVertices()获取顶点

而如何从StreamGraph转换到JobGraph这块的内容还是比较多,这块后续我们单独开一篇来介绍

总结

本篇从0开始介绍了DataStream的相关内容,并深入介绍了DataStream、Transformation、StreamOperator和Function之间的关系。另外介绍了streaming flow转换为提交执行的StreamGraph的过程及StreamGraph的存储结构。而从StreamGraph->JobGraph->ExecutionGraph这块涉及的内容也较多,且还涉及到提交部署的内容,这块后面单独来介绍。最后本篇介绍的DataStream只是介绍了最基础的计算框架,没有涉及到flink的streaming flow中的时间、状态、window等内容,更多关于Flink DataStream基础的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文