Skip to main content

apache beam 的总点知识点

1 Pipeline 管道

2 Pcollection

3 Ptransform

总体流程就是 设置pipeline >> read data >> pcollection >> ptransform >> pcollection

其中ptransform有ParDo和耦合函数功能

ParDo()里面只能接收的是DoFn类或集成DoFn类的对象的函数,ParDo操作的是每一行的数据,就好像dataframe里面的一行

敲黑板时间!!

读取Csv和txt文件,Pipeline读取得每一行为字符串,也就是,属于pandas里面的seris, 也就是只有一列,如果要分开几列,我们就要split字符串,然后做成key:value字典格式.

读取avro文件时候,Pipeline读取的每一行为字典{},也就是pandas里面的一行dataframe,如果要取值,我们需要element[列名]就可以取到

聚合函数操作后,就会返回一个整体的新的Pcollection 记住字典格式要做聚合函数要变成列表或者元组

Groupbykey--对象是一个Pcollection,首先选择要做聚合的key和值,然后tranform一个pcollection格式为turple,里面为(key,value),最后通过管道 | beam.GroupByKey()

就会生成按key分类, 以下这样的效果,具体说明例子

cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...

变成

cat, [1,5,9]

dog, [5,2]

and, [1,2,6]

jump, [3]

tree, [2]
...

##CoGroupKey--操作对象Pcollection,把两个Pcollection通过key连接起来, 比如Pcollection1: 是宠物名字和年龄 ''' "Amy", 9

"Tom", 3

"Shierly", 3

"Miccle", 4

"Dockey", 4

Pcollection2: 是宠物名字和主人名字

"Amy", "michael"

"Tom", "Tommy"

"Shierly", "Darren"

"Miccle", "Cherry"

"Dockey", "Dick"


age_list = [("Amy", 9),

   ("Tom", 3),

("Shierly", 3)

("Miccle", 4)

("Dockey", 4])]

Owner_list = [("Amy", "michael"),

("Tom", "Tommy"),

("Shierly", "Darren"),

("Miccle", "Cherry"),

("Dockey", "Dick")]

然后我们开始创建两个Pcollections

age = P |"create age" >> beam.Create(age_list) owner = P | "create owner" >> beam.Create(owner_list)

我们用CoGroupbyKey的时候,是使用key,value的字典格式作为输入

格式为 results = {"Pcollection1名字":Pcollection1, "Pcollection2名字":Pcollection2} | beam.CoGroupByKey()

得到的效果是 [(Key1,{"Pcollection1名字":Pcollection1, "Pcollection2名字":Pcollection2}), (Key2,{"Pcollection1名字":Pcollection1, "Pcollection2名字":Pcollection2}),(Key3,{"Pcollection1名字":Pcollection1, "Pcollection2名字":Pcollection2})]

呈现效果是: ["Tom" , {'age':3, "owner":"Tommy"} "Shierly" ,{'age':3,"owner":"Darren"} "Amy",{'age':9,"owner":"michael"} ... ]


总结CoGroupByKey就是把两个Pcollection通过共同的Key连接起来,然后用元组(key,value)显示出来,values是一个字典格式,包含Pcollection名字:value_list

一句话表示:

COGroupByKey就是元组key包含字典pcollection与定义的value

GroubByKey就是元组key包含value_list

CombinePerKey(beam.combiners.MeanCombineFn)

就是把groupbykey 再对每个key的value做加权平均

Flatten 把多个PCollection 变成一个 PCollection,

说白了就是把多个列表的值放到一个列表

# Flatten takes a tuple of PCollection objects.
# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
merged = (
(pcoll1, pcoll2, pcoll3)
# A list of tuples can be "piped" directly into a Flatten transform.
| beam.Flatten())