Skip to main content

今天有意思啦 !! spark 读取 GCS avro文件 + ETL操作 + 写avro文件

1. spark 初始化,因为要读取成dataframe或者sql形式,导入SparkSession

from pyspark.sql import SparkSession
import sys

2. 创建spark对象

spark = SparkSession \
.builder \
.master('yarn') \
.appName('gcs-sparkdataframe-sql-avro') \
.getOrCreate()

参数判断和参数设置

if len(sys.argv) != 4:
raise Exception("Exactly 3 arguments are required: <inputUri> <table1><table2>")

inputUri=sys.argv[1]
table1=sys.argv[2]
table2=sys.argv[3]

4 读取avro文件

df = spark.read.format('avro').load(inputUri)

5 注册视图,实行查询语句

df1 = spark.sql("select ACNO,%s from bigtable" % (",".join(df.columns[1:round(len(df.columns) / 2)])))
df2 = spark.sql("select ACNO,%s from bigtable" % (",".join(df.columns[round(len(df.columns) / 2):])))
df1.show(10)
df2.show(10)

6 处理好的dataframe对象写成avro文件 (注意,用sql处理过后的还是dataframe对象)

df1.write.format('avro').save(table1,'avro')

df2.write.format('avro').save(table2,'avro')

7 去到终端输入命令,创建dataproc集群,然后提交spark job

CLUSTER_NAME=newnew
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=global \
--zone=us-central1-b \
--worker-machine-type n1-standard-1 \
--num-workers 2 \
--image-version 1.4-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage avro-python3 dask[dataframe] gcsfs fastavro' \
--enable-component-gateway \
--worker-boot-disk-size=40 \
--optional-components=ANACONDA \
--enable-component-gateway
BUCKET_NAME=zz_mm_bucket
gcloud config set dataproc/region global
gcloud dataproc jobs submit pyspark avrosqlargs.py --cluster newnew \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
--jars=https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.4/spark-avro_2.11-2.4.4.jar \
-- gs://${BUCKET_NAME}/input/gs://zz_mm_bucket/input/ gs://${BUCKET_NAME}/output/table1 gs://${BUCKET_NAME}/output/table2

这里注意的是gcloud dataproc jobs sumbit的参数格式是 pyspark.py文件, files

所以例子中我们的参数总共有4个 1 avrosqlargs.py

2 gs://${BUCKET_NAME}/input/gs://zz_mm_bucket/input/

3 gs://${BUCKET_NAME}/output/table1

4 gs://${BUCKET_NAME}/output/table2

jars和cluster都不算为参数

还有就是files的是文件夹形式而不能是文件形式,所以读入文件夹后,可以根据需要读取你需要的文件,比如sys.argv+'文件名'

所以整体可以改成:

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession
import sys



spark = SparkSession \
.builder \
.master('yarn') \
.appName('gcs-sparkdataframe-sql-avro') \
.getOrCreate()

# get spark datafrom from avro file in GCS

if len(sys.argv) != 4:
raise Exception("Exactly 3 arguments are required: <inputUri> <table1><table2>")

inputUri=sys.argv[1]
table1=sys.argv[2]
table2=sys.argv[3]

file = inputUri+'account_id_schema_new.avro'
df = spark.read.format('avro').load(file)


#create temp table
df.createOrReplaceTempView('bigtable')

# split temp table into 2 spark dataframes
df1 = spark.sql("select ACNO,%s from bigtable" % (",".join(df.columns[1:round(len(df.columns) / 2)])))
df2 = spark.sql("select ACNO,%s from bigtable" % (",".join(df.columns[round(len(df.columns) / 2):])))
df1.show(10)
df2.show(10)

# Saving the dataframes into avro files and dump avro files into GCS

df1.write.mode("overwrite").format('avro').save(table1,'avro')

df2.write.mode("overwrite").format('avro').save(table2,'avro')

关于生成文件,因为spark是基于hadoop的,所以文件也会分布式存储,所以我们可以看到

df = spark.read.format('avro').load(sys.argv[3])

一般是分区是会根据你的电脑的cpu核数自动分配,我的电脑是core i5,也就是四核的,所以默认是4

我们可以重分区:

df.repartition(10) # 就是分10区
df.rdd.getNumPartitions() #查看分区数
df.coalesce(1)