برنامه نویسی

بهینه سازی بروزرسانی میلیون ها ردیف به راحتی

روز دیگر ، ما پرونده ای حاوی ساختمانها با تعداد مسکونی آنها و یک شناسه تطبیق برای ما دریافت کردیم ، که با یک امتیاز که نشان دهنده کیفیت تطبیق است ، وزن دارد. ایده این است که سعی کنید بدانید که چنین و چنین مسکونی در چنین ساختمانی و چنین ساختمانی قرار دارد.

بنابراین از من خواسته شد که این ساختمانها را وارد کنم و آنها را به خانه ها پیوند دهم.

دانستن اینکه buildings وت fast_housing جداول از قبل وجود دارد ، هیچ چیز غیرممکن است. ساده و ساده

مجبور شدم دو زمینه اضافه کنم rnb_id وت rnb_score (نمره ای که نشان دهنده کیفیت تطبیق است) به buildings جدول سپس به روزرسانی fast_housing جدول ، که قبلاً حاوی یک بود building_id میدان

بوها 1-n رابطه ساده ، اساسی

fast_housing مجموعه ای از واحدهای مسکن ملی ، خالی ، اجاره و غیره است. ما احتمالاً همه چیز را نداریم ، اما هنوز هم تمام شده است 10 میلیون ردیفبا 42 ستون، همه تقسیم شده توسط بخش و چندین فهرست در یک پایگاه داده PostgreSQL.

وارد کردن ساختمان ها

وارد کردن ساختمانها واقعاً مشکل نیست. برخی وجود دارد 26 میلیون ردیف در پرونده و حتی روش ساده لوحانه ، ما خیلی خوب عمل می کنیم.

ما جریان می دهیم buildings.jsonl پرونده (یک فایل JSON که در آن هر خط یک عنصر است) ، آن را در قالب مورد نظر نقشه برداری کنید ، آن را در دسته های 1000 جمع کنید ، آن را ذخیره کنید و آماده رفتن است.

در اصل ، ما انجام خواهیم داد INSERT ON CONFLICT UPDATE و خوب خواهد بود

import { chunkify, map, tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { parse as parseJSONL } from 'jsonlines';
import fs from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

import { BuildingApi } from '~/models/BuildingApi';
import buildingRepository from '~/repositories/buildingRepository';

const CHUNK_SIZE = 1_000;
const TOTAL = 25_958_378;

async function run(): Promise<void> {
  const progress = new Bar({
    fps: 4,
    etaAsynchronousUpdate: true,
    etaBuffer: CHUNK_SIZE,
    stopOnComplete: true
  });
  progress.start(TOTAL, 0);

  const file = path.join(__dirname, 'buildings.jsonl');
  await stream(file)
    .pipeThrough(mapper())
    .pipeThrough(chunkify({ size: CHUNK_SIZE }))
    .pipeThrough(saver())
    .pipeThrough(
      tap((buildings) => {
        progress.increment(buildings.length);
      })
    )
    .pipeTo(reporter());
}

interface Input {
  idbat: string;
  nlogh: number;
  rnb_id: string | null;
  rnb_id_score: number;
}

function stream(file: string): ReadableStream<Input> {
  const fileStream = fs.createReadStream(file);
  return Readable.toWeb(
    fileStream.pipe(
      parseJSONL({
        emitInvalidLines: true
      })
    )
  );
}

function mapper() {
  return map<Input, BuildingApi>((building) => ({
    id: building.idbat,
    housingCount: building.nlogh,
    rnbId: building.rnb_id,
    rnbIdScore: building.rnb_id_score,
    rentHousingCount: 0,
    vacantHousingCount: 0
  }));
}

function saver() {
  return tap<ReadonlyArray<BuildingApi>>(async (buildings) => {
    await buildingRepository.saveMany(buildings, {
      onConflict: ['id'],
      merge: ['housing_count', 'rnb_id', 'rnb_id_score']
    });
  });
}

function reporter() {
  let total = 0;

  return new WritableStream<ReadonlyArray<BuildingApi>>({
    write(chunk) {
      total += chunk.length;
    },
    close() {
      console.log(`Total saved: ${total}`);
    }
  });
}

run();
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

زمان اجرای تخمین زده شده: حدود 1 ساعت 02 دقیقه.

تا اینجای کار خیلی خوبه.

به لطف جریان ، هیچ مشکلی در حافظه وجود ندارد. عملکرد صحیح تقریباً خیلی آسان


پیوند ساختمانها به خانه ها

پرونده دیگر است buildings_housing.jsonlبشر در پشت این نام وحشیانه پیوندهای بین مسکن و ساختمانها ، در هنگام وجود آنها قرار دارد.

39 میلیون خط این بار دیگر شوخی نمی کنم

برای هر مسکن ، باید شناسه ساختمان را اضافه یا اصلاح کنیم. building_id میدان

من دوباره رویکرد ساده لوحانه را امتحان می کنم ، پس از همه ، اولین بار کار کرد … ما سعی خواهیم کرد جریان پرونده و نتیجه را در fast_housing جدول

و این جایی است که پیچیده می شود.

شما واقعاً نمی توانید یک توده انجام دهید UPDATE، از آنجا که هر سطر متفاوت است. بنابراین ما با یک خط به خط آزمایش می کنیم UPDATE، روش ساده لوح هشدار اسپویلر: بد است.

import { tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

import db from '~/infra/database';
import { Housing } from '~/repositories/housingRepository';

const TOTAL = 39_048_568;
const TEMPORARY_TABLE = 'buildings_housing_updates';

async function run(): Promise<void> {
  const progress = new Bar({
    fps: 4,
    etaAsynchronousUpdate: true,
    etaBuffer: 1000,
    stopOnComplete: true
  });
  progress.start(TOTAL, 0);

  await createTemporaryTableStream()
    .pipeThrough(saver())
    .pipeThrough(
      tap(() => {
        progress.increment();
      })
    )
    .pipeTo(reporter());
}

interface Input {
  idbat: string;
  idlocal: string;
  geocode: string;
}

function createTemporaryTableStream(): ReadableStream<Input> {
  return Readable.toWeb(db(TEMPORARY_TABLE).select().stream());
}

function saver() {
  return tap<Input>(async (input) => {
    await Housing()
      .where({
        geo_code: input.geocode,
        local_id: input.idlocal
      })
      .update({
        building_id: input.idbat
      });
  });
}

function reporter<A>() {
  let total = 0;

  return new WritableStream<A>({
    write() {
      total++;
    },
    close() {
      console.log(`Total saved: ${total}`);
    }
  });
}

run();  
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

زمان اجرای تخمین زده شده: بین 16 تا 25 ساعت …

در اینجا من با استفاده از جدول موقت که از قبل پر کردم آزمایش کردم ، اما با یک پرونده ، همین مورد است. این به همان اندازه کند است

تنبل یکی است saver عملکرد. یک خط به خط UPDATE در چنین حجم خیلی کند است.


راه حل: به روزرسانی انبوه از جدول موقت

بنابراین من کمی تحقیق کردم ، و پس از آن به نظر می رسد بهینه کردن UPDATE از جدول دیگر خیلی بهتر!

زمان اعدام: حدود 3 ساعت!

import { chunkify, tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

import db from '~/infra/database';
import { housingTable } from '~/repositories/housingRepository';

const CHUNK_SIZE = 10_000;
const TOTAL = 39_048_568;
const TEMPORARY_TABLE = 'buildings_housing_updates';

async function run(): Promise<void> {
  const progress = new Bar({
    fps: 4,
    etaAsynchronousUpdate: true,
    etaBuffer: 1000,
    stopOnComplete: true
  });
  progress.start(TOTAL, 0);

  await createTemporaryTableStream()
    .pipeThrough(
      chunkify({
        size: CHUNK_SIZE
      })
    )
    .pipeThrough(saver())
    .pipeThrough(
      tap((chunk) => {
        progress.increment(chunk.length);
      })
    )
    .pipeTo(reporter());
}

interface Input {
  idbat: string;
  idlocal: string;
  geocode: string;
}

function createTemporaryTableStream(): ReadableStream<Input> {
  return Readable.toWeb(db(TEMPORARY_TABLE).select().stream());
}

function saver() {
  return tap<ReadonlyArray<Input>>(async (chunk) => {
    await db(housingTable)
      .update({
        building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
      })
      .updateFrom(TEMPORARY_TABLE)
      .where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
      .where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
      .whereIn(
        [`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
        chunk.map((building) => [building.geocode, building.idlocal])
      );
  });
}

function reporter<A>() {
  let total = 0;

  return new WritableStream<ReadonlyArray<A>>({
    write(chunk) {
      total += chunk.length;
    },
    close() {
      console.log(`Total saved: ${total}`);
    }
  });
}

run();
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

بازنویسی saver عملکرد به طوری که به روزرسانی های فله از یک جدول موقت ، زمان اجرای را به شدت کاهش داده است.

function saver() {
  return tap<ReadonlyArray<Input>>(async (chunk) => {
    await db(housingTable)
      .update({
        building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
      })
      .updateFrom(TEMPORARY_TABLE)
      .where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
      .where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
      .whereIn(
        [`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
        chunk.map((building) => [building.geocode, building.idlocal])
      );
  });
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

به روزرسانی از جدول دیگر اجازه می دهد تا ردیف ها در تکه ها پردازش شوند ، در این مورد 1000 در 1000 تعداد پرس و جوها، و به طور بالقوه Postgres را قادر می سازد تا یک برنامه اجرای بهتر را انتخاب کنند ، دیسک I/O را کاهش دهند ، از شاخص ها و غیره بهتر استفاده کنند.

اگر از راه حل های دیگر برای بهینه سازی اطلاع دارید UPDATEs در مورد حجم زیادی از داده ها ، بیایید صحبت کنیم!

با تشکر


عکس بنر 🇸🇮 Janko Ferlič از Unsplash

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا