mazyu36の日記

某SIer所属のクラウドエンジニアのブログ

Glueの個人的チートシート

AWS Glueは個人的に好きなサービスでよく使います。

自分がよく使う情報源やメモについてまとめておきます。

※PySparkジョブをよく使うのでそれ関連のメモが中心です

目次

資料

APIリファレンス(よく見る)

公式のリファレンス。言わずもがな。

docs.aws.amazon.com

公式のサンプル集(たまに見る)

実装例やFAQが載っている。

github.com

パフォーマンスチューニングガイド(よく見る)

AWS JapanのSAの方作成のETLパフォーマンスチューニングガイド。

かなり詳細に説明されていてわかりやすい。個人的に超おすすめ資料。

https://d1.awsstatic.com/webinars/jp/pdf/services/202108_Blackbelt_glue_etl_performance1.pdf

https://d1.awsstatic.com/webinars/jp/pdf/services/202108_Blackbelt_glue_etl_performance2.pdf

※動画も上がっている。

www.youtube.com

www.youtube.com

Glue Jobのスクリプトメモ

Glue Job(PySpark) のスクリプトの構成のメモを示す。

コードはAWS公式ドキュメント から抜粋。

0.全体的な流れ

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# 1.エントリーポイントの作成
sc = SparkContext().getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# 2.ジョブの初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 3.データの加工
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"})
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1")

# 4.データの保存 
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2")

# 5.ジョブのコミット
job.commit()

1.API操作のためのエントリーポイント作成する

  • 以下のようにAPI操作のためのエントリーポイントを作成する。
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

PySpark単体で使う時は、今はSparkSessionが標準だがGlueではSparkContextを直接使用している(基本この部分は自動生成されることが多いはず)。

SparkSessionの内部でSparkContextが保持されているため、PySpark2以降では今は前者を使うのが推奨。

また SparkContext.getOrCreate()ではなくSparkContext()となっている事もある。これらの違いとして、前者は既存のSparkContextがあれば使い回す、後者は常にSparkContextを生成して古いものは破棄すると言う動作になる。 基本的には前者が推奨される。

2.ジョブの初期化

  • ETLジョブとして使用する場合のみ記載する。
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'HOGE'])  # ジョブの実行時にスクリプトに渡される引数を取得(ここではJOB_NAMEを取得) 

job = Job(glue_context)  # ジョブのインスタンスを生成
job.init(args['JOB_NAME'], args)  # ジョブの名前と変数を渡して初期化

print(args['HOGE'])

3. データの加工

  • データをDynamicFrameとして読み込んで加工する。
  • DynamicFrameのまま加工するか、DataFrameに変換してからPySparkのAPIで加工し、DynamicFrameに戻すか、で行うのが基本

データの読み込み

  • データカタログからDynamicFrameを作成
dynamic_df = glueContext.create_dynamic_frame.from_catalog(
    database = "DatabaseName", 
    table_name = "TableName",
    transformation_ctx = 'datasource'  #ブックマークを使用する場合に指定するコンテキストオブジェクト。ブックマーク状態のキーのため一意な名前であればなんでも良い。
)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "hr", 
    table_name = "emp", 
    transformation_ctx = "datasource0", 
    additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}  # 明示的にブックマークキーとして指定する場合
)

DynamicFrameとDataFrameの相互変換

toDF:DynamicFrameをDataFrameに変換

df = dynamic_df.toDF()

fromDF:DataFrameをDynamicFrameに変換

dynamic_df = DynamicFrame.fromDF(df, glueContext, "dynamic_df)  # 変換元のdf, glueContext, Dynamic Frameにつける名前を指定

DynamicFrameの加工API

SelectFields.apply:列の抽出

dynamic_df = SelectFields.apply(dynamic_df, paths=["foo", "bar"])  # fooとbarを抽出

DropFields.apply:列の削除

dynamic_df = DropFields.apply(dynamic_df,["foo", "bar"])  # dynamic_dfからfooとbarを削除

Map.apply:全てのレコードに関数を適用

dynamic_df =  Map.apply(dynamic_df, f = func)  # 全てのレコードにfuncを適用

rename_field:列をリネーム

dynamic_df = dynamic_df.rename_field("from_name", "to_name")

ApplyMapping.apply:名称と型を変換

dynamic_df = ApplyMapping.apply(frame = dynamic_df, 
    mappings = [("col0", "string", "int_col0", "int")],  # col0(string)を int_col0(int) に変換
    transformation_ctx = "applymapping1"  # ブックマークを使用する場合
)

Join.apply:列を元にDynamicFrameを結合

dynamic_df_union = Join.apply(dynamic_df1, dynamic_df2, 'id', 'id')  # idをキーにdynamic_df1, dynamic_df2を結合

SplitFields.apply:列単位で分割

dynamic_df_collection = SplitFields.apply(dynamic_df, ["foo","bar"], "dynamic_df_1", "dynamic_df_2")  # fooとbarをdynamic_df_1、それ以外をdynamic_df_2として分割し格納

# コレクションから分割したdfを取得
dynamic_df_1 = dynamic_df_collection.select("dynamic_df_1")
dynamic_df_2 = dynamic_df_collection.select("dynamic_df_2")

SplitRows.apply:行単位で分割

dynamic_df_collection = SplitRows.apply(dynamic_df, {"foo": {">": 5000.00}}, "dynamic_df_1", "dynamic_df_2")  # fooが5000より大きいレコードはdynamic_df_1、それ以外はdynamic_df_2に分割

# コレクションから分割したdfを取得
dynamic_df_1 = dynamic_df_collection.select("dynamic_df_1")
dynamic_df_2 = dynamic_df_collection.select("dynamic_df_2")

4. データの書き込み

  • 加工したデータをどこかに書き込む。以下はs3に保存する場合。
glueContext.write_dynamic_frame.from_options(dynamic_df, 
    connection_type = "s3",  # s3を指定
    connection_options = {"path": "s3://xxxx"},  # 保存先のパス 
    format = "json"  # 形式はjsonを指定。他はcsv, xml, parquet, avroなど
    transformation_ctx="hogehoge",  # bookmarkを使用する場合
)

5. ジョブのコミット

  • ETLジョブとして使用する場合は最後に以下を追加する。
job.commit()  # ジョブをコミット