بهینه سازی بروزرسانی میلیون ها ردیف به راحتی

روز دیگر ، ما پرونده ای حاوی ساختمانها با تعداد مسکونی آنها و یک شناسه تطبیق برای ما دریافت کردیم ، که با یک امتیاز که نشان دهنده کیفیت تطبیق است ، وزن دارد. ایده این است که سعی کنید بدانید که چنین و چنین مسکونی در چنین ساختمانی و چنین ساختمانی قرار دارد.
بنابراین از من خواسته شد که این ساختمانها را وارد کنم و آنها را به خانه ها پیوند دهم.
دانستن اینکه buildings
وت fast_housing
جداول از قبل وجود دارد ، هیچ چیز غیرممکن است. ساده و ساده
مجبور شدم دو زمینه اضافه کنم rnb_id
وت rnb_score
(نمره ای که نشان دهنده کیفیت تطبیق است) به buildings
جدول سپس به روزرسانی fast_housing
جدول ، که قبلاً حاوی یک بود building_id
میدان
بوها 1-n
رابطه ساده ، اساسی
fast_housing
مجموعه ای از واحدهای مسکن ملی ، خالی ، اجاره و غیره است. ما احتمالاً همه چیز را نداریم ، اما هنوز هم تمام شده است 10 میلیون ردیفبا 42 ستون، همه تقسیم شده توسط بخش و چندین فهرست در یک پایگاه داده PostgreSQL.
وارد کردن ساختمان ها
وارد کردن ساختمانها واقعاً مشکل نیست. برخی وجود دارد 26 میلیون ردیف در پرونده و حتی روش ساده لوحانه ، ما خیلی خوب عمل می کنیم.
ما جریان می دهیم buildings.jsonl
پرونده (یک فایل JSON که در آن هر خط یک عنصر است) ، آن را در قالب مورد نظر نقشه برداری کنید ، آن را در دسته های 1000 جمع کنید ، آن را ذخیره کنید و آماده رفتن است.
در اصل ، ما انجام خواهیم داد INSERT ON CONFLICT UPDATE
و خوب خواهد بود
import { chunkify, map, tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { parse as parseJSONL } from 'jsonlines';
import fs from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';
import { BuildingApi } from '~/models/BuildingApi';
import buildingRepository from '~/repositories/buildingRepository';
const CHUNK_SIZE = 1_000;
const TOTAL = 25_958_378;
async function run(): Promise<void> {
const progress = new Bar({
fps: 4,
etaAsynchronousUpdate: true,
etaBuffer: CHUNK_SIZE,
stopOnComplete: true
});
progress.start(TOTAL, 0);
const file = path.join(__dirname, 'buildings.jsonl');
await stream(file)
.pipeThrough(mapper())
.pipeThrough(chunkify({ size: CHUNK_SIZE }))
.pipeThrough(saver())
.pipeThrough(
tap((buildings) => {
progress.increment(buildings.length);
})
)
.pipeTo(reporter());
}
interface Input {
idbat: string;
nlogh: number;
rnb_id: string | null;
rnb_id_score: number;
}
function stream(file: string): ReadableStream<Input> {
const fileStream = fs.createReadStream(file);
return Readable.toWeb(
fileStream.pipe(
parseJSONL({
emitInvalidLines: true
})
)
);
}
function mapper() {
return map<Input, BuildingApi>((building) => ({
id: building.idbat,
housingCount: building.nlogh,
rnbId: building.rnb_id,
rnbIdScore: building.rnb_id_score,
rentHousingCount: 0,
vacantHousingCount: 0
}));
}
function saver() {
return tap<ReadonlyArray<BuildingApi>>(async (buildings) => {
await buildingRepository.saveMany(buildings, {
onConflict: ['id'],
merge: ['housing_count', 'rnb_id', 'rnb_id_score']
});
});
}
function reporter() {
let total = 0;
return new WritableStream<ReadonlyArray<BuildingApi>>({
write(chunk) {
total += chunk.length;
},
close() {
console.log(`Total saved: ${total}`);
}
});
}
run();
زمان اجرای تخمین زده شده: حدود 1 ساعت 02 دقیقه.
تا اینجای کار خیلی خوبه.
به لطف جریان ، هیچ مشکلی در حافظه وجود ندارد. عملکرد صحیح تقریباً خیلی آسان
پیوند ساختمانها به خانه ها
پرونده دیگر است buildings_housing.jsonl
بشر در پشت این نام وحشیانه پیوندهای بین مسکن و ساختمانها ، در هنگام وجود آنها قرار دارد.
39 میلیون خط این بار دیگر شوخی نمی کنم
برای هر مسکن ، باید شناسه ساختمان را اضافه یا اصلاح کنیم. building_id
میدان
من دوباره رویکرد ساده لوحانه را امتحان می کنم ، پس از همه ، اولین بار کار کرد … ما سعی خواهیم کرد جریان پرونده و نتیجه را در fast_housing
جدول
و این جایی است که پیچیده می شود.
شما واقعاً نمی توانید یک توده انجام دهید UPDATE
، از آنجا که هر سطر متفاوت است. بنابراین ما با یک خط به خط آزمایش می کنیم UPDATE
، روش ساده لوح هشدار اسپویلر: بد است.
import { tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';
import db from '~/infra/database';
import { Housing } from '~/repositories/housingRepository';
const TOTAL = 39_048_568;
const TEMPORARY_TABLE = 'buildings_housing_updates';
async function run(): Promise<void> {
const progress = new Bar({
fps: 4,
etaAsynchronousUpdate: true,
etaBuffer: 1000,
stopOnComplete: true
});
progress.start(TOTAL, 0);
await createTemporaryTableStream()
.pipeThrough(saver())
.pipeThrough(
tap(() => {
progress.increment();
})
)
.pipeTo(reporter());
}
interface Input {
idbat: string;
idlocal: string;
geocode: string;
}
function createTemporaryTableStream(): ReadableStream<Input> {
return Readable.toWeb(db(TEMPORARY_TABLE).select().stream());
}
function saver() {
return tap<Input>(async (input) => {
await Housing()
.where({
geo_code: input.geocode,
local_id: input.idlocal
})
.update({
building_id: input.idbat
});
});
}
function reporter<A>() {
let total = 0;
return new WritableStream<A>({
write() {
total++;
},
close() {
console.log(`Total saved: ${total}`);
}
});
}
run();
زمان اجرای تخمین زده شده: بین 16 تا 25 ساعت …
در اینجا من با استفاده از جدول موقت که از قبل پر کردم آزمایش کردم ، اما با یک پرونده ، همین مورد است. این به همان اندازه کند است
تنبل یکی است saver
عملکرد. یک خط به خط UPDATE
در چنین حجم خیلی کند است.
راه حل: به روزرسانی انبوه از جدول موقت
بنابراین من کمی تحقیق کردم ، و پس از آن به نظر می رسد بهینه کردن UPDATE
از جدول دیگر خیلی بهتر!
زمان اعدام: حدود 3 ساعت!
import { chunkify, tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';
import db from '~/infra/database';
import { housingTable } from '~/repositories/housingRepository';
const CHUNK_SIZE = 10_000;
const TOTAL = 39_048_568;
const TEMPORARY_TABLE = 'buildings_housing_updates';
async function run(): Promise<void> {
const progress = new Bar({
fps: 4,
etaAsynchronousUpdate: true,
etaBuffer: 1000,
stopOnComplete: true
});
progress.start(TOTAL, 0);
await createTemporaryTableStream()
.pipeThrough(
chunkify({
size: CHUNK_SIZE
})
)
.pipeThrough(saver())
.pipeThrough(
tap((chunk) => {
progress.increment(chunk.length);
})
)
.pipeTo(reporter());
}
interface Input {
idbat: string;
idlocal: string;
geocode: string;
}
function createTemporaryTableStream(): ReadableStream<Input> {
return Readable.toWeb(db(TEMPORARY_TABLE).select().stream());
}
function saver() {
return tap<ReadonlyArray<Input>>(async (chunk) => {
await db(housingTable)
.update({
building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
})
.updateFrom(TEMPORARY_TABLE)
.where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
.where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
.whereIn(
[`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
chunk.map((building) => [building.geocode, building.idlocal])
);
});
}
function reporter<A>() {
let total = 0;
return new WritableStream<ReadonlyArray<A>>({
write(chunk) {
total += chunk.length;
},
close() {
console.log(`Total saved: ${total}`);
}
});
}
run();
بازنویسی saver
عملکرد به طوری که به روزرسانی های فله از یک جدول موقت ، زمان اجرای را به شدت کاهش داده است.
function saver() {
return tap<ReadonlyArray<Input>>(async (chunk) => {
await db(housingTable)
.update({
building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
})
.updateFrom(TEMPORARY_TABLE)
.where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
.where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
.whereIn(
[`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
chunk.map((building) => [building.geocode, building.idlocal])
);
});
}
به روزرسانی از جدول دیگر اجازه می دهد تا ردیف ها در تکه ها پردازش شوند ، در این مورد 1000 در 1000 تعداد پرس و جوها، و به طور بالقوه Postgres را قادر می سازد تا یک برنامه اجرای بهتر را انتخاب کنند ، دیسک I/O را کاهش دهند ، از شاخص ها و غیره بهتر استفاده کنند.
اگر از راه حل های دیگر برای بهینه سازی اطلاع دارید UPDATEs
در مورد حجم زیادی از داده ها ، بیایید صحبت کنیم!
با تشکر
عکس بنر Janko Ferlič از Unsplash