Azure Synapse PySpark Toolbox 002: DataFrame Transformation

Summarize this content to 400 words in Persian Lang
محتویات جعبه ابزار
تبدیل پایه DataFrame ستون های تجمعی را از DataFrame فرزند ایجاد کنید
مشخصات محیطی
Azure Synapse Runtime برای Apache Spark 3.4
Azure Data Lake Storage
خرک کلید لاجوردی
وابستگی های وارداتی
import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
تبدیل پایه DataFrame
تبدیل پایه DataFrame (افزودن ستونهای جدید، تغییر نام، پر کردن مقادیر na، فیلتر ردیفها، انتخاب ستونها، رها کردن موارد تکراری، برگرداندن یک شی PySpark Dataframe)
def df_transform_basic(
df_input, #DataFrame
step1_new_cols, #List of Dictionaries [{‘new_col_name’: ‘xxx’, ‘new_col_expr’: ‘expression string for F.expr() function’}]
step2_col_name_mapping, #Dictionary of old-new names mapping
step3_na_values,
step4_filter_expr, #String parameter for df.filter() function
step5_result_col_list, #List of column names
step6_drop_dup, #True/False
timestamp_name
):
df_output = df_input
if step1_new_cols:
for val in step1_new_cols:
df_output = df_output.withColumn(val[‘new_col_name’], F.expr(val[‘new_col_expr’]))
if step2_col_name_mapping:
df_output = df_output.withColumnsRenamed(step2_col_name_mapping)
if step3_na_values:
df_output = df_output.na.fill(step3_na_values)
if step4_filter_expr:
df_output = df_output.filter(step4_filter_expr)
if step5_result_col_list:
df_output = df_output.select(*step5_result_col_list)
if step6_drop_dup:
df_output = df_output.dropDuplicates()
if timestamp_name:
df_output = df_output.withColumn(timestamp_name, F.lit(F.current_timestamp()))
return df_output
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
بازگشت به بالا
ستون های تجمعی را از DataFrame فرزند ایجاد کنید
ستون های تجمعی را بر اساس داده های DataFrame فرزند ایجاد کنید و DataFrame به روز شده والدین را برمی گرداند. پیکربندی هر تجمع در شیء دیکشنری summary_config تعریف شده است.
def df_summary(
parent_df, #DataFrame
child_df, #DataFrame
summary_config
):
# Example of summary_config
# summary_config=[{‘agg’ : {‘RevisionNum’: ‘count’}, ‘agg_result_col_name’ : ‘CountOfRevisions’, ‘na_fill_value’ : 0, ‘join_on’ = [‘Contract’, ‘RevisionNum’]},
# {‘agg’ : {‘RevisionNum’: ‘max’}, ‘agg_result_col_name’ : ‘RecentRevNum’, ‘na_fill_value’ : 0, ‘join_on’ = [‘Contract’, ‘RevisionNum’]}]
if summary_config:
for summary_item in summary_config:
agg_col = list(summary_item[‘agg’])[0]
agg_fun = summary_item[‘agg’][agg_col]
child_df_grouped = child_df.groupBy(*join_on).agg(summary_item[‘agg’]).withColumnRenamed(f”{agg_fun}({agg_col})”, summary_item[‘agg_result_col_name’])
parent_df = parent_df.join(child_df_grouped, summary_item[‘agg’][‘join_on’], ‘left’)
na_value = {}
na_value[summary_item[‘agg_result_col_name’]] = summary_item[‘na_fill_value’]
parent_df = parent_df.na.fill(na_value)
return parent_df
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
بازگشت به بالا
محتویات جعبه ابزار
تبدیل پایه DataFrame
ستون های تجمعی را از DataFrame فرزند ایجاد کنید
مشخصات محیطی
- Azure Synapse Runtime برای Apache Spark 3.4
- Azure Data Lake Storage
- خرک کلید لاجوردی
وابستگی های وارداتی
import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
تبدیل پایه DataFrame
تبدیل پایه DataFrame (افزودن ستونهای جدید، تغییر نام، پر کردن مقادیر na، فیلتر ردیفها، انتخاب ستونها، رها کردن موارد تکراری، برگرداندن یک شی PySpark Dataframe)
def df_transform_basic(
df_input, #DataFrame
step1_new_cols, #List of Dictionaries [{'new_col_name': 'xxx', 'new_col_expr': 'expression string for F.expr() function'}]
step2_col_name_mapping, #Dictionary of old-new names mapping
step3_na_values,
step4_filter_expr, #String parameter for df.filter() function
step5_result_col_list, #List of column names
step6_drop_dup, #True/False
timestamp_name
):
df_output = df_input
if step1_new_cols:
for val in step1_new_cols:
df_output = df_output.withColumn(val['new_col_name'], F.expr(val['new_col_expr']))
if step2_col_name_mapping:
df_output = df_output.withColumnsRenamed(step2_col_name_mapping)
if step3_na_values:
df_output = df_output.na.fill(step3_na_values)
if step4_filter_expr:
df_output = df_output.filter(step4_filter_expr)
if step5_result_col_list:
df_output = df_output.select(*step5_result_col_list)
if step6_drop_dup:
df_output = df_output.dropDuplicates()
if timestamp_name:
df_output = df_output.withColumn(timestamp_name, F.lit(F.current_timestamp()))
return df_output
بازگشت به بالا
ستون های تجمعی را از DataFrame فرزند ایجاد کنید
ستون های تجمعی را بر اساس داده های DataFrame فرزند ایجاد کنید و DataFrame به روز شده والدین را برمی گرداند. پیکربندی هر تجمع در شیء دیکشنری summary_config تعریف شده است.
def df_summary(
parent_df, #DataFrame
child_df, #DataFrame
summary_config
):
# Example of summary_config
# summary_config=[{'agg' : {'RevisionNum': 'count'}, 'agg_result_col_name' : 'CountOfRevisions', 'na_fill_value' : 0, 'join_on' = ['Contract', 'RevisionNum']},
# {'agg' : {'RevisionNum': 'max'}, 'agg_result_col_name' : 'RecentRevNum', 'na_fill_value' : 0, 'join_on' = ['Contract', 'RevisionNum']}]
if summary_config:
for summary_item in summary_config:
agg_col = list(summary_item['agg'])[0]
agg_fun = summary_item['agg'][agg_col]
child_df_grouped = child_df.groupBy(*join_on).agg(summary_item['agg']).withColumnRenamed(f"{agg_fun}({agg_col})", summary_item['agg_result_col_name'])
parent_df = parent_df.join(child_df_grouped, summary_item['agg']['join_on'], 'left')
na_value = {}
na_value[summary_item['agg_result_col_name']] = summary_item['na_fill_value']
parent_df = parent_df.na.fill(na_value)
return parent_df
بازگشت به بالا