
Fun stuff today!! BigQuery + PySpark + Dataproc

The flow is simple:

We use Spark to read a table from BigQuery, do some simple processing on it, split it into two DataFrame objects, and then write both objects back to BigQuery.

1. Spark initialization. Since we want to work with the data as a DataFrame (or query it with SQL), import SparkSession

#!/usr/bin/python
from pyspark.sql import SparkSession

2. Create the SparkSession object

spark = SparkSession.builder.master('yarn').appName('your app name').getOrCreate()

3. Point the connector at a Google Cloud Storage bucket, which BigQuery writes use as temporary staging space

bucket = "haha_mm_bucket"
spark.conf.set('temporaryGcsBucket', bucket)

4. With the temp bucket configured, we can read the data and register the DataFrame as a temporary view

df = spark.read.format('bigquery').option('table', 'dataset.table').load()
df.createTempView('words')  # the temp view name, e.g. 'words'

You can also call df.createOrReplaceTempView('words'), which overwrites an existing temp view of the same name instead of raising an error.

5. Now query the view with SQL

lefttable = spark.sql("SELECT ACNO, FIELD_1, FIELD_2 FROM words")
righttable = spark.sql("SELECT ACNO, FIELD_3, FIELD_4 FROM words")
lefttable.show()
lefttable.printSchema()
righttable.show()
righttable.printSchema()

6. Write the processed DataFrame objects to BigQuery (note: the result of spark.sql(...) is still a DataFrame)

lefttable.write.format('bigquery').option('table','query-11:newdata.lefttable').save()
righttable.write.format('bigquery').option('table','query-11:newdata.righttable').save()

7. In the terminal, submit the Spark job

gcloud dataproc jobs submit pyspark wordcount.py \
    --cluster=cluster-name \
    --region=cluster-region \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar

The general format:

gcloud dataproc jobs submit pyspark your_script.py \
    --cluster=cluster-name \
    --region=cluster-region \
    --jars=<BigQuery connector jar>

The region (e.g. us-central1) must match the region of your Dataproc cluster. Note the choice of --jars:
- If you are using Dataproc image 1.5, add: --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
- If you are using Dataproc image 1.4 or below, add: --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar

gcloud config set dataproc/region us-central1
BUCKET_NAME=haha_mm_bucket
input=new.avro
gcloud dataproc jobs submit pyspark wordcount3.py \
    --cluster=cluster-662b \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
    --properties=spark.jars.packages=com.databricks:spark-avro_2.11:4.0.0 \
    -- gs://${BUCKET_NAME}/${input}

Note the flag ordering: gcloud's own flags (--cluster, --jars, --properties) must come before the bare `--`; everything after `--` is passed to the script as arguments. gcloud dataproc jobs submit pyspark has no --packages flag, so extra Spark packages go through --properties=spark.jars.packages=... instead.