บันทึกกันลืม-ประสบการณ์การใช้ ChatGPT for Data Warehousing

Aekanun Thongtae
4 min readFeb 4, 2023

--

งานเขียนนี้ที่ผมทำขึ้นเพื่อกันลืม แต่ก็คิดว่าน่าเป็นประโยชน์อยู่บ้างจึงนำมาแชร์เลยเสียตอนนี้เลย ผมหวังว่างานเขียนนี้จะช่วยให้ผู้อ่านสามารถพัฒนาจากสิ่งที่ผมเขียนได้บ้าง และไปต่อได้ด้วยตนเองครับ

ก่อนหน้านี้ผมได้โพสใน Facebook แนะนำการใช้ ChatGPT ช่วยเขียนโค้ด PySpark ที่มีการทำ Regular expression ดึงข้อมูลสำคัญออกจาก unstructured text และบรรจุข้อมูลสำคัญเหล่านั้นไว้ใน Spark’s dataframe

งานเขียนนี้กำลังเล่าเหตุการณ์ต่อจากนั้น โดยผมยังคงทำงานร่วมกับ ChatGPT ในการเขียนโค้ดเพื่อใช้ dataframe ดังกล่าวในการทำ ETL ข้อมูลเข้าสู่ MSSQL

Prompt ด้านล่างนี้เป็นการคุยกับ ChatGPT เพื่อให้ ChatGPT ช่วยสร้างโค้ด PySpark ให้แทนที่จะเขียนเองทั้งหมด

I have a Spark dataframe with 8 columns like these: 
ip: string (nullable = true)
status: string (nullable = true)
size: string (nullable = true)
timestamp: string (nullable = true)
OS: string (nullable = true)

I would like to split these to many dataframes that will be write into MSSQL tables with the Star-schema for data warehouse. The Star-schema have 2 types of tables. The first type is fact table and the second one is dimension table.

The dimension tables have primary key to associate with foreign key of fact table. Avoid same name of keys. The primary key must be generated by monotonically () function and the foreign key is equal to the primary. The suffix name of primary key is "_pk", and the suffix name of foreign key is "_fk". All dataframes are able to join with the primary key and the foreign key.

Write pyspark code, especially all key construction with withColumns().

จาก Prompt ข้างต้น ได้ผลลัพธ์คือ โค้ด PySpark ที่มีการ implement การออกแบบโครงสร้างข้อมูลในแบบ Star-schema บน dataframe ได้อย่างเหมาะสม

# Import necessary libraries
from pyspark.sql.functions import monotonically_increasing_id

# Create the primary key for each dimension table
df_ip = df.select("ip").dropDuplicates().withColumn("ip_pk", monotonically_increasing_id())
df_status = df.select("status").dropDuplicates().withColumn("status_pk", monotonically_increasing_id())
df_size = df.select("size").dropDuplicates().withColumn("size_pk", monotonically_increasing_id())
df_timestamp = df.select("timestamp").dropDuplicates().withColumn("timestamp_pk", monotonically_increasing_id())
df_OS = df.select("OS").dropDuplicates().withColumn("OS_pk", monotonically_increasing_id())

# Create the foreign key for the fact table
df_fact = df.join(df_ip, "ip", "left").join(df_status, "status", "left").join(df_size, "size", "left").join(df_timestamp, "timestamp", "left").join(df_OS, "OS", "left")
df_fact = df_fact.withColumn("ip_fk", df_fact["ip_pk"]).withColumn("status_fk", df_fact["status_pk"]).withColumn("size_fk", df_fact["size_pk"]).withColumn("timestamp_fk", df_fact["timestamp_pk"]).withColumn("OS_fk", df_fact["OS_pk"])

# Write the dimension tables and the fact table into MSSQL
df_ip.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable="<table_name_ip>", user="<username>", password="<password>").mode("overwrite").save()
df_status.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable="<table_name_status>", user="<username>", password="<password>").mode("overwrite").save()
df_size.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable="<table_name_size>", user="<username>", password="<password>").mode("overwrite").save()
df_timestamp.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable="<table_name_timestamp>", user="<username>", password="<password

ในการ run ทดสอบโค้ด ผมได้ตัดโค้ดข้างบนเฉพาะส่วนที่เป็น MSSQL (4 บรรทัดสุดท้าย) ทิ้งไปก่อน เนื่องจากยังไม่ถึงเวลาที่จะเขียนลง MSSQL ส่วนที่เหลือได้นำไปทดลอง run แล้ว พบว่าได้ผลดี ผมจึง reward ให้กับ ChatGPT ด้วยการ กด like

รูปที่ 1 แสดงให้ reward กับ ChatGPT และโค้ด Pyspark ชิ้นแรก ซึ่งได้จาก ChatGPT โดยโค้ดนี้มีการ implement การออกแบบโครงสร้างข้อมูลในแบบ Star-schema

ณ จุดนี้แท้จริงแล้ว โค้ดนี้สามารถสร้าง dataframe หลายๆ ตัว ที่ผมสามารถดัดแปลงอีกนิดหน่อย ก็จะนำไป write ลงสู่ MSSQL เป็น fact และ dimension table ได้เลย แต่ผมยังอยากได้มากกว่านี้อีก จึงทำต่อไปครับ

Business analyst user can predefine columns. 

Please revise the code with automate naming variables as the predefined columns.

Prompt ด้านบน คือสิ่งที่ผมเขียนบอก ChatGPT ให้ทำงานเพิ่มให้ผมอีกครับ โดยเป็นการบอกให้ ChatGPT ช่วยแก้ไขโค้ดเดิมทั้งหมดให้มีความเป็น automate มากขึ้นครับ โดยกระบวนการ ETL จะต้องปรับเปลี่ยนไปตาม columns ที่ user เลือกมาเป็น dimension table ได้เองโดยอัตโนมัติ ChatGPT จึงได้เขียนโค้ดให้ใหม่ (ด้านล่าง)

# Import necessary libraries
from pyspark.sql.functions import monotonically_increasing_id

# Define the predefined columns for the dimension tables
dim_cols = ["ip", "status", "size", "timestamp", "OS"]

# Create the dimension tables and the primary keys
dim_tables = []
for dim_col in dim_cols:
dim_table = df.select(dim_col).dropDuplicates().withColumn(f"{dim_col}_pk", monotonically_increasing_id())
dim_tables.append(dim_table)

# Create the fact table by joining the dimension tables and adding the foreign keys
fact_table = df
for dim_table in dim_tables:
dim_col = dim_table.columns[0]
fact_table = fact_table.join(dim_table, dim_col, "left")
fact_table = fact_table.withColumn(f"{dim_col}_fk", fact_table[f"{dim_col}_pk"])

# Write the dimension tables and the fact table into MSSQL
for dim_table in dim_tables:
dim_col = dim_table.columns[0]
dim_table.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable=f"<table_name_{dim_col}>", user="<username>", password="<password>").mode("overwrite").save()
fact_table.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable="<table_name_fact>", user="<username>", password="<password>").mode("overwrite").save()

ผมได้นำโค้ดข้างบนนี้ไปทดสอบการ run โดยตัดส่วน MSSQL ออกเช่นเดิม ผลคือ ได้รับ dataframe จำนวน 6 ตัว โดย dataframe ตัวหนึ่งที่จะทำหน้าที่เป็น fact table ได้บรรจุคอลัมน์ที่ทำหน้าที่เป็น primary key หลายๆ ตัวเอาไว้ แต่จากหลักการด้าน Data Warehouse โดยทั่วไปแล้ว fact table ควรประกอบด้วย foreign key หลายๆ ตัวที่เชื่อมโยงไปยัง dimension table ได้ ผมจึงต้องคุยกับ ChatGPT ต่อไป เพื่อให้ ChatGPT หาทางเอา primary key เหล่านี้ออก ด้วย Prompt ด้านล่าง

before write fact_table to MSSQL, drop columns that have suffix of "_pk"

จาก Prompt ด้านบน ผมได้รับโค้ด Pyspark ที่สามารถ run ได้ปกติอีกเช่นเคย แต่ผลการ run พบว่า dataframe ที่ทำหน้าที่เป็น fact table ยังคงมีคอลัมน์ที่ซ้ำซ้อนกับ dataframe ที่ทำหน้าที่เป็น dimension table อยู่หลายคอลัมน์เลยทีเดียว ผมจึงต้องใช้ Prompt กับ ChatGPT อีกด้านล่าง

before write fact_table to MSSQL, drop columns that are same in dim_table, too.

ขั้นตอนสุดท้ายเป็นการคุยกับ ChatGPT ด้วย Prompt ด้านล่าง เพื่อให้ ChatGPT หาทางทำการแทนที่ค่าว่าง (empty value) ด้วย N/A

replace empty value in "OS" columns with "N/A"

และนี่คือผลลัพธ์สุดท้ายเป็นโค้ด Pyspark ที่นำไปปรับแต่งอีกนิดเดียว ก่อน run ใน Spark ให้ทำหน้าที่ ETL ข้อมูล unstructured text เข้าสู่ Data Warehouse

# Import necessary libraries
from pyspark.sql.functions import monotonically_increasing_id, when, lit
from pyspark.sql.types import StringType

# Define the predefined columns for the dimension tables
dim_cols = ["ip", "status", "size", "timestamp", "OS"]

# Replace empty values in the "OS" column with "N/A"
df = df.withColumn("OS", when(df["OS"] == "", lit("N/A")).otherwise(df["OS"]))

# Create the dimension tables and the primary keys
dim_tables = []
for dim_col in dim_cols:
dim_table = df.select(dim_col).dropDuplicates().withColumn(f"{dim_col}_pk", monotonically_increasing_id().cast(StringType()))
dim_tables.append(dim_table)

# Create the fact table by joining the dimension tables and adding the foreign keys
fact_table = df
for dim_table in dim_tables:
dim_col = dim_table.columns[0]
fact_table = fact_table.join(dim_table, dim_col, "left")
fact_table = fact_table.withColumn(f"{dim_col}_fk", fact_table[f"{dim_col}_pk"])

# Drop the columns with a suffix of "_pk" and the columns that are the same in both the fact table and the dimension tables from the fact table
cols_to_drop = [col for col in fact_table.columns if "_pk" in col or col in dim_cols]
fact_table = fact_table.drop(*cols_to_drop)

# Write the dimension tables and the fact table into MSSQL
for dim_table in dim_tables:
dim_col = dim_table.columns[0]
dim_table.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable=f"<table_name_{dim_col}>", user="<username>", password="<password>").mode("overwrite").save()
fact_table.write.format("jdbc").options(url="jdbc:sqlserver://<server_name>:<port>;database=<database_name>", driver="com.microsoft.sqlserver.jdbc.SQLServerDriver", dbtable="<table_name_fact>", user="<username>", password="<password>").mode("overwrite").save()

สรุปท้าย

ข้อเท็จจริงที่พบจากประสบการณ์การใช้ ChatGPT คือ

- มีความแตกต่างค่อนข้างมากระหว่าง ChatGPT-model 4 ที่ใช้ผลิตงานเขียนนี้ และ ChatGPT-model 3.x ที่ผมไม่คิดจะเลือกใช้เลยหลังจากใช้ model 4 แล้ว

- ผมสามารถพูดคุยหรือสอบถามกับ ChatGPT ด้วยภาษาไทยได้ และดูไม่แตกต่างในเชิงความถูกต้องของผลลัพธ์ของหลายๆ กรณี (ตอนเริ่มต้นงานเขียนนี้ใช้ภาษาอังกฤษคุยกับ chatgpt มาโดยตลอดเป็นส่วนใหญ่ แต่หลังจากนั้นไม่นาน… จนถึงขณะกำลังเขียนสรุปนี้ใช้ภาษาไทยเป็นส่วนใหญ่ล่ะครับ)

- ผมพบว่าการใช้เทคนิคด้าน Prompt Engineer เป็นสิ่งสำคัญก็จริงอยู่ แต่สิ่งสำคัญยิ่งกว่าคือ Business Logic ของผมเอง เหมือนกับว่า ผมกำลังคุยกับเพื่อนผู้เชี่ยวชาญ แต่เค้าไม่ได้เชี่ยวชาญในรายละเอียดงานของผมเสมอไป ซึ่งทำให้คิดต่อไปได้ว่า ถ้าให้เลือกระหว่างการเรียนพัฒนาโค้ดโปรแกรม หรือการทบทวน Business ให้ชัดเจนแล้ว

- เพื่อให้ ChatGPT ตอบได้ตรงประเด็น ผมมักจะเริ่มด้วยประโยคบอกเล่า โดยเล่าเรื่องให้ ChatGPT เข้าใจก่อนว่า งานของผมอยู่ในบริบทหรือสภาพแวดล้อมใด ถ้าผมต้องการให้ ChatGPT ทำการ process data ให้ ผมก็มักจะมีข้อมูลตัวอย่างที่ตัดมาเฉพาะบางส่วนส่งให้ ChatGPT ด้วย

- ผมไม่เขินอายที่จะใช้ภาษาไทย เพื่อพูดคุยกับ ChatGPT เป็นภาษาไทยอีกต่อไปแล้วครับ :)

--

--

Aekanun Thongtae

Experienced Senior Big Data & Data Science Consultant with a history of working in many enterprises and various domains . Skilled in Apache Spark, and Hadoop.