AWS Glueは個人的に好きなサービスでよく使います。
自分がよく使う情報源やメモについてまとめておきます。
※PySparkジョブをよく使うのでそれ関連のメモが中心です
目次
- 目次
- 資料
- Glue Jobのスクリプトメモ
資料
APIリファレンス(よく見る)
公式のリファレンス。言わずもがな。
公式のサンプル集(たまに見る)
実装例やFAQが載っている。
パフォーマンスチューニングガイド(よく見る)
AWS JapanのSAの方作成のETLパフォーマンスチューニングガイド。
かなり詳細に説明されていてわかりやすい。個人的に超おすすめ資料。
https://d1.awsstatic.com/webinars/jp/pdf/services/202108_Blackbelt_glue_etl_performance1.pdf
https://d1.awsstatic.com/webinars/jp/pdf/services/202108_Blackbelt_glue_etl_performance2.pdf
※動画も上がっている。
Glue Jobのスクリプトメモ
Glue Job(PySpark) のスクリプトの構成のメモを示す。
コードはAWS公式ドキュメント から抜粋。
0.全体的な流れ
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job # 1.エントリーポイントの作成 sc = SparkContext().getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session # 2.ジョブの初期化 args = getResolvedOptions(sys.argv, ['JOB_NAME']) job = Job(glueContext) job.init(args['JOB_NAME'], args) # 3.データの加工 datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") # 4.データの保存 datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") # 5.ジョブのコミット job.commit()
1.API操作のためのエントリーポイント作成する
- 以下のようにAPI操作のためのエントリーポイントを作成する。
sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session
PySpark単体で使う時は、今はSparkSession
が標準だがGlueではSparkContext
を直接使用している(基本この部分は自動生成されることが多いはず)。
SparkSession
の内部でSparkContext
が保持されているため、PySpark2以降では今は前者を使うのが推奨。
また SparkContext.getOrCreate()
ではなくSparkContext()
となっている事もある。これらの違いとして、前者は既存のSparkContext
があれば使い回す、後者は常にSparkContext
を生成して古いものは破棄すると言う動作になる。
基本的には前者が推奨される。
2.ジョブの初期化
- ETLジョブとして使用する場合のみ記載する。
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'HOGE']) # ジョブの実行時にスクリプトに渡される引数を取得(ここではJOB_NAMEを取得) job = Job(glue_context) # ジョブのインスタンスを生成 job.init(args['JOB_NAME'], args) # ジョブの名前と変数を渡して初期化 print(args['HOGE'])
3. データの加工
- データをDynamicFrameとして読み込んで加工する。
- DynamicFrameのまま加工するか、DataFrameに変換してからPySparkのAPIで加工し、DynamicFrameに戻すか、で行うのが基本
データの読み込み
- データカタログからDynamicFrameを作成
dynamic_df = glueContext.create_dynamic_frame.from_catalog( database = "DatabaseName", table_name = "TableName", transformation_ctx = 'datasource' #ブックマークを使用する場合に指定するコンテキストオブジェクト。ブックマーク状態のキーのため一意な名前であればなんでも良い。 ) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} # 明示的にブックマークキーとして指定する場合 )
DynamicFrameとDataFrameの相互変換
toDF:DynamicFrameをDataFrameに変換
df = dynamic_df.toDF()
fromDF:DataFrameをDynamicFrameに変換
dynamic_df = DynamicFrame.fromDF(df, glueContext, "dynamic_df) # 変換元のdf, glueContext, Dynamic Frameにつける名前を指定
DynamicFrameの加工API
SelectFields.apply:列の抽出
dynamic_df = SelectFields.apply(dynamic_df, paths=["foo", "bar"]) # fooとbarを抽出
DropFields.apply:列の削除
dynamic_df = DropFields.apply(dynamic_df,["foo", "bar"]) # dynamic_dfからfooとbarを削除
Map.apply:全てのレコードに関数を適用
dynamic_df = Map.apply(dynamic_df, f = func) # 全てのレコードにfuncを適用
rename_field:列をリネーム
dynamic_df = dynamic_df.rename_field("from_name", "to_name")
ApplyMapping.apply:名称と型を変換
dynamic_df = ApplyMapping.apply(frame = dynamic_df, mappings = [("col0", "string", "int_col0", "int")], # col0(string)を int_col0(int) に変換 transformation_ctx = "applymapping1" # ブックマークを使用する場合 )
Join.apply:列を元にDynamicFrameを結合
dynamic_df_union = Join.apply(dynamic_df1, dynamic_df2, 'id', 'id') # idをキーにdynamic_df1, dynamic_df2を結合
SplitFields.apply:列単位で分割
dynamic_df_collection = SplitFields.apply(dynamic_df, ["foo","bar"], "dynamic_df_1", "dynamic_df_2") # fooとbarをdynamic_df_1、それ以外をdynamic_df_2として分割し格納 # コレクションから分割したdfを取得 dynamic_df_1 = dynamic_df_collection.select("dynamic_df_1") dynamic_df_2 = dynamic_df_collection.select("dynamic_df_2")
SplitRows.apply:行単位で分割
dynamic_df_collection = SplitRows.apply(dynamic_df, {"foo": {">": 5000.00}}, "dynamic_df_1", "dynamic_df_2") # fooが5000より大きいレコードはdynamic_df_1、それ以外はdynamic_df_2に分割 # コレクションから分割したdfを取得 dynamic_df_1 = dynamic_df_collection.select("dynamic_df_1") dynamic_df_2 = dynamic_df_collection.select("dynamic_df_2")
4. データの書き込み
- 加工したデータをどこかに書き込む。以下はs3に保存する場合。
glueContext.write_dynamic_frame.from_options(dynamic_df, connection_type = "s3", # s3を指定 connection_options = {"path": "s3://xxxx"}, # 保存先のパス format = "json" # 形式はjsonを指定。他はcsv, xml, parquet, avroなど transformation_ctx="hogehoge", # bookmarkを使用する場合 )
5. ジョブのコミット
- ETLジョブとして使用する場合は最後に以下を追加する。
job.commit() # ジョブをコミット