برنامه نویسی

جمع کردن یک پارچه با داده های مخازن اسناد سازمانی برای AI تولیدی

به منظور استفاده از AI مولد ، داده های OneDrive به Milvus را مصرف می کند.

شرح تصویر
تصویر حسن نیت ارائه می دهد milvus.io

مقدمه و انگیزه

در یک پروژه اخیر ، ما به مشتری خود کمک می کنیم تا یک پایگاه داده بردار RAG (Milvus در مورد ما) را با مصرف محتوا از مخازن SharePoint/OneDrive شرکت جمع کند ، که سپس از LLM متناسب با بهترین نیاز مشتری در پلت فرم Watsonx.ai استفاده می کند ، به عنوان مثال یک مدل گرایت.

در این مقاله کدی که من آماده می کنم و به صورت محلی در رایانه خود آزمایش می کنم. این کد اقتباس خواهد شد تا در پلتفرم مشتری استفاده شود ، اما طبق معمول دوست دارم از قبل تحقیقات خود را انجام دهم … (من کمی گیک هستم که حدس می زنم).

آماده سازی Milvus محلی

در مقاله قبلی ، من در مورد مشکلاتی که در تنظیم یک محیط Milvus روی لپ تاپ خود با آن روبرو شدم ، نوشتم ، خوب حدس می زنم که در نهایت راهی بسیار ساده و ساده برای انجام این کار پیدا کردم. آخرت مراحل است.

برای انجام این کار شما به Docker نیاز دارید ، یا در مورد من از Podman استفاده می کنم.

# fetch the startup script
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مرحله بعدی و تمام کاری که شما باید انجام دهید این است که فیلمنامه را اجرا کنید ، اگرچه من یک نام مستعار را در لپ تاپ خود ساخته ام که باعث می شود alias docker = “podman”، برای ساده سازی در این مورد ، من تمام دستورات “sudo docker” را به “sudo podman” جایگزین پرونده بارگیری شده “مستقل_مبد.

# the original call is 'bash standalone_embed.sh start'
bash standalone_podman_embed.sh start
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

و همین است. نمونه Milvus شما باید در حال اجرا باشد. اما ما می خواهیم از آن اطمینان داشته باشیم.

اتصال پایگاه داده Milvus را آزمایش کنید و برنامه اصلی را تهیه کنید

همانطور که برنامه را در پایتون می نویسم ، یک محیط مجازی را تهیه می کنم (بهترین عملی که به همه توصیه می کنم که راه سخت را یاد گرفتم!

python3.11 -m venv sharepoint_milvus
source sharepoint_milvus/bin/activate
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مرحله بعدی نصب اولین بسته برای کار با Milvus است.

pip install --upgrade pymilvus
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

و سپس آزمایش اتصال به وکتور محلی DB.

from pymilvus import connections

try:
    connections.connect(host="127.0.0.1", port="19530")
    print("Milvus connection successful!")
    connections.disconnect("default")
except Exception as e:
    print(f"Milvus connection failed: {e}")
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

اتصال خوب است ، آزمایش دوم که من با استفاده از کد نمونه از سایت Milvus انجام دادم تا یک پایگاه داده بردار و آزمایش آن را انجام دهم.

from pymilvus import MilvusClient
import numpy as np

client = MilvusClient("./milvus_demo.db")
client.create_collection(
    collection_name="demo_collection",
    dimension=384  # The vectors we will use in this demo has 384 dimensions
)

docs = [
    "Artificial intelligence was founded as an academic discipline in 1956.",
    "Alan Turing was the first person to conduct substantial research in AI.",
    "Born in Maida Vale, London, Turing was raised in southern England.",
]

vectors = [[ np.random.uniform(-1, 1) for _ in range(384) ] for _ in range(len(docs)) ]
data = [ {"id": i, "vector": vectors[i], "text": docs[i], "subject": "history"} for i in range(len(vectors)) ]
res = client.insert(
    collection_name="demo_collection",
    data=data
)

res = client.search(
    collection_name="demo_collection",
    data=[vectors[0]],
    filter="subject == 'history'",
    limit=2,
    output_fields=["text", "subject"],
)
print(res)

res = client.query(
    collection_name="demo_collection",
    filter="subject == 'history'",
    output_fields=["text", "subject"],
)
print(res)

res = client.delete(
    collection_name="demo_collection",
    filter="subject == 'history'",
)
print(res)
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

هنگامی که این کار کرد ، می توانیم به جلو حرکت کنیم.

برنامه اصلی (و چالش ها)

نوشتن برنامه اصلی و آزمایش آن زمان زیادی طول کشید. همچنین یک سلب مسئولیت بزرگ در اینجا وجود دارد. در IBM ، ما از OneDrive به عنوان یکی از مخازن سازمانی خود استفاده می کنیم ، اگرچه می توانیم با سیستم ورود به سیستم داخلی SSO به درایو دسترسی پیدا کنیم ، اما خط مشی امنیتی IBM دقیق از طریق برنامه ای مانند من نوشتم. بنابراین من کد را در مورد آدرس های ایمیل خودم از “Outlook” و درایو خودم آزمایش کردم تا مطمئن شوم که می تواند کار کند (من همچنین در مورد فعالیت های مشکوک در حساب خود از طریق مایکروسافت ایمیل دریافت کردم.

با گفتن این حرف ، بیایید به درخواست من برگردیم. اول و طبق معمول ، من یک فایل “.env” را با اطلاعات اتصال/پیکربندی خود ایجاد کردم.

export SHAREPOINT_USERNAME="xxx@yyy.com"
export SHAREPOINT_PASSWORD="PASSWORD"
export SHAREPOINT_SITE_URL="THE_MAIN_URL"
export SHAREPOINT_DOCUMENT_LIBRARY="THE_MAIN_DOCUMENT_FOLDER"
export SHAREPOINT_SUBFOLDER_NAME="THE_TARGET_SUBFOLDER"

' specific to my environment
export TOKENIZERS_PARALLELISM="false" 
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

من این پرونده را برای استفاده از آن “منبع” می کنم.

source .env
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

سپس کد می آید. ابتدا نگاهی به وابستگی های مورد نیاز برنامه می اندازیم.

import os
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import tiktoken
import hashlib
import time
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

پس از آن ، برخی از موارد اشکال زدایی به منظور اطمینان از اینکه برنامه دارای اطلاعات مناسب/اعتبار است.

# Configuration
SHAREPOINT_USERNAME = os.environ.get("SHAREPOINT_USERNAME")
SHAREPOINT_PASSWORD = os.environ.get("SHAREPOINT_PASSWORD")
SHAREPOINT_SITE_URL = os.environ.get("SHAREPOINT_SITE_URL")
SHAREPOINT_DOCUMENT_LIBRARY = os.environ.get("SHAREPOINT_DOCUMENT_LIBRARY")
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()  # Trim spaces

print(f"SHAREPOINT_USERNAME: {os.environ.get('SHAREPOINT_USERNAME')}")
print(f"SHAREPOINT_PASSWORD: {os.environ.get('SHAREPOINT_PASSWORD')}")
print(f"SHAREPOINT_SITE_URL: {os.environ.get('SHAREPOINT_SITE_URL')}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY: {os.environ.get('SHAREPOINT_DOCUMENT_LIBRARY')}")
print(f"SHAREPOINT_SUBFOLDER_NAME: {os.environ.get('SHAREPOINT_SUBFOLDER_NAME')}")

print(f"Complete URL: "+ SHAREPOINT_SITE_URL+ "https://dev.to/" + SHAREPOINT_DOCUMENT_LIBRARY + "https://dev.to/" + SUBFOLDER_NAME)
C_URL = SHAREPOINT_SITE_URL+ "https://dev.to/" + SHAREPOINT_DOCUMENT_LIBRARY + "https://dev.to/" + SUBFOLDER_NAME
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

Milvus من به صورت محلی در حال اجرا است ، بنابراین تنظیمات سخت است (خوب ، زشت است ، من اعتراف می کنم).

MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
MILVUS_COLLECTION_NAME = "document_chunks"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 100
EMBEDDING_DIMENSION = 1536
ENCODING_NAME = "cl100k_base"
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

و با اینجا کد کامل است.

import os
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import tiktoken
import hashlib
import time

# Configuration
SHAREPOINT_USERNAME = os.environ.get("SHAREPOINT_USERNAME")
SHAREPOINT_PASSWORD = os.environ.get("SHAREPOINT_PASSWORD")
SHAREPOINT_SITE_URL = os.environ.get("SHAREPOINT_SITE_URL")
SHAREPOINT_DOCUMENT_LIBRARY = os.environ.get("SHAREPOINT_DOCUMENT_LIBRARY")
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()  # Trim spaces

print(f"SHAREPOINT_USERNAME: {os.environ.get('SHAREPOINT_USERNAME')}")
print(f"SHAREPOINT_PASSWORD: {os.environ.get('SHAREPOINT_PASSWORD')}")
print(f"SHAREPOINT_SITE_URL: {os.environ.get('SHAREPOINT_SITE_URL')}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY: {os.environ.get('SHAREPOINT_DOCUMENT_LIBRARY')}")
print(f"SHAREPOINT_SUBFOLDER_NAME: {os.environ.get('SHAREPOINT_SUBFOLDER_NAME')}")

print(f"Complete URL: "+ SHAREPOINT_SITE_URL+ "https://dev.to/" + SHAREPOINT_DOCUMENT_LIBRARY + "https://dev.to/" + SUBFOLDER_NAME)
C_URL = SHAREPOINT_SITE_URL+ "https://dev.to/" + SHAREPOINT_DOCUMENT_LIBRARY + "https://dev.to/" + SUBFOLDER_NAME

MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
MILVUS_COLLECTION_NAME = "document_chunks"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 100
EMBEDDING_DIMENSION = 1536
ENCODING_NAME = "cl100k_base"

def authenticate_sharepoint(site_url, username, password):
    """Authenticates with SharePoint."""
    ctx_auth = AuthenticationContext(site_url)
    if ctx_auth.acquire_token_for_user(username, password):
        ctx = ClientContext(site_url, ctx_auth)
        print("SharePoint authentication successful.")
        return ctx
    else:
        raise Exception(f"Failed to authenticate: {ctx_auth.get_last_error()}")

def download_file(ctx, server_relative_url):
    """Downloads a file from SharePoint."""
    file = File.open_binary(ctx, server_relative_url)
    ctx.execute_query()
    print(f"Downloaded file: {server_relative_url}")
    return file.content

def get_sharepoint_files(ctx, library_name, subfolder_path=None):
    """
    Fetches files from a SharePoint doc library or a specific subfolder.
    """
    try:
        list_obj = ctx.web.lists.get_by_title(library_name)
        ctx.load(list_obj.root_folder)
        ctx.execute_query()

        if subfolder_path:
            full_folder_url = f"{list_obj.root_folder.serverRelativeUrl}/{subfolder_path}"
            print(f"full_folder_url: {full_folder_url}")
            try:
                folder = ctx.web.get_folder_by_server_relative_url(full_folder_url)
                ctx.load(folder)
                ctx.execute_query()
                print(f"Folder object: {folder}")
                print(f"Folder.serverRelativeUrl: {folder.serverRelativeUrl}")
                print(f"Folder.exists: {folder.exists}")
            except Exception as folder_error:
                print(f"Error during get_folder_by_server_relative_url(): {folder_error}")
                return []

            ctx.load(folder)
            ctx.execute_query()
            try:
                items = folder.files.get_all().execute_query()
            except Exception as query_error:
                print(f"Error during folder.files.get_all().execute_query(): {query_error}")
                return []

        else:
            items = list_obj.items.get_all().execute_query()

        files = []
        if items:
            if isinstance(items, list):
                for i, item in enumerate(items):
                    try:
                        print(f"Item {i}: {item}")
                        if hasattr(item, 'properties'):
                            print(f"Item {i} properties: {item.properties}")
                        else:
                            print(f"Item {i} has no properties.")
                        if hasattr(item, 'serverRelativeUrl'):
                            print(f"Item {i} serverRelativeUrl: {item.serverRelativeUrl}")
                        else:
                            print(f"Item {i} has no serverRelativeUrl.")

                        if hasattr(item, 'properties') and 'ServerRelativeUrl' in item.properties:
                            files.append(item.properties["ServerRelativeUrl"])
                        else:
                            print(f"Item {i} does not have properties or ServerRelativeUrl: {item}")
                    except Exception as item_error:
                        print(f"Error processing item {i}: {item_error}")
            else:
                print("Items is not a list.")
        else:
            print("Items list is empty.")

        print(f"Found {len(files)} files in '{library_name}'. Subfolder: {subfolder_path}")
        print(f"Files retrieved: {files}")
        return files

    except Exception as e:
        print(f"Error in get_sharepoint_files: {e}")
        return []

def chunk_text(text, chunk_size, chunk_overlap, encoding_name):
    """Splits text into chunks with overlap."""
    encoder = tiktoken.get_encoding(encoding_name)
    tokens = encoder.encode(text)
    chunks = []
    for i in range(0, len(tokens), chunk_size - chunk_overlap):
        chunk = tokens[i : i + chunk_size]
        chunks.append(encoder.decode(chunk))
    print(f"Text chunked into {len(chunks)} chunks.")
    return chunks

def create_milvus_collection(collection_name, dimension):
    """Creates a Milvus collection."""
    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="chunk_id", dtype=DataType.INT64),
        FieldSchema(name="chunk_text", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension),
    ]
    schema = CollectionSchema(fields, "Document Chunks")
    collection = Collection(collection_name, schema)
    index_params = {
        "metric_type": "IP",
        "index_type": "HNSW",
        "params": {"M": 8, "efConstruction": 64},
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    print(f"Milvus collection '{collection_name}' created.")
    return collection

def insert_chunks_to_milvus(collection, document_id, chunks, embeddings):
    """Inserts into Milvus."""
    print(f"Chunks length: {len(chunks)}")
    print(f"Embeddings length: {len(embeddings)}")
    data = [
        [document_id] * len(chunks),
        list(range(len(chunks))),
        chunks,
        embeddings,
    ]
    collection.insert(data)
    print(f"Inserted {len(chunks)} chunks into Milvus.")

def generate_embeddings(chunks):
    """Generates embeddings for chunks """
    embeddings = [[0.1] * EMBEDDING_DIMENSION for _ in chunks]
    print(f"Generated embeddings for {len(chunks)} chunks.")
    print(f"Embeddings: {embeddings}")
    return embeddings

def process_sharepoint_documents():
    """Main process"""
    try:
        start_time = time.time()
        ctx = authenticate_sharepoint(SHAREPOINT_SITE_URL, SHAREPOINT_USERNAME, SHAREPOINT_PASSWORD)
        SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()
        print(f"SUBFOLDER_NAME (process_sharepoint_documents): {SUBFOLDER_NAME}")  
        print(f"SHAREPOINT_DOCUMENT_LIBRARY (process_sharepoint_documents): {SHAREPOINT_DOCUMENT_LIBRARY}")
        if SUBFOLDER_NAME:
            files = get_sharepoint_files(ctx, SHAREPOINT_DOCUMENT_LIBRARY, SUBFOLDER_NAME)
        else:
            files = get_sharepoint_files(ctx, SHAREPOINT_DOCUMENT_LIBRARY)

        if not files:
            print("No files found in the specified SharePoint location.")
            return

        collection = create_milvus_collection(MILVUS_COLLECTION_NAME, EMBEDDING_DIMENSION)

        for file_ref in files:
            file_start_time = time.time()
            try:
                print(f"Processing file: {file_ref}")
                file_content = download_file(ctx, file_ref).decode("utf-8", errors="ignore")
                document_id = hashlib.md5(file_ref.encode()).hexdigest()

                chunks = chunk_text(file_content, CHUNK_SIZE, CHUNK_OVERLAP, ENCODING_NAME)
                embeddings = generate_embeddings(chunks)

                if not embeddings:
                    print(f"No embeddings were generated for file: {file_ref}")
                    continue

                insert_chunks_to_milvus(collection, document_id, chunks, embeddings)
                print(f"File processed in {time.time() - file_start_time:.2f} seconds.")

            except Exception as file_error:
                print(f"Error processing file {file_ref}: {file_error}")
        collection.flush()
        print("SharePoint documents processed and indexed.")
        print(f"Total processing time: {time.time() - start_time:.2f} seconds.")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    process_sharepoint_documents()
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

خوب ، این کد است. این تکه ها با استفاده از بهترین Fit LLM در “استفاده می شوند”watsonx.ai“بستر استفاده از مشتری.

پایان

همانطور که در بالا ذکر شد ، این یک برنامه آزمایشی به منظور آماده سازی خودم برای پروژه ای است که با مشتری خود تحقق می بخشیم. مثل همیشه ، من می خواهم دستانم را کثیف کنم و از آنچه ما به عنوان راه حل برای مشتریان و شرکای تجاری خود پیشنهاد می کنیم ، اطمینان داشته باشم ، و بالاتر از همه ، من این را دوست دارم.

با تشکر از خواندن

لینک

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا