برنامه نویسی

موازی سازی و همزمانی 102: جریان های موازی جاوا در عمل

Summarize this content to 400 words in Persian Lang
در مقاله قبلی “موازی و همزمانی 101” به بررسی مفاهیم اساسی پیرامون این دو مبحث پرداختیم. ما بحث می کنیم که چگونه این مفاهیم به برنامه ها اجازه می دهد تا چندین کار را به طور همزمان انجام دهند و عملکرد و کارایی را بهبود بخشند.

در این مقاله دوم از این مجموعه، ما عمیق‌تر به استفاده از آن خواهیم پرداخت parallel stream در جاوا معرفی شده در جاوا 8، parallel stream ویژگی‌ای است که پردازش موازی مجموعه‌ها را تسهیل می‌کند و از هسته‌های متعدد CPU برای بهبود عملکرد عملیات بر روی حجم زیادی از داده‌ها بهره می‌برد.

بیایید بررسی کنیم که چگونه parallel stream کارها، مزایا و معایب آن، و نحوه سفارشی کردن استخر نخ مورد استفاده. ما همچنین در مورد تکنیک “سرقت کار” که توسط اجرا شده است صحبت خواهیم کرد ForkJoinPool و اهمیت آن برای تعادل بار و کارایی.

بیا، ما مطالب زیادی برای پوشش داریم!

فهرست مطالب

جریان موازی: موازی سازی آسان شده است

جاوا 8 استریم ها را به عنوان روشی جدید برای تکرار و انجام عملیات بر روی مجموعه ها به صورت اعلامی معرفی کرد. Stream ها یک API غنی برای دستکاری داده ها ارائه می دهند که عملیات هایی مانند فیلتر کردن، نقشه برداری، کاهش و غیره را امکان پذیر می کند.

جریان ها را می توان در حالت ترتیبی یا اجرای موازی ایجاد کرد. بیایید به نحوه ایجاد و استفاده از هر دو نوع جریان با مثال نگاه کنیم.

متوالی

import java.util.Arrays;
import java.util.List;

public class SequentialStreamExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Criando um stream sequencial
numbers.stream()
.forEach(n -> System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n));
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در این مثال، متد stream() یک جریان متوالی از لیست اعداد ایجاد می کند. عملیات forEach روی هر عنصر در لیست تکرار می شود و شماره را همراه با نام رشته ای که عنصر را پردازش می کند چاپ می کند. از آنجایی که یک جریان متوالی است، همه عناصر توسط یک رشته پردازش می شوند.

اگر مثال بالا را اجرا کنیم، نتیجه زیر را خواهیم دید:

➜ sandbox java SequentialStreamExample
Thread: main – Número: 1
Thread: main – Número: 2
Thread: main – Número: 3
Thread: main – Número: 4
Thread: main – Número: 5
Thread: main – Número: 6
Thread: main – Número: 7
Thread: main – Número: 8
Thread: main – Número: 9
Thread: main – Número: 10

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

موازی

حالا بیایید یک مثال را اجرا کنیم parallel. برای انجام این کار، فقط باید جریان خود را با آن ایجاد کنیم parallel stream:

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Criando um stream paralelo
numbers.parallelStream()
.forEach(n -> System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n));
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اگر این مثال را چندین بار اجرا کنیم، ترتیب همیشه متفاوت خواهد بود:

➜ sandbox java ParallelStreamExample
Thread: ForkJoinPool.commonPool-worker-2 – Número: 2
Thread: main – Número: 7
Thread: ForkJoinPool.commonPool-worker-6 – Número: 4
Thread: ForkJoinPool.commonPool-worker-3 – Número: 5
Thread: ForkJoinPool.commonPool-worker-4 – Número: 9
Thread: ForkJoinPool.commonPool-worker-1 – Número: 3
Thread: ForkJoinPool.commonPool-worker-9 – Número: 10
Thread: ForkJoinPool.commonPool-worker-8 – Número: 6
Thread: ForkJoinPool.commonPool-worker-7 – Número: 8
Thread: ForkJoinPool.commonPool-worker-5 – Número: 1

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

راه‌های دیگری هم برای ایجاد استریم وجود دارد، هم متوالی و هم موازی، اما من این را به عنوان تکلیف شما می‌گذارم.

توازی و همزمانی

حال که یک مثال ساده با parallel، چه ارتباطی با مفاهیمی که در مقاله قبلی توازی و همزمانی مطرح کردیم، دارد؟

چگونه parallel stream آیا از رقابت استفاده می کند؟

وقتی یک استریم را به یک parallel stream، در صورت امکان، جاوا وظیفه را به چند کار فرعی تقسیم می کند که می توانند به طور همزمان اجرا شوند. هر یک از وظایف فرعی به یک رشته جداگانه اختصاص داده می شود که می تواند بر روی یک هسته پردازنده متفاوت اجرا شود. مدیریت این رشته ها شامل مفاهیم همزمانی است، مانند:

مجموعه موضوعات: O ForkJoinPool اغلب برای مدیریت موضوعات در استفاده می شود parallel stream.
همگام سازی: اطمینان از ایمن بودن عملیات روی داده های مشترک.
تغییر زمینه: از طریق سرقت کار ForkJoinPool وظایفی را از صف thread مشغول می‌گیرد تا روی رشته‌هایی که اشغال نشده‌اند اجرا شوند.

چگونه parallel stream آیا از موازی سازی استفاده می کند؟

O parallel stream با اجرای همزمان این وظایف فرعی بر روی چندین هسته از موازی سازی بهره می برد. این می تواند منجر به اجرای سریعتر شود، به خصوص برای عملیاتی که مستقل هستند و می توانند به صورت موازی بدون تداخل با یکدیگر انجام شوند.

ForkJoinPool – homi چیست 😳؟

به طور پیش فرض، parallel stream استفاده می کند ForkJoinPool.commonPool()، که یک مخزن رشته مشترک است که برای همه وظایف فورک/پیوستن در دسترس است. این استخر به گونه‌ای پیکربندی شده است که از تعدادی رشته برابر با تعداد هسته‌های موجود در پردازنده استفاده کند، که به کارها اجازه می‌دهد به طور موازی به طور موثر اجرا شوند.

اگر بخواهیم بررسی کنیم که چند موضوع داریم forkJoinPool خواهد داشت، فقط تعداد رنگ های موجود در جاوا را چاپ کنید. به عنوان مثال، اگر خط زیر را در هر برنامه جاوا در مک خود اجرا کنم:

System.out.println(“Number of cores available: “+ Runtime.getRuntime().availableProcessors());

Number of cores available: 12

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که عدد بالا می تواند کمی پیچیده تر باشد، به خصوص زمانی که ظروف درگیر هستند.

تغییر اندازه ForkJoinPool استاندارد

🛑 نکته: این کار را نکنید!

اندازه استاندارد از ForkJoinPool.commonPool() را می توان با تنظیم ویژگی سیستم تغییر داد java.util.concurrent.ForkJoinPool.common.parallelism. این را می توان با راه اندازی JVM با گزینه -D انجام داد، به عنوان مثال:

java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MinhaAplicacao

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این دستور پیکربندی می کند ForkJoinPool.commonPool() برای استفاده از 8 رشته

چون تو نه آیا باید اندازه پیش فرض ForkJoinPool را تغییر دهم؟

اندازه پیش فرض را تغییر دهید ForkJoinPool.commonPool() ممکن است بر بخش‌های دیگر برنامه یا کتابخانه‌هایی که از استخر مشترک نیز استفاده می‌کنند، تأثیر منفی بگذارد.

O commonPool این یک منبع مشترک است و تغییر رفتار آن می تواند مسائل مربوط به عملکرد و همزمانی را معرفی کند که تشخیص آنها دشوار است. در عوض، توصیه می شود که a را ایجاد و استفاده کنید ForkJoinPool برای کارهای خاصی که نیاز به موازی سازی دقیق دارند، سفارشی شده است، و تضمین می کند که سایر بخش های برنامه پایدار و قابل پیش بینی باقی می مانند.

جایگزین بهتری چیست؟

✅ نکته: در صورت لزوم این کار را انجام دهید!

برای اصلاح thread pool به صورت محلی می توانید از روش استفاده کنید ForkJoinPool#submit برای ارائه وظیفه ای که انجام می دهد parallel stream در زمینه استخر سفارشی.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class CustomForkJoinPoolExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);

try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که وقتی چاپ قبلی را انجام دادم، 12 رنگ در دسترس داشتم، اما توجه داشته باشید که هنگام اجرای کد بالا، فقط 4 رشته خواهم داشت.

Thread: ForkJoinPool-1-worker-4 – Número: 6
Thread: ForkJoinPool-1-worker-4 – Número: 8
Thread: ForkJoinPool-1-worker-1 – Número: 7
Thread: ForkJoinPool-1-worker-2 – Número: 3
Thread: ForkJoinPool-1-worker-3 – Número: 9
Thread: ForkJoinPool-1-worker-3 – Número: 1
Thread: ForkJoinPool-1-worker-4 – Número: 10
Thread: ForkJoinPool-1-worker-1 – Número: 2
Thread: ForkJoinPool-1-worker-2 – Número: 5
Thread: ForkJoinPool-1-worker-3 – Número: 4

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همچنین توجه داشته باشید که در مثال اول، وظایفی داریم که توسط main نخ. با این حال، در custom وظایف ما فقط توسط کارگران تعریف شده در استخر ما اجرا می شود.

اکنون که ما در مورد ایجاد می دانیم parallel stream و جایی که آنها اجرا می شوند (ForkJoinPool)، بیایید چند چیز جالب دیگر در مورد این عملکرد بحث کنیم.

Work-Stealing بدون ForkJoinPool

همانطور که قبلا ذکر کردیم، ForkJoinPool مخزن نخ پیش فرض استفاده شده توسط است parallel stream. این استخر تکنیکی به نام «سرقت کار» را اجرا می کند. این تکنیک برای اطمینان از کارایی و تعادل بار بین رزوه ها ضروری است.

سرقت کار چگونه کار می کند؟

بخش وظیفه: زمانی که وظیفه ای به ForkJoinPool، به وظایف فرعی کوچکتر تقسیم می شود که بین رشته های استخر توزیع شده است. هر رشته یک صف از وظایف را حفظ می کند.
اجرای محلی: هر رشته سعی می کند وظایف را از صف خود اجرا کند. اگر موضوعی وظایف خود را تمام کند یا بیکار شود، بررسی می کند که آیا کار بیشتری برای انجام وجود دارد یا خیر.
Work Stealing: اگر کار یک رشته تمام شود، سعی می کند وظایف را از صف های رشته های دیگر بدزدد. این کار با گرفتن وظایف از انتهای صف یک رشته دیگر انجام می شود، در حالی که خود نخ قربانی از ابتدای صف خود به انجام وظایف ادامه می دهد.

این مکانیسم سرقت کار کمک می کند تا همه رشته ها مشغول نگه داشته شوند و حجم کار به طور موثر متعادل شود.

مثال مشاهده کار دزدی

بیایید یک ForkJoinPool سفارشی با تعداد مشخصی از رشته ها ایجاد کنیم و وظایف را با زمان های مختلف اجرا شبیه سازی کنیم. هدف مشاهده این است که چگونه نخ های بیکار از رشته هایی که هنوز مشغول هستند، کار می کنند.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class WorkStealingExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);

try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
try {
if (n % 2 == 0) {
// Simulando uma tarefa que leva tempo
Thread.sleep(2000);
} else {
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که اگر کد بالا را اجرا کنید، این تمایل وجود دارد که کارهای با اعداد فرد ابتدا تمام شوند. به عنوان مثال، یکی از اجراهای من این بود:

Thread: ForkJoinPool-1-worker-2 – Número: 3
Thread: ForkJoinPool-1-worker-1 – Número: 7
Thread: ForkJoinPool-1-worker-3 – Número: 9
Thread: ForkJoinPool-1-worker-2 – Número: 5
Thread: ForkJoinPool-1-worker-4 – Número: 6
Thread: ForkJoinPool-1-worker-4 – Número: 1
Thread: ForkJoinPool-1-worker-3 – Número: 10
Thread: ForkJoinPool-1-worker-1 – Número: 2
Thread: ForkJoinPool-1-worker-2 – Número: 4
Thread: ForkJoinPool-1-worker-4 – Número: 8

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و چه اتفاقی می افتد اگر forkJoinPool نکرده بود work-stealing? خوب، رشته‌هایی با وظایف کوچک‌تر ابتدا وظایف خود را به پایان می‌رسانند و بی‌حرکت می‌مانند، در حالی که رشته‌هایی با وظایف بزرگ‌تر شلوغ می‌شوند و یک گلوگاه غیرضروری در برنامه ایجاد می‌کنند.

تعجب! حتماً فکر می‌کنید «اکنون که همه اینها را می‌دانم، همیشه خلق خواهم کرد parallel streams برای افزایش عملکرد برنامه هایم!”

آرام باش پاداوان جوان، مثل همیشه، بستگی دارد…

عملکرد بین Sequential ه Parallel

خطی وجود دارد که استفاده از موازی جالب نیست. جاوا در بهینه سازی کد برنامه عالی است، بنابراین اغلب، یک استریم متوالی ساده () عملکرد عالی خواهد داشت! بیایید یک مثال با کد را ببینیم:

// Imports removidos pra manter o código breve. Veja o repo para o código completo.

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2)
public class SeqParallelBenchmark {

@Param({“100”, “1000000”})
private int size;
private ListInteger> data;

@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}

@Benchmark
public void test_sequential() {
data.stream().mapToInt(Integer::intValue).sum();

}

@Benchmark
public void test_parallel() {
data.stream().parallel().mapToInt(Integer::intValue).sum();
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(SeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

وقتی معیار بالا را روی دستگاهم اجرا کردم، نتیجه زیر را گرفتم:

Benchmark (size) Mode Cnt Score Error Units
SeqParallelBenchmark.test_sequential 100 avgt 10 ≈ 10⁻⁴ ms/op
SeqParallelBenchmark.test_parallel 100 avgt 10 0.022 ± 0.008 ms/op

SeqParallelBenchmark.test_sequential 1000000 avgt 10 0.482 ± 0.016 ms/op
SeqParallelBenchmark.test_parallel 1000000 avgt 10 0.117 ± 0.016 ms/op

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که با 100 عنصر، روش متوالی در 0.0001 ms اجرا می شود در حالی که موازی 0.022 میلی ثانیه طول می کشد، تقریبا 200 برابر کندتر. با این حال، وقتی برای میلیون‌ها عنصر اجرا می‌کنیم، موازی 4 برابر سریع‌تر از نتیجه متوالی است.

من این بار معیار دیگری را با عملیات متفاوت اجرا کردم:

// Imports removidos pra manter o código breve. Veja o repo para o código completo.

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class AnotherSeqParallelBenchmark {

@Param({“100”, “1000000”})
private int size;

private ListInteger> data;

@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}

@Benchmark
public ListDouble> testSequentialStream() {
return data.stream()
.map(Math::sin)
.collect(Collectors.toList());
}

@Benchmark
public ListDouble> testParallelStream() {
return data.parallelStream()
.map(Math::sin)
.collect(Collectors.toList());
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(AnotherSeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();

new Runner(opt).run();
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و نتیجه بسیار مشابه بود:

Benchmark (size) Mode Cnt Score Error Units
AnotherSeqParallelBenchmark.testSequentialStream 100 avgt 10 0.001 ± 0.001 ms/op
AnotherSeqParallelBenchmark.testParallelStream 100 avgt 10 0.027 ± 0.005 ms/op

AnotherSeqParallelBenchmark.testParallelStream 1000000 avgt 10 2.234 ± 0.025 ms/op
AnotherSeqParallelBenchmark.testSequentialStream 1000000 avgt 10 16.164 ± 0.573 ms/op

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که چگونه، یک بار دیگر، در یک مجموعه کوچک، موازی بدتر از ترتیبی (~27 برابر بدتر) عمل کرد. با این حال، زمانی که با میلیون‌ها محاسبه مواجه شدیم، عملکرد 8 برابر سریع‌تر از متوالی بود.

نتیجه

استفاده نکن parallel:

اگر عملیات شما محدود به ورودی/خروجی است: تماس شبکه با میکروسرویس باشد یا نوشتن فایل. چرا؟ شما از I/O بسیار بیشتر از CPU استفاده خواهید کرد، در واقع، می توانید عملیات های بسیار بیشتری را نسبت به تعداد هسته های برنامه خود باز کنید. به عنوان مثال، در سناریوی اخیر من ~ 500 اتصال را به یک میکروسرویس دیگر باز کردم، بسیار بالاتر از 8 هسته دستگاهم.

اگر حجم داده شما کم است: Parallel همانطور که در بالا دیدیم، سربار را با استفاده از Thread Pool، هماهنگی وظایف، سرقت کارهای متوقف شده و غیره اضافه می کند. با توجه به این موضوع، اگر حجم داده شما کوچک است، این احتمال وجود دارد که a sequential سریعتر از یک parallel. در صورت شک، آزمایش هایی را روی برنامه خود اجرا کنید تا ببینید آیا ارزش آن را دارد یا خیر.

استفاده کنید parallel:

اگر سناریوی شما محدود به CPU است: تبدیل داده ها، محاسبات و غیره چرا؟ اگر تعداد نخ های پیش فرض در parallel stream تعداد هسته های دستگاه است و عملکرد آن به CPU محدود می شود، به این معنی که شما می توانید از تمام هسته ها به صورت موازی استفاده کنید و از منابع موجود حداکثر استفاده را ببرید.

اگر وظایف شما مستقل است: همانطور که می خواهید از موازی سازی استفاده کنید، جالب است که وظایف شما مستقل و منزوی هستند، زیرا در زمینه های مختلف و به ترتیب غیرقابل پیش بینی اجرا می شوند.

اگر معیار شما افزایش عملکرد را نشان می دهد: برخی آزمایش ها را انجام دهید، به عنوان مثال با استفاده از ابزاری مانند JMH.

Parallel streams این یک راه آسان برای ورود به دنیای برنامه نویسی موازی است، زیرا عملکرد چندین نگرانی را انتزاعی می کند. برای همه موارد قابل اجرا نیست، اما زمانی که به خوبی اعمال شود می تواند کمک بزرگی در افزایش عملکرد برنامه های شما باشد.

تمام نمونه های این پست در مخزن من موجود است:

Repo برای ذخیره POC، آزمایش ها، آموزش ها.

Repo برای ذخیره POC، آزمایش ها، آموزش ها.

com.hugodesmarques.parallel – آزمایش برای مقاله “موازی و همزمانی 102: جریانهای موازی جاوا در عمل”.

در پست بعدی استفاده از نخ های سنتی و حوضچه های نخ با مجری را بیشتر بررسی خواهیم کرد. آنجا میبینمت!

در مقاله قبلی “موازی و همزمانی 101” به بررسی مفاهیم اساسی پیرامون این دو مبحث پرداختیم. ما بحث می کنیم که چگونه این مفاهیم به برنامه ها اجازه می دهد تا چندین کار را به طور همزمان انجام دهند و عملکرد و کارایی را بهبود بخشند.

در این مقاله دوم از این مجموعه، ما عمیق‌تر به استفاده از آن خواهیم پرداخت parallel stream در جاوا معرفی شده در جاوا 8، parallel stream ویژگی‌ای است که پردازش موازی مجموعه‌ها را تسهیل می‌کند و از هسته‌های متعدد CPU برای بهبود عملکرد عملیات بر روی حجم زیادی از داده‌ها بهره می‌برد.

بیایید بررسی کنیم که چگونه parallel stream کارها، مزایا و معایب آن، و نحوه سفارشی کردن استخر نخ مورد استفاده. ما همچنین در مورد تکنیک “سرقت کار” که توسط اجرا شده است صحبت خواهیم کرد ForkJoinPool و اهمیت آن برای تعادل بار و کارایی.

بیا، ما مطالب زیادی برای پوشش داریم!

فهرست مطالب

جریان موازی: موازی سازی آسان شده است

جاوا 8 استریم ها را به عنوان روشی جدید برای تکرار و انجام عملیات بر روی مجموعه ها به صورت اعلامی معرفی کرد. Stream ها یک API غنی برای دستکاری داده ها ارائه می دهند که عملیات هایی مانند فیلتر کردن، نقشه برداری، کاهش و غیره را امکان پذیر می کند.

جریان ها را می توان در حالت ترتیبی یا اجرای موازی ایجاد کرد. بیایید به نحوه ایجاد و استفاده از هر دو نوع جریان با مثال نگاه کنیم.

متوالی

import java.util.Arrays;
import java.util.List;

public class SequentialStreamExample {
    public static void main(String[] args) {
        ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Criando um stream sequencial
        numbers.stream()
               .forEach(n -> System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n));
    }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در این مثال، متد stream() یک جریان متوالی از لیست اعداد ایجاد می کند. عملیات forEach روی هر عنصر در لیست تکرار می شود و شماره را همراه با نام رشته ای که عنصر را پردازش می کند چاپ می کند. از آنجایی که یک جریان متوالی است، همه عناصر توسط یک رشته پردازش می شوند.

اگر مثال بالا را اجرا کنیم، نتیجه زیر را خواهیم دید:

  sandbox java SequentialStreamExample      
Thread: main - Número: 1
Thread: main - Número: 2
Thread: main - Número: 3
Thread: main - Número: 4
Thread: main - Número: 5
Thread: main - Número: 6
Thread: main - Número: 7
Thread: main - Número: 8
Thread: main - Número: 9
Thread: main - Número: 10
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

موازی

حالا بیایید یک مثال را اجرا کنیم parallel. برای انجام این کار، فقط باید جریان خود را با آن ایجاد کنیم parallel stream:

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
    public static void main(String[] args) {
        ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Criando um stream paralelo
        numbers.parallelStream()
                .forEach(n -> System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n));
    }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اگر این مثال را چندین بار اجرا کنیم، ترتیب همیشه متفاوت خواهد بود:

  sandbox java ParallelStreamExample  
Thread: ForkJoinPool.commonPool-worker-2 - Número: 2
Thread: main - Número: 7
Thread: ForkJoinPool.commonPool-worker-6 - Número: 4
Thread: ForkJoinPool.commonPool-worker-3 - Número: 5
Thread: ForkJoinPool.commonPool-worker-4 - Número: 9
Thread: ForkJoinPool.commonPool-worker-1 - Número: 3
Thread: ForkJoinPool.commonPool-worker-9 - Número: 10
Thread: ForkJoinPool.commonPool-worker-8 - Número: 6
Thread: ForkJoinPool.commonPool-worker-7 - Número: 8
Thread: ForkJoinPool.commonPool-worker-5 - Número: 1
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

راه‌های دیگری هم برای ایجاد استریم وجود دارد، هم متوالی و هم موازی، اما من این را به عنوان تکلیف شما می‌گذارم.

توازی و همزمانی

حال که یک مثال ساده با parallel، چه ارتباطی با مفاهیمی که در مقاله قبلی توازی و همزمانی مطرح کردیم، دارد؟

چگونه parallel stream آیا از رقابت استفاده می کند؟

وقتی یک استریم را به یک parallel stream، در صورت امکان، جاوا وظیفه را به چند کار فرعی تقسیم می کند که می توانند به طور همزمان اجرا شوند. هر یک از وظایف فرعی به یک رشته جداگانه اختصاص داده می شود که می تواند بر روی یک هسته پردازنده متفاوت اجرا شود. مدیریت این رشته ها شامل مفاهیم همزمانی است، مانند:

  • مجموعه موضوعات: O ForkJoinPool اغلب برای مدیریت موضوعات در استفاده می شود parallel stream.
  • همگام سازی: اطمینان از ایمن بودن عملیات روی داده های مشترک.
  • تغییر زمینه: از طریق سرقت کار ForkJoinPool وظایفی را از صف thread مشغول می‌گیرد تا روی رشته‌هایی که اشغال نشده‌اند اجرا شوند.

چگونه parallel stream آیا از موازی سازی استفاده می کند؟

O parallel stream با اجرای همزمان این وظایف فرعی بر روی چندین هسته از موازی سازی بهره می برد. این می تواند منجر به اجرای سریعتر شود، به خصوص برای عملیاتی که مستقل هستند و می توانند به صورت موازی بدون تداخل با یکدیگر انجام شوند.

ForkJoinPool – homi چیست 😳؟

به طور پیش فرض، parallel stream استفاده می کند ForkJoinPool.commonPool()، که یک مخزن رشته مشترک است که برای همه وظایف فورک/پیوستن در دسترس است. این استخر به گونه‌ای پیکربندی شده است که از تعدادی رشته برابر با تعداد هسته‌های موجود در پردازنده استفاده کند، که به کارها اجازه می‌دهد به طور موازی به طور موثر اجرا شوند.

اگر بخواهیم بررسی کنیم که چند موضوع داریم forkJoinPool خواهد داشت، فقط تعداد رنگ های موجود در جاوا را چاپ کنید. به عنوان مثال، اگر خط زیر را در هر برنامه جاوا در مک خود اجرا کنم:

System.out.println("Number of cores available: "+ Runtime.getRuntime().availableProcessors());

Number of cores available: 12
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که عدد بالا می تواند کمی پیچیده تر باشد، به خصوص زمانی که ظروف درگیر هستند.

تغییر اندازه ForkJoinPool استاندارد

🛑 نکته: این کار را نکنید!

اندازه استاندارد از ForkJoinPool.commonPool() را می توان با تنظیم ویژگی سیستم تغییر داد java.util.concurrent.ForkJoinPool.common.parallelism. این را می توان با راه اندازی JVM با گزینه -D انجام داد، به عنوان مثال:

java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MinhaAplicacao
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این دستور پیکربندی می کند ForkJoinPool.commonPool() برای استفاده از 8 رشته

چون تو نه آیا باید اندازه پیش فرض ForkJoinPool را تغییر دهم؟

اندازه پیش فرض را تغییر دهید ForkJoinPool.commonPool() ممکن است بر بخش‌های دیگر برنامه یا کتابخانه‌هایی که از استخر مشترک نیز استفاده می‌کنند، تأثیر منفی بگذارد.

O commonPool این یک منبع مشترک است و تغییر رفتار آن می تواند مسائل مربوط به عملکرد و همزمانی را معرفی کند که تشخیص آنها دشوار است. در عوض، توصیه می شود که a را ایجاد و استفاده کنید ForkJoinPool برای کارهای خاصی که نیاز به موازی سازی دقیق دارند، سفارشی شده است، و تضمین می کند که سایر بخش های برنامه پایدار و قابل پیش بینی باقی می مانند.

جایگزین بهتری چیست؟

✅ نکته: در صورت لزوم این کار را انجام دهید!

برای اصلاح thread pool به صورت محلی می توانید از روش استفاده کنید ForkJoinPool#submit برای ارائه وظیفه ای که انجام می دهد parallel stream در زمینه استخر سفارشی.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class CustomForkJoinPoolExample {
    public static void main(String[] args) {
        ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Configurando o ForkJoinPool para usar um número específico de threads
        ForkJoinPool customThreadPool = new ForkJoinPool(4);

        try {
            customThreadPool.submit(() ->
                    numbers.parallelStream()
                            .forEach(n -> {
                                System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n);
                            })
            ).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown();
        }
    }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که وقتی چاپ قبلی را انجام دادم، 12 رنگ در دسترس داشتم، اما توجه داشته باشید که هنگام اجرای کد بالا، فقط 4 رشته خواهم داشت.

Thread: ForkJoinPool-1-worker-4 - Número: 6
Thread: ForkJoinPool-1-worker-4 - Número: 8
Thread: ForkJoinPool-1-worker-1 - Número: 7
Thread: ForkJoinPool-1-worker-2 - Número: 3
Thread: ForkJoinPool-1-worker-3 - Número: 9
Thread: ForkJoinPool-1-worker-3 - Número: 1
Thread: ForkJoinPool-1-worker-4 - Número: 10
Thread: ForkJoinPool-1-worker-1 - Número: 2
Thread: ForkJoinPool-1-worker-2 - Número: 5
Thread: ForkJoinPool-1-worker-3 - Número: 4
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همچنین توجه داشته باشید که در مثال اول، وظایفی داریم که توسط main نخ. با این حال، در custom وظایف ما فقط توسط کارگران تعریف شده در استخر ما اجرا می شود.

اکنون که ما در مورد ایجاد می دانیم parallel stream و جایی که آنها اجرا می شوند (ForkJoinPool)، بیایید چند چیز جالب دیگر در مورد این عملکرد بحث کنیم.

Work-Stealing بدون ForkJoinPool

همانطور که قبلا ذکر کردیم، ForkJoinPool مخزن نخ پیش فرض استفاده شده توسط است parallel stream. این استخر تکنیکی به نام «سرقت کار» را اجرا می کند. این تکنیک برای اطمینان از کارایی و تعادل بار بین رزوه ها ضروری است.

سرقت کار چگونه کار می کند؟

  • بخش وظیفه: زمانی که وظیفه ای به ForkJoinPool، به وظایف فرعی کوچکتر تقسیم می شود که بین رشته های استخر توزیع شده است. هر رشته یک صف از وظایف را حفظ می کند.
  • اجرای محلی: هر رشته سعی می کند وظایف را از صف خود اجرا کند. اگر موضوعی وظایف خود را تمام کند یا بیکار شود، بررسی می کند که آیا کار بیشتری برای انجام وجود دارد یا خیر.
  • Work Stealing: اگر کار یک رشته تمام شود، سعی می کند وظایف را از صف های رشته های دیگر بدزدد. این کار با گرفتن وظایف از انتهای صف یک رشته دیگر انجام می شود، در حالی که خود نخ قربانی از ابتدای صف خود به انجام وظایف ادامه می دهد.

این مکانیسم سرقت کار کمک می کند تا همه رشته ها مشغول نگه داشته شوند و حجم کار به طور موثر متعادل شود.

مثال مشاهده کار دزدی

بیایید یک ForkJoinPool سفارشی با تعداد مشخصی از رشته ها ایجاد کنیم و وظایف را با زمان های مختلف اجرا شبیه سازی کنیم. هدف مشاهده این است که چگونه نخ های بیکار از رشته هایی که هنوز مشغول هستند، کار می کنند.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class WorkStealingExample {
    public static void main(String[] args) {
        ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Configurando o ForkJoinPool para usar um número específico de threads
        ForkJoinPool customThreadPool = new ForkJoinPool(4);

        try {
            customThreadPool.submit(() ->
                    numbers.parallelStream()
                            .forEach(n -> {
                                try {
                                    if (n % 2 == 0) {
                                        // Simulando uma tarefa que leva tempo
                                        Thread.sleep(2000);
                                    } else {
                                        Thread.sleep(100);
                                    }
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n);
                            })
            ).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown();
        }
    }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که اگر کد بالا را اجرا کنید، این تمایل وجود دارد که کارهای با اعداد فرد ابتدا تمام شوند. به عنوان مثال، یکی از اجراهای من این بود:

Thread: ForkJoinPool-1-worker-2 - Número: 3
Thread: ForkJoinPool-1-worker-1 - Número: 7
Thread: ForkJoinPool-1-worker-3 - Número: 9
Thread: ForkJoinPool-1-worker-2 - Número: 5
Thread: ForkJoinPool-1-worker-4 - Número: 6
Thread: ForkJoinPool-1-worker-4 - Número: 1
Thread: ForkJoinPool-1-worker-3 - Número: 10
Thread: ForkJoinPool-1-worker-1 - Número: 2
Thread: ForkJoinPool-1-worker-2 - Número: 4
Thread: ForkJoinPool-1-worker-4 - Número: 8
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و چه اتفاقی می افتد اگر forkJoinPool نکرده بود work-stealing? خوب، رشته‌هایی با وظایف کوچک‌تر ابتدا وظایف خود را به پایان می‌رسانند و بی‌حرکت می‌مانند، در حالی که رشته‌هایی با وظایف بزرگ‌تر شلوغ می‌شوند و یک گلوگاه غیرضروری در برنامه ایجاد می‌کنند.

تعجب! حتماً فکر می‌کنید «اکنون که همه اینها را می‌دانم، همیشه خلق خواهم کرد parallel streams برای افزایش عملکرد برنامه هایم!”

آرام باش پاداوان جوان، مثل همیشه، بستگی دارد…

عملکرد بین Sequential ه Parallel

خطی وجود دارد که استفاده از موازی جالب نیست. جاوا در بهینه سازی کد برنامه عالی است، بنابراین اغلب، یک استریم متوالی ساده () عملکرد عالی خواهد داشت! بیایید یک مثال با کد را ببینیم:

// Imports removidos pra manter o código breve. Veja o repo para o código completo.

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2)
public class SeqParallelBenchmark {

    @Param({"100", "1000000"})
    private int size;
    private ListInteger> data;

    @Setup
    public void setup() {
        data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
    }

    @Benchmark
    public void test_sequential() {
        data.stream().mapToInt(Integer::intValue).sum();

    }

    @Benchmark
    public void test_parallel() {
        data.stream().parallel().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(SeqParallelBenchmark.class.getName()) // specify the benchmark class here
                .forks(2)
                .build();
        new Runner(opt).run();
    }
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

وقتی معیار بالا را روی دستگاهم اجرا کردم، نتیجه زیر را گرفتم:

Benchmark                              (size)  Mode  Cnt   Score    Error  Units
SeqParallelBenchmark.test_sequential      100  avgt   10   10⁻⁴           ms/op
SeqParallelBenchmark.test_parallel        100  avgt   10   0.022 ±  0.008  ms/op

SeqParallelBenchmark.test_sequential  1000000  avgt   10   0.482 ±  0.016  ms/op
SeqParallelBenchmark.test_parallel    1000000  avgt   10   0.117 ±  0.016  ms/op
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که با 100 عنصر، روش متوالی در 0.0001 ms اجرا می شود در حالی که موازی 0.022 میلی ثانیه طول می کشد، تقریبا 200 برابر کندتر. با این حال، وقتی برای میلیون‌ها عنصر اجرا می‌کنیم، موازی 4 برابر سریع‌تر از نتیجه متوالی است.

من این بار معیار دیگری را با عملیات متفاوت اجرا کردم:

// Imports removidos pra manter o código breve. Veja o repo para o código completo.

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class AnotherSeqParallelBenchmark {

    @Param({"100", "1000000"})
    private int size;

    private ListInteger> data;

    @Setup
    public void setup() {
        data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
    }

    @Benchmark
    public ListDouble> testSequentialStream() {
        return data.stream()
                .map(Math::sin)
                .collect(Collectors.toList());
    }

    @Benchmark
    public ListDouble> testParallelStream() {
        return data.parallelStream()
                .map(Math::sin)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(AnotherSeqParallelBenchmark.class.getName()) // specify the benchmark class here
                .forks(2)
                .build();

        new Runner(opt).run();
    }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و نتیجه بسیار مشابه بود:

Benchmark                                          (size)  Mode  Cnt   Score    Error  Units
AnotherSeqParallelBenchmark.testSequentialStream      100  avgt   10   0.001 ±  0.001  ms/op
AnotherSeqParallelBenchmark.testParallelStream        100  avgt   10   0.027 ±  0.005  ms/op

AnotherSeqParallelBenchmark.testParallelStream    1000000  avgt   10   2.234 ±  0.025  ms/op
AnotherSeqParallelBenchmark.testSequentialStream  1000000  avgt   10  16.164 ±  0.573  ms/op
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه داشته باشید که چگونه، یک بار دیگر، در یک مجموعه کوچک، موازی بدتر از ترتیبی (~27 برابر بدتر) عمل کرد. با این حال، زمانی که با میلیون‌ها محاسبه مواجه شدیم، عملکرد 8 برابر سریع‌تر از متوالی بود.

نتیجه

استفاده نکن parallel:

  1. اگر عملیات شما محدود به ورودی/خروجی است: تماس شبکه با میکروسرویس باشد یا نوشتن فایل. چرا؟ شما از I/O بسیار بیشتر از CPU استفاده خواهید کرد، در واقع، می توانید عملیات های بسیار بیشتری را نسبت به تعداد هسته های برنامه خود باز کنید. به عنوان مثال، در سناریوی اخیر من ~ 500 اتصال را به یک میکروسرویس دیگر باز کردم، بسیار بالاتر از 8 هسته دستگاهم.
  2. اگر حجم داده شما کم است: Parallel همانطور که در بالا دیدیم، سربار را با استفاده از Thread Pool، هماهنگی وظایف، سرقت کارهای متوقف شده و غیره اضافه می کند. با توجه به این موضوع، اگر حجم داده شما کوچک است، این احتمال وجود دارد که a sequential سریعتر از یک parallel. در صورت شک، آزمایش هایی را روی برنامه خود اجرا کنید تا ببینید آیا ارزش آن را دارد یا خیر.

استفاده کنید parallel:

  1. اگر سناریوی شما محدود به CPU است: تبدیل داده ها، محاسبات و غیره چرا؟ اگر تعداد نخ های پیش فرض در parallel stream تعداد هسته های دستگاه است و عملکرد آن به CPU محدود می شود، به این معنی که شما می توانید از تمام هسته ها به صورت موازی استفاده کنید و از منابع موجود حداکثر استفاده را ببرید.
  2. اگر وظایف شما مستقل است: همانطور که می خواهید از موازی سازی استفاده کنید، جالب است که وظایف شما مستقل و منزوی هستند، زیرا در زمینه های مختلف و به ترتیب غیرقابل پیش بینی اجرا می شوند.
  3. اگر معیار شما افزایش عملکرد را نشان می دهد: برخی آزمایش ها را انجام دهید، به عنوان مثال با استفاده از ابزاری مانند JMH.

Parallel streams این یک راه آسان برای ورود به دنیای برنامه نویسی موازی است، زیرا عملکرد چندین نگرانی را انتزاعی می کند. برای همه موارد قابل اجرا نیست، اما زمانی که به خوبی اعمال شود می تواند کمک بزرگی در افزایش عملکرد برنامه های شما باشد.

تمام نمونه های این پست در مخزن من موجود است:

Repo برای ذخیره POC، آزمایش ها، آموزش ها.

Repo برای ذخیره POC، آزمایش ها، آموزش ها.

  • com.hugodesmarques.parallel – آزمایش برای مقاله “موازی و همزمانی 102: جریانهای موازی جاوا در عمل”.

در پست بعدی استفاده از نخ های سنتی و حوضچه های نخ با مجری را بیشتر بررسی خواهیم کرد. آنجا میبینمت!

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا