برنامه نویسی

Azure Synapse PySpark Toolbox 002: DataFrame Transformation

Summarize this content to 400 words in Persian Lang
محتویات جعبه ابزار

تبدیل پایه DataFrame ستون های تجمعی را از DataFrame فرزند ایجاد کنید

مشخصات محیطی

Azure Synapse Runtime برای Apache Spark 3.4
Azure Data Lake Storage
خرک کلید لاجوردی

وابستگی های وارداتی

import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

تبدیل پایه DataFrame

تبدیل پایه DataFrame (افزودن ستون‌های جدید، تغییر نام، پر کردن مقادیر na، فیلتر ردیف‌ها، انتخاب ستون‌ها، رها کردن موارد تکراری، برگرداندن یک شی PySpark Dataframe)

def df_transform_basic(
df_input, #DataFrame
step1_new_cols, #List of Dictionaries [{‘new_col_name’: ‘xxx’, ‘new_col_expr’: ‘expression string for F.expr() function’}] step2_col_name_mapping, #Dictionary of old-new names mapping
step3_na_values,
step4_filter_expr, #String parameter for df.filter() function
step5_result_col_list, #List of column names
step6_drop_dup, #True/False
timestamp_name
):
df_output = df_input

if step1_new_cols:
for val in step1_new_cols:
df_output = df_output.withColumn(val[‘new_col_name’], F.expr(val[‘new_col_expr’]))

if step2_col_name_mapping:
df_output = df_output.withColumnsRenamed(step2_col_name_mapping)

if step3_na_values:
df_output = df_output.na.fill(step3_na_values)

if step4_filter_expr:
df_output = df_output.filter(step4_filter_expr)

if step5_result_col_list:
df_output = df_output.select(*step5_result_col_list)

if step6_drop_dup:
df_output = df_output.dropDuplicates()

if timestamp_name:
df_output = df_output.withColumn(timestamp_name, F.lit(F.current_timestamp()))

return df_output

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بازگشت به بالا

ستون های تجمعی را از DataFrame فرزند ایجاد کنید

ستون های تجمعی را بر اساس داده های DataFrame فرزند ایجاد کنید و DataFrame به روز شده والدین را برمی گرداند. پیکربندی هر تجمع در شیء دیکشنری summary_config تعریف شده است.

def df_summary(
parent_df, #DataFrame
child_df, #DataFrame
summary_config
):

# Example of summary_config
# summary_config=[{‘agg’ : {‘RevisionNum’: ‘count’}, ‘agg_result_col_name’ : ‘CountOfRevisions’, ‘na_fill_value’ : 0, ‘join_on’ = [‘Contract’, ‘RevisionNum’]},
# {‘agg’ : {‘RevisionNum’: ‘max’}, ‘agg_result_col_name’ : ‘RecentRevNum’, ‘na_fill_value’ : 0, ‘join_on’ = [‘Contract’, ‘RevisionNum’]}]

if summary_config:
for summary_item in summary_config:
agg_col = list(summary_item[‘agg’])[0] agg_fun = summary_item[‘agg’][agg_col] child_df_grouped = child_df.groupBy(*join_on).agg(summary_item[‘agg’]).withColumnRenamed(f”{agg_fun}({agg_col})”, summary_item[‘agg_result_col_name’])
parent_df = parent_df.join(child_df_grouped, summary_item[‘agg’][‘join_on’], ‘left’)
na_value = {}
na_value[summary_item[‘agg_result_col_name’]] = summary_item[‘na_fill_value’] parent_df = parent_df.na.fill(na_value)

return parent_df

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بازگشت به بالا

محتویات جعبه ابزار


تبدیل پایه DataFrame
ستون های تجمعی را از DataFrame فرزند ایجاد کنید


مشخصات محیطی

  • Azure Synapse Runtime برای Apache Spark 3.4
  • Azure Data Lake Storage
  • خرک کلید لاجوردی

وابستگی های وارداتی

import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید


تبدیل پایه DataFrame

تبدیل پایه DataFrame (افزودن ستون‌های جدید، تغییر نام، پر کردن مقادیر na، فیلتر ردیف‌ها، انتخاب ستون‌ها، رها کردن موارد تکراری، برگرداندن یک شی PySpark Dataframe)

def df_transform_basic(
    df_input, #DataFrame
    step1_new_cols, #List of Dictionaries [{'new_col_name': 'xxx', 'new_col_expr': 'expression string for F.expr() function'}]
    step2_col_name_mapping, #Dictionary of old-new names mapping
    step3_na_values,
    step4_filter_expr, #String parameter for df.filter() function
    step5_result_col_list, #List of column names
    step6_drop_dup, #True/False
    timestamp_name
):
    df_output = df_input

    if step1_new_cols:
        for val in step1_new_cols:
            df_output = df_output.withColumn(val['new_col_name'], F.expr(val['new_col_expr']))

    if step2_col_name_mapping:
        df_output = df_output.withColumnsRenamed(step2_col_name_mapping)

    if step3_na_values:
        df_output = df_output.na.fill(step3_na_values)

    if step4_filter_expr:
        df_output = df_output.filter(step4_filter_expr)

    if step5_result_col_list:
        df_output = df_output.select(*step5_result_col_list)

    if step6_drop_dup:
        df_output = df_output.dropDuplicates()

    if timestamp_name:
        df_output = df_output.withColumn(timestamp_name, F.lit(F.current_timestamp()))

    return df_output
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بازگشت به بالا


ستون های تجمعی را از DataFrame فرزند ایجاد کنید

ستون های تجمعی را بر اساس داده های DataFrame فرزند ایجاد کنید و DataFrame به روز شده والدین را برمی گرداند. پیکربندی هر تجمع در شیء دیکشنری summary_config تعریف شده است.

def df_summary(
    parent_df, #DataFrame
    child_df, #DataFrame
    summary_config
):

# Example of summary_config
# summary_config=[{'agg' : {'RevisionNum': 'count'}, 'agg_result_col_name' : 'CountOfRevisions', 'na_fill_value' : 0, 'join_on' = ['Contract', 'RevisionNum']},
#        {'agg' : {'RevisionNum': 'max'}, 'agg_result_col_name' : 'RecentRevNum', 'na_fill_value' : 0, 'join_on' = ['Contract', 'RevisionNum']}]

    if summary_config:
        for summary_item in summary_config:
            agg_col = list(summary_item['agg'])[0]
            agg_fun = summary_item['agg'][agg_col]
            child_df_grouped = child_df.groupBy(*join_on).agg(summary_item['agg']).withColumnRenamed(f"{agg_fun}({agg_col})", summary_item['agg_result_col_name'])
            parent_df = parent_df.join(child_df_grouped, summary_item['agg']['join_on'], 'left')
            na_value = {}
            na_value[summary_item['agg_result_col_name']] = summary_item['na_fill_value']
            parent_df = parent_df.na.fill(na_value)

    return parent_df
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بازگشت به بالا

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا