برنامه نویسی

ساختن یک دریاچه داده مقیاس پذیر برای تجزیه و تحلیل ورزشی با AWS

دریاچه داده های تجزیه و تحلیل ورزشی

این پروژه یک دریاچه داده را برای ذخیره و پردازش داده های ورزشی NBA با استفاده از خدمات AWS مانند S3 ، AWS Glue و آتنا تنظیم می کند. از API SportsData.io برای واکشی داده های پخش کننده NBA و پردازش آن برای اهداف تجزیه و تحلیل استفاده می کند. دو اسکریپت اصلی در پروژه وجود دارد:

  1. setup_nba_data_lake.py: این اسکریپت با ایجاد منابع AWS لازم و واکشی داده های NBA از API SportsData.io ، دریاچه داده را تنظیم می کند.
  2. حذف: این اسکریپت منابع ایجاد شده در طول تنظیم را حذف می کند ، از جمله سطل S3 ، پایگاه داده چسب و نتایج پرس و جو آتنا.

نمای کلی از خدمات AWS مورد استفاده

  • سطل S3: برای ذخیره داده ها در قالب های مختلف (پرونده های JSON) استفاده می شود.
  • چسب AWS: یک سرویس ادغام بدون سرور برای فهرست کردن داده های ذخیره شده در S3 و بارگذاری آن در آتنا.
  • آمازون آتنا: اجازه می دهد تا پرس و جو از داده های ذخیره شده در S3 با استفاده از نمایش داده های SQL.

تفسیر پروژه

این پروژه به منظور خودکار سازی تنظیم یک دریاچه داده های تجزیه و تحلیل ورزشی برای داده های NBA طراحی شده است. با استفاده از خدمات AWS ، به کاربران امکان می دهد:

  • داده های پخش کننده NBA را از API SportsData.io جمع آوری کنید.
  • داده ها را در یک سطل S3 ذخیره کنید.
  • کاتالوگ و ساختار داده ها با استفاده از چسب AWS.
  • آتنا را تنظیم کنید تا از داده های ذخیره شده در S3 استفاده کنید.

این رویکرد یک راه حل مقیاس پذیر و مقرون به صرفه برای تجزیه و تحلیل مقادیر زیادی از داده های ورزشی و انجام تجزیه و تحلیل پیشرفته با استفاده از نمایش داده های مبتنی بر SQL ارائه می دهد.


مراحل تنظیم

پیش نیازهای

قبل از اجرای اسکریپت ، اطمینان حاصل کنید که موارد زیر را دارید:

  1. به SportsData.io بروید و یک حساب رایگان ایجاد کنید.
  2. در بالا سمت چپ ، باید “توسعه دهندگان” را ببینید. برای دیدن “منابع API” روی آن شناور شوید.
  3. روی کلیک کنید مقدمه و آزمایش، سپس “آزمایش رایگان SportsDataio API” را انتخاب کرده و اطلاعات را پر کنید. اطمینان حاصل کنید که NBA را برای این آموزش انتخاب کنید.
  4. با موضوع “پورتال توسعه دهنده راه اندازی” یک ایمیل دریافت خواهید کرد. پیوند ارائه شده را دنبال کنید.
  5. به طور پیش فرض ، شما را به بخش NFL می برد. در سمت چپ ، روی کلیک کنید NBAبشر
  6. تا زمانی که ببینید به پایین بروید “جدول رده بندی”بشر
  7. زیر “پارامترهای رشته پرس و جو”مقدار موجود در کادر کشویی کلید API شماست.
  8. این رشته را کپی کنید زیرا نیاز به چسباندن آن در اسکریپت دارید.

IAM نقش/مجوزها:

اطمینان حاصل کنید که کاربر یا نقش اجرای اسکریپت دارای مجوزهای زیر است:

  • S3: s3:CreateBucketبا s3:PutObjectبا s3:DeleteBucketبا s3:ListBucket
  • چسب: glue:CreateDatabaseبا glue:CreateTableبا glue:DeleteDatabaseبا glue:DeleteTable
  • آتنا: athena:StartQueryExecutionبا athena:GetQueryResults

مرحله 1: کنسول Cloudshell را باز کنید

  1. به Aws.Amazon.com بروید و وارد حساب خود شوید.
  2. در سمت راست بالا ، در کنار نوار جستجو ، روی مربع با یک کلیک کنید >_ در داخل برای باز کردن ابر.

پوسته

مرحله 2: ایجاد کنید setup_nba_data_lake.py پرونده

  1. در CLI (رابط خط فرمان) ، نوع:
nano setup_nba_data_lake.py
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

  1. اسکریپت زیر را در پرونده بچسبانید:
import boto3
import json
import time
import requests
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# AWS configurations
region = "us-east-1"  # Replace with your preferred AWS region
bucket_name = "Jose-sports-analytics-data-lake"  # Change to a unique S3 bucket name
glue_database_name = "glue_nba_data_lake"
athena_output_location = f"s3://{bucket_name}/athena-results/"

# Sportsdata.io configurations (loaded from .env)
api_key = "SPORTS_DATA_API_KEY"  # Replace with your API key
nba_endpoint = "NBA_ENDPOINT"  # Get NBA endpoint from .env

# Use the hardcoded values from the script
api_key = SPORTS_DATA_API_KEY
nba_endpoint = NBA_ENDPOINT

# Create AWS clients
s3_client = boto3.client("s3", region_name=region)
glue_client = boto3.client("glue", region_name=region)
athena_client = boto3.client("athena", region_name=region)

def create_s3_bucket():
    """Create an S3 bucket for storing sports data."""
    try:
        if region == "us-east-1":
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={"LocationConstraint": region},
            )
        print(f"S3 bucket '{bucket_name}' created successfully.")
    except Exception as e:
        print(f"Error creating S3 bucket: {e}")

def create_glue_database():
    """Create a Glue database for the data lake."""
    try:
        glue_client.create_database(
            DatabaseInput={
                "Name": glue_database_name,
                "Description": "Glue database for NBA sports analytics.",
            }
        )
        print(f"Glue database '{glue_database_name}' created successfully.")
    except Exception as e:
        print(f"Error creating Glue database: {e}")

def fetch_nba_data():
    """Fetch NBA player data from sportsdata.io."""
    try:
        headers = {"Ocp-Apim-Subscription-Key": api_key}
        response = requests.get(nba_endpoint, headers=headers)
        response.raise_for_status()  # Raise an error for bad status codes
        print("Fetched NBA data successfully.")
        return response.json()  # Return JSON response
    except Exception as e:
        print(f"Error fetching NBA data: {e}")
        return []

def convert_to_line_delimited_json(data):
    """Convert data to line-delimited JSON format."""
    print("Converting data to line-delimited JSON format...")
    return "\n".join([json.dumps(record) for record in data])

def upload_data_to_s3(data):
    """Upload NBA data to the S3 bucket."""
    try:
        # Convert data to line-delimited JSON
        line_delimited_data = convert_to_line_delimited_json(data)

        # Define S3 object key
        file_key = "raw-data/nba_player_data.jsonl"

        # Upload JSON data to S3
        s3_client.put_object(
            Bucket=bucket_name,
            Key=file_key,
            Body=line_delimited_data
        )
        print(f"Uploaded data to S3: {file_key}")
    except Exception as e:
        print(f"Error uploading data to S3: {e}")

def create_glue_table():
    """Create a Glue table for the data."""
    try:
        glue_client.create_table(
            DatabaseName=glue_database_name,
            TableInput={
                "Name": "nba_players",
                "StorageDescriptor": {
                    "Columns": [
                        {"Name": "PlayerID", "Type": "int"},
                        {"Name": "FirstName", "Type": "string"},
                        {"Name": "LastName", "Type": "string"},
                        {"Name": "Team", "Type": "string"},
                        {"Name": "Position", "Type": "string"},
                        {"Name": "Points", "Type": "int"}
                    ],
                    "Location": f"s3://{bucket_name}/raw-data/",
                    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                    "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                    "SerdeInfo": {
                        "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
                    },
                },
                "TableType": "EXTERNAL_TABLE",
            },
        )
        print(f"Glue table 'nba_players' created successfully.")
    except Exception as e:
        print(f"Error creating Glue table: {e}")

def configure_athena():
    """Set up Athena output location."""
    try:
        athena_client.start_query_execution(
            QueryString="CREATE DATABASE IF NOT EXISTS nba_analytics",
            QueryExecutionContext={"Database": glue_database_name},
            ResultConfiguration={"OutputLocation": athena_output_location},
        )
        print("Athena output location configured successfully.")
    except Exception as e:
        print(f"Error configuring Athena: {e}")

# Main workflow
def main():
    print("Setting up data lake for NBA sports analytics...")
    create_s3_bucket()
    time.sleep(5)  # Ensure bucket creation propagates
    create_glue_database()
    nba_data = fetch_nba_data()
    if nba_data:  # Only proceed if data was fetched successfully
        upload_data_to_s3(nba_data)
    create_glue_table()
    configure_athena()
    print("Data lake setup complete.")

if __name__ == "__main__":
    main()
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

اسکریپت

مرحله 3: اسکریپت Delete.py را ایجاد کنید

import boto3
from botocore.exceptions import ClientError

# Define the names of resources to delete
BUCKET_NAME = "Jose-sports-analytics-data-lake"
GLUE_DATABASE_NAME = "glue_nba_data_lake"

def delete_athena_query_results(bucket_name):
    """Delete Athena query results stored in the specified S3 bucket."""
    s3 = boto3.client("s3")
    try:
        print(f"Deleting Athena query results in bucket: {bucket_name}")
        objects = s3.list_objects_v2(Bucket=bucket_name, Prefix="athena-results/")
        if "Contents" in objects:
            for obj in objects["Contents"]:
                s3.delete_object(Bucket=bucket_name, Key=obj["Key"])
                print(f"Deleted Athena query result: {obj['Key']}")
    except ClientError as e:
        print(f"Error deleting Athena query results in bucket {bucket_name}: {e}")

def delete_s3_bucket(bucket_name):
    """Delete a specific S3 bucket and its contents."""
    s3 = boto3.client("s3")
    try:
        print(f"Deleting bucket: {bucket_name}")
        # Delete all objects in the bucket
        objects = s3.list_objects_v2(Bucket=bucket_name)
        if "Contents" in objects:
            for obj in objects["Contents"]:
                s3.delete_object(Bucket=bucket_name, Key=obj["Key"])
                print(f"Deleted object: {obj['Key']}")
        # Delete the bucket
        s3.delete_bucket(Bucket=bucket_name)
        print(f"Deleted bucket: {bucket_name}")
    except ClientError as e:
        print(f"Error deleting bucket {bucket_name}: {e}")

def delete_glue_resources(database_name):
    """Delete Glue database and associated tables."""
    glue = boto3.client("glue")
    try:
        print(f"Deleting Glue database: {database_name}")
        # Get tables in the database
        tables = glue.get_tables(DatabaseName=database_name)["TableList"]
        for table in tables:
            table_name = table["Name"]
            print(f"Deleting Glue table: {table_name} in database {database_name}")
            glue.delete_table(DatabaseName=database_name, Name=table_name)
        # Delete the database
        glue.delete_database(Name=database_name)
        print(f"Deleted Glue database: {database_name}")
    except ClientError as e:
        print(f"Error deleting Glue resources for database {database_name}: {e}")

def main():
    print("Deleting resources created during data lake setup...")
    # Delete the S3 bucket
    delete_s3_bucket(BUCKET_NAME)
    # Delete Glue resources
    delete_glue_resources(GLUE_DATABASE_NAME)
    # Delete Athena query results
    delete_athena_query_results(BUCKET_NAME)
    print("All specified resources deleted successfully.")

if __name__ == "__main__":
    main()
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

حذف کردن

LS برای دیدن پرونده ها و دایرکتوری هایی که در فهرست فعلی یا فهرست موجود هستند

لس

اجرای اسکریپت ها

python3 setup_nba_data_lake.py
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

فیلمنامه اجرا

NB خطاها را نادیده می گیرد ، به این دلیل بود که من فیلمنامه را دو بار اجرا کردم

به سطل S3 بروید و پرونده JSON را باز کنید تا داده ها را بدست آورید

سطل S3

پرونده json

json

json

برای حذف منابع ، اجرا کنید:

python3 delete.py
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

پایان
این پروژه نحوه راه اندازی یک دریاچه داده را با استفاده از خدمات AWS و ادغام داده ها از API خارجی برای تجزیه و تحلیل ورزشی نشان می دهد.

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا