پردازش پیام های ضبط شده EventHub در فایل های Avro با استفاده از Databricks

مرحله 1: Mount Azure Blob Storage
اولین گام نصب Azure Blob Storage است که حاوی فایلهای Avro ما است. Azure Blob Storage یک پلت فرم ذخیره سازی اشیا مقیاس پذیر و ایمن است. سیستم فایل Databricks (DBFS) به شما این امکان را می دهد که ذخیره سازی blob را به گونه ای نصب کنید که مانند یک سیستم فایل محلی قابل دسترسی باشد. در زیر دستور mount Blob Storage آمده است:
dbutils.fs.mount(
source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
mount_point = "/mnt/iotdata",
extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net":"AccessKey"})
{container}، {storage_name} و AccessKey را به ترتیب با نام کانتینر، نام حساب ذخیرهسازی و کلید دسترسی جایگزین کنید.
مرحله 2: همه فایل های Avro را بارگیری کنید
پس از نصب حافظه، میتوانیم فایلهای Avro را با استفاده از تابع spark.read.format(‘avro’) بارگذاری کنیم. Avro یک سیستم سریال سازی داده است که امکان تبادل داده های بزرگ بین برنامه های نوشته شده به زبان های مختلف را فراهم می کند.
تعداد * در مسیر نشاندهنده عمق زیرشاخههایی است که در EventHub در قرارداد نامگذاری ضبط تنظیم کردهاید. هر * یک سطح دایرکتوری را نشان می دهد.
df=spark.read.format('avro').load("/mnt/iotdata/*/*/*/*/*/*/*/*/*.avro")
display(df)
این دستور همه فایلهای Avro را از ذخیرهسازی لکههای نصب شده که توسط عملکرد اصلی EventHub گرفته شدهاند میخواند و DataFrame را نمایش میدهد.
مرحله 3: پیام باینری Payload (بدن) را به رشته تبدیل کنید
فایل های Avro حاوی داده های باینری هستند که برای پردازش بیشتر باید به رشته ای تبدیل شوند. شما می توانید داده های باینری را با استفاده از تابع Cast در PySpark به فرمت رشته ای تبدیل کنید.
body_df=df.withColumn("Body",df.Body.cast("string")).select("Body")
display(body_df)
مرحله 4: نقشه رشته را به طرحواره JSON
پس از تبدیل داده ها به فرمت رشته ای، می توانیم آن را با استفاده از توابع StructType و from_json PySpark به یک طرح JSON نگاشت کنیم.
ابتدا طرحی را برای داده ها تعریف کنید:
from pyspark.sql.types import *
data_schema = StructType(
[
StructField("a",DoubleType(),True),
StructField("b", DoubleType(), True)
]
)
json_schema = StructType(
[
StructField("factoryId", LongType(), True),
StructField("timeStamp", TimestampType(), True),
StructField("data", data_schema, True)
]
)
سپس داده های رشته را طبق طرح به فرمت JSON تبدیل کنید:
from pyspark.sql.functions import from_json, col
json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema))
display(json_df)
مرحله 5: نقشه JSON به قالب جدول
پس از تبدیل داده ها به فرمت JSON، می توانیم داده ها را صاف کرده و به فرمت جدول تبدیل کنیم.
final_df=json_df.select(
col("Body.data.a"),
col("Body.data.b"),
col("Body.factoryId"),
col("Body.timestamp"))
مرحله 6: آن را با استفاده از Spark SQL در Silver Layer ذخیره کنید
در نهایت، داده های پردازش شده خود را با استفاده از Spark SQL ذخیره می کنیم. در اینجا، ما آن را در یک لایه نقره ای ذخیره می کنیم، که نمایشی تمیز و پردازش شده از داده های خام ما است.
ابتدا یک پایگاه داده برای داده های پردازش شده خود ایجاد کنید:
%sql
CREATE DATABASE iotdata
LOCATION "/mnt/iotdata"
سپس، DataFrame پردازش شده را به عنوان جدول در پایگاه داده تازه ایجاد شده ذخیره کنید:
final_df.write.saveAsTable("iotdata.tablename")
و شما آن را دارید! شما با استفاده از Databricks فایل های Avro را که از Azure Event Hubs گرفته شده اند، با موفقیت پردازش کرده اید. شما دادههای باینری را به یک فرمت قابل خواندن تبدیل کردهاید، آنها را به یک طرح نقشهبرداری کردهاید، آنها را به یک قالب جدولی لایه نقرهای تبدیل کردهاید و با استفاده از Spark SQL ذخیره کردهاید.