PySparkを使用する機会が増えてきたので、個人的によく見返しすメモをまとめておきます。全般的にRDDではなくDataFrameを使用しています。
AWSでSparkが使えるサービスと言うとGlueやEMRが定番の印象ですが、最近ではAthenaやRedshiftでも使えるようになりました。 今後さらに利用シーンが増えていきそうです。
※PySparkのAPIのリファレンスとしては、以下のDatabricks Japanの方が書かれた逆引きの記事が参考になると思います。
※以下は公式のリファレンス
目次
1.基礎知識
(1)API操作のためのエントリーポイント作成する
- 以下のようにAPI操作のためのエントリーポイントを作成し、ここを経由して操作するのが基本である。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("hogehoge").getOrCreate() #appNameは任意
(2)PySparkは遅延評価である
- Sparkの処理は「変換」と「アクション」がある。※以下に記載しているメソッドは全量ではない。
- 「変換」はデータの生成、加工を行う。すぐには実行されない。
- read()
- filter()
- select()
- join()
- 「アクション」は処理結果の出力を行う。アクション実行時にそれまでの変換が全て行われる。
- show()
- count()
- save()
- 「変換」はデータの生成、加工を行う。すぐには実行されない。
- 要するに「アクション」が呼び出されるまでは、「変換」は実行されない(遅延評価。必要になるまで処理を保留にしておくイメージ)。
- 遅延評価をすることにより大規模のデータを扱いやすくなっている。加工のたびに大規模なデータを処理すると、その度に処理に時間がかかる。しかし遅延評価とすることで必要な時にまとめて処理を行える仕組みとなっている。
例)
df = spark.read.json(hogehoge.json) # データ読み込み。この段階ではまだ処理自体は実行しない。 df = df.select("foo", "bar") # fooとbarのみ抽出。この段階ではまだ処理自体は実行しない。 df.show() # データの中身を確認。このタイミングで初めてread()やselect()含めて実行する。
※何が変換で何がアクションかは以下がよく纏まっている。
引用元:https://www.slideshare.net/SparkSummit/visual-api-training
(3)DataFrameはImmutable(不変)である
- DataFrameはImmutableで直接変更することはできない。変換後は新しいDataFrameが生成されるため、それを元の変数 or 別の変数に代入していく。
例)
df.select("foo", "bar") # foo列とbar列のみ抽出して新しいDataFrameを返す。元のDataFrame(dfは変更されていない)。 df = df.select("foo", "bar") # 抽出したものを元の変数(df)に格納。これはdfをどんどん上書きして変換していくイメージ new_df = df.select("foo", "bar") # 抽出したものを別の変数(new_df)に格納。
2.個人的よく使うAPI
- 個人的によく使っているAPIを記載(そのため網羅的ではない)
- 使用時のPySparkのバージョンは2.1.1(なので今は古いものあるかも)
(0)共通系
列指定の方法としては以下4パターンがある。以下は全て df
から foo
という列を取得する方法であり、全て同じ結果になる。
個人的な感触として、パターン2~4であればほとんどのAPIは対応している印象。
# パターン1:列名を直接指定 df = df.select('foo') # パターン2:col関数を使用して列めいを指定 import pyspark.sql.functions as F df = df.select(F.col('foo')) # パターン3:「オブジェクト名.列名」で指定 df = df.select(df.foo) # パターン4:「オブジェクト名[列名]」で指定 df = df.select(df['foo'])
(1)加工、保存系
read:ファイルを読み込む
df = spark.read.json("/202112_trj/hogehoge.json") # jsonを読み込む場合。csvなども同様に使える df = spark.read.json("/202112_trj/*.json") # もちろん正規表現も使用可能
select:特定の列のみ抽出
df = df.select("foo", "bar") # fooとbarのみ抽出。
where, filter:特定の条件でフィルタリング(どっちでも結果は同じ
df.filter(df.foo=='aaa') # fooがaaaであるデータのみ抽出 df.where(df.foo=='aaa') # whereを使ったパターン。結果は↑と同じ
drop:特定の項目を削除する。
df.drop('foo') # DataFrameからfooを削除する。
withColumn:列を追加する。変換を加えた列を追加する際によく使う。
df = df.withColumn('new_foo', df.foo*2) # fooを2倍したものをnew_fooとして新規の列で追加。 df = df.drop('foo') # 元々のfooが不要になる場合はdropする
alias:列に別名をつける。selectで特定の列を別名として取得する際によく使う。
df = df.select(df.foo.alias('bar')) # fooという列を取得するが、列名はbarとする
drop_duplicates:重複するレコードを削除
df = df.drop_duplicates() # 引数を指定しない場合は全列一致で判定 df = df.drop_duplicates(["c0"]) # 列を指定した場合はその列のみで判定
groupby:いろいろ集計
df_job = df.groupby('job') \ .count() \ .sort("count", ascending=False) # jobで集計してカウントして降順でソート
explode:配列を展開する際に使う。
""" 以下のようなに1レコードに配列が入っている場合に +---+---------+ | id| values| +---+---------+ | 1|[a, b, c]| +---+---------+ 以下のように展開したい時に使う +---+----+ | id|value| +---+----+ | 1| a| | 1| b| | 1| c| +---+----+ """ df = df.withColumn('value', explode(df.values)) # valuesを展開したものをvalueとして新規追加 df = df.drop('values') # valuesは不要なので消す
saveAsTable:加工データをテーブルとして保存する。
df.write.format('parquet').saveAsTable('database_name.hogehoge_table') # dfをhogehoge_tableとして保存。フォーマットはparquetを使うことが多いが、他も使用可。
(2) データ確認系
count:データ数を取得する(アクションなので注意)
df.count() # データ数(レコード数)がわかる
show:データの中身を確認する(アクションなので注意)
df.show() # データの中身をチラ見できる。デフォルトだと20行など df.show(5) # 数値を指定すればその数だけ見れる
describe:統計量を取得する(アクションなので注意)
df.describe(df.foo).show() #foo列のデータ数、平均値、標準偏差、最小値、最大値を表示する
printSchema:スキーマが見れる。どんな項目があるかや、その型など
df.printSchema() #スキーマを表示