موازی سازی و همزمانی 102: جریان های موازی جاوا در عمل

Summarize this content to 400 words in Persian Lang
در مقاله قبلی “موازی و همزمانی 101” به بررسی مفاهیم اساسی پیرامون این دو مبحث پرداختیم. ما بحث می کنیم که چگونه این مفاهیم به برنامه ها اجازه می دهد تا چندین کار را به طور همزمان انجام دهند و عملکرد و کارایی را بهبود بخشند.
در این مقاله دوم از این مجموعه، ما عمیقتر به استفاده از آن خواهیم پرداخت parallel stream در جاوا معرفی شده در جاوا 8، parallel stream ویژگیای است که پردازش موازی مجموعهها را تسهیل میکند و از هستههای متعدد CPU برای بهبود عملکرد عملیات بر روی حجم زیادی از دادهها بهره میبرد.
بیایید بررسی کنیم که چگونه parallel stream کارها، مزایا و معایب آن، و نحوه سفارشی کردن استخر نخ مورد استفاده. ما همچنین در مورد تکنیک “سرقت کار” که توسط اجرا شده است صحبت خواهیم کرد ForkJoinPool و اهمیت آن برای تعادل بار و کارایی.
بیا، ما مطالب زیادی برای پوشش داریم!
فهرست مطالب
جریان موازی: موازی سازی آسان شده است
جاوا 8 استریم ها را به عنوان روشی جدید برای تکرار و انجام عملیات بر روی مجموعه ها به صورت اعلامی معرفی کرد. Stream ها یک API غنی برای دستکاری داده ها ارائه می دهند که عملیات هایی مانند فیلتر کردن، نقشه برداری، کاهش و غیره را امکان پذیر می کند.
جریان ها را می توان در حالت ترتیبی یا اجرای موازی ایجاد کرد. بیایید به نحوه ایجاد و استفاده از هر دو نوع جریان با مثال نگاه کنیم.
متوالی
import java.util.Arrays;
import java.util.List;
public class SequentialStreamExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Criando um stream sequencial
numbers.stream()
.forEach(n -> System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n));
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
در این مثال، متد stream() یک جریان متوالی از لیست اعداد ایجاد می کند. عملیات forEach روی هر عنصر در لیست تکرار می شود و شماره را همراه با نام رشته ای که عنصر را پردازش می کند چاپ می کند. از آنجایی که یک جریان متوالی است، همه عناصر توسط یک رشته پردازش می شوند.
اگر مثال بالا را اجرا کنیم، نتیجه زیر را خواهیم دید:
➜ sandbox java SequentialStreamExample
Thread: main – Número: 1
Thread: main – Número: 2
Thread: main – Número: 3
Thread: main – Número: 4
Thread: main – Número: 5
Thread: main – Número: 6
Thread: main – Número: 7
Thread: main – Número: 8
Thread: main – Número: 9
Thread: main – Número: 10
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
موازی
حالا بیایید یک مثال را اجرا کنیم parallel. برای انجام این کار، فقط باید جریان خود را با آن ایجاد کنیم parallel stream:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Criando um stream paralelo
numbers.parallelStream()
.forEach(n -> System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n));
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
اگر این مثال را چندین بار اجرا کنیم، ترتیب همیشه متفاوت خواهد بود:
➜ sandbox java ParallelStreamExample
Thread: ForkJoinPool.commonPool-worker-2 – Número: 2
Thread: main – Número: 7
Thread: ForkJoinPool.commonPool-worker-6 – Número: 4
Thread: ForkJoinPool.commonPool-worker-3 – Número: 5
Thread: ForkJoinPool.commonPool-worker-4 – Número: 9
Thread: ForkJoinPool.commonPool-worker-1 – Número: 3
Thread: ForkJoinPool.commonPool-worker-9 – Número: 10
Thread: ForkJoinPool.commonPool-worker-8 – Número: 6
Thread: ForkJoinPool.commonPool-worker-7 – Número: 8
Thread: ForkJoinPool.commonPool-worker-5 – Número: 1
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
راههای دیگری هم برای ایجاد استریم وجود دارد، هم متوالی و هم موازی، اما من این را به عنوان تکلیف شما میگذارم.
توازی و همزمانی
حال که یک مثال ساده با parallel، چه ارتباطی با مفاهیمی که در مقاله قبلی توازی و همزمانی مطرح کردیم، دارد؟
چگونه parallel stream آیا از رقابت استفاده می کند؟
وقتی یک استریم را به یک parallel stream، در صورت امکان، جاوا وظیفه را به چند کار فرعی تقسیم می کند که می توانند به طور همزمان اجرا شوند. هر یک از وظایف فرعی به یک رشته جداگانه اختصاص داده می شود که می تواند بر روی یک هسته پردازنده متفاوت اجرا شود. مدیریت این رشته ها شامل مفاهیم همزمانی است، مانند:
مجموعه موضوعات: O ForkJoinPool اغلب برای مدیریت موضوعات در استفاده می شود parallel stream.
همگام سازی: اطمینان از ایمن بودن عملیات روی داده های مشترک.
تغییر زمینه: از طریق سرقت کار ForkJoinPool وظایفی را از صف thread مشغول میگیرد تا روی رشتههایی که اشغال نشدهاند اجرا شوند.
چگونه parallel stream آیا از موازی سازی استفاده می کند؟
O parallel stream با اجرای همزمان این وظایف فرعی بر روی چندین هسته از موازی سازی بهره می برد. این می تواند منجر به اجرای سریعتر شود، به خصوص برای عملیاتی که مستقل هستند و می توانند به صورت موازی بدون تداخل با یکدیگر انجام شوند.
ForkJoinPool – homi چیست 😳؟
به طور پیش فرض، parallel stream استفاده می کند ForkJoinPool.commonPool()، که یک مخزن رشته مشترک است که برای همه وظایف فورک/پیوستن در دسترس است. این استخر به گونهای پیکربندی شده است که از تعدادی رشته برابر با تعداد هستههای موجود در پردازنده استفاده کند، که به کارها اجازه میدهد به طور موازی به طور موثر اجرا شوند.
اگر بخواهیم بررسی کنیم که چند موضوع داریم forkJoinPool خواهد داشت، فقط تعداد رنگ های موجود در جاوا را چاپ کنید. به عنوان مثال، اگر خط زیر را در هر برنامه جاوا در مک خود اجرا کنم:
System.out.println(“Number of cores available: “+ Runtime.getRuntime().availableProcessors());
Number of cores available: 12
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
توجه داشته باشید که عدد بالا می تواند کمی پیچیده تر باشد، به خصوص زمانی که ظروف درگیر هستند.
تغییر اندازه ForkJoinPool استاندارد
🛑 نکته: این کار را نکنید!
اندازه استاندارد از ForkJoinPool.commonPool() را می توان با تنظیم ویژگی سیستم تغییر داد java.util.concurrent.ForkJoinPool.common.parallelism. این را می توان با راه اندازی JVM با گزینه -D انجام داد، به عنوان مثال:
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MinhaAplicacao
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
این دستور پیکربندی می کند ForkJoinPool.commonPool() برای استفاده از 8 رشته
چون تو نه آیا باید اندازه پیش فرض ForkJoinPool را تغییر دهم؟
اندازه پیش فرض را تغییر دهید ForkJoinPool.commonPool() ممکن است بر بخشهای دیگر برنامه یا کتابخانههایی که از استخر مشترک نیز استفاده میکنند، تأثیر منفی بگذارد.
O commonPool این یک منبع مشترک است و تغییر رفتار آن می تواند مسائل مربوط به عملکرد و همزمانی را معرفی کند که تشخیص آنها دشوار است. در عوض، توصیه می شود که a را ایجاد و استفاده کنید ForkJoinPool برای کارهای خاصی که نیاز به موازی سازی دقیق دارند، سفارشی شده است، و تضمین می کند که سایر بخش های برنامه پایدار و قابل پیش بینی باقی می مانند.
جایگزین بهتری چیست؟
✅ نکته: در صورت لزوم این کار را انجام دهید!
برای اصلاح thread pool به صورت محلی می توانید از روش استفاده کنید ForkJoinPool#submit برای ارائه وظیفه ای که انجام می دهد parallel stream در زمینه استخر سفارشی.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class CustomForkJoinPoolExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
توجه داشته باشید که وقتی چاپ قبلی را انجام دادم، 12 رنگ در دسترس داشتم، اما توجه داشته باشید که هنگام اجرای کد بالا، فقط 4 رشته خواهم داشت.
Thread: ForkJoinPool-1-worker-4 – Número: 6
Thread: ForkJoinPool-1-worker-4 – Número: 8
Thread: ForkJoinPool-1-worker-1 – Número: 7
Thread: ForkJoinPool-1-worker-2 – Número: 3
Thread: ForkJoinPool-1-worker-3 – Número: 9
Thread: ForkJoinPool-1-worker-3 – Número: 1
Thread: ForkJoinPool-1-worker-4 – Número: 10
Thread: ForkJoinPool-1-worker-1 – Número: 2
Thread: ForkJoinPool-1-worker-2 – Número: 5
Thread: ForkJoinPool-1-worker-3 – Número: 4
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
همچنین توجه داشته باشید که در مثال اول، وظایفی داریم که توسط main نخ. با این حال، در custom وظایف ما فقط توسط کارگران تعریف شده در استخر ما اجرا می شود.
اکنون که ما در مورد ایجاد می دانیم parallel stream و جایی که آنها اجرا می شوند (ForkJoinPool)، بیایید چند چیز جالب دیگر در مورد این عملکرد بحث کنیم.
Work-Stealing بدون ForkJoinPool
همانطور که قبلا ذکر کردیم، ForkJoinPool مخزن نخ پیش فرض استفاده شده توسط است parallel stream. این استخر تکنیکی به نام «سرقت کار» را اجرا می کند. این تکنیک برای اطمینان از کارایی و تعادل بار بین رزوه ها ضروری است.
سرقت کار چگونه کار می کند؟
بخش وظیفه: زمانی که وظیفه ای به ForkJoinPool، به وظایف فرعی کوچکتر تقسیم می شود که بین رشته های استخر توزیع شده است. هر رشته یک صف از وظایف را حفظ می کند.
اجرای محلی: هر رشته سعی می کند وظایف را از صف خود اجرا کند. اگر موضوعی وظایف خود را تمام کند یا بیکار شود، بررسی می کند که آیا کار بیشتری برای انجام وجود دارد یا خیر.
Work Stealing: اگر کار یک رشته تمام شود، سعی می کند وظایف را از صف های رشته های دیگر بدزدد. این کار با گرفتن وظایف از انتهای صف یک رشته دیگر انجام می شود، در حالی که خود نخ قربانی از ابتدای صف خود به انجام وظایف ادامه می دهد.
این مکانیسم سرقت کار کمک می کند تا همه رشته ها مشغول نگه داشته شوند و حجم کار به طور موثر متعادل شود.
مثال مشاهده کار دزدی
بیایید یک ForkJoinPool سفارشی با تعداد مشخصی از رشته ها ایجاد کنیم و وظایف را با زمان های مختلف اجرا شبیه سازی کنیم. هدف مشاهده این است که چگونه نخ های بیکار از رشته هایی که هنوز مشغول هستند، کار می کنند.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class WorkStealingExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
try {
if (n % 2 == 0) {
// Simulando uma tarefa que leva tempo
Thread.sleep(2000);
} else {
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“Thread: ” + Thread.currentThread().getName() + ” – Número: ” + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
توجه داشته باشید که اگر کد بالا را اجرا کنید، این تمایل وجود دارد که کارهای با اعداد فرد ابتدا تمام شوند. به عنوان مثال، یکی از اجراهای من این بود:
Thread: ForkJoinPool-1-worker-2 – Número: 3
Thread: ForkJoinPool-1-worker-1 – Número: 7
Thread: ForkJoinPool-1-worker-3 – Número: 9
Thread: ForkJoinPool-1-worker-2 – Número: 5
Thread: ForkJoinPool-1-worker-4 – Número: 6
Thread: ForkJoinPool-1-worker-4 – Número: 1
Thread: ForkJoinPool-1-worker-3 – Número: 10
Thread: ForkJoinPool-1-worker-1 – Número: 2
Thread: ForkJoinPool-1-worker-2 – Número: 4
Thread: ForkJoinPool-1-worker-4 – Número: 8
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
و چه اتفاقی می افتد اگر forkJoinPool نکرده بود work-stealing? خوب، رشتههایی با وظایف کوچکتر ابتدا وظایف خود را به پایان میرسانند و بیحرکت میمانند، در حالی که رشتههایی با وظایف بزرگتر شلوغ میشوند و یک گلوگاه غیرضروری در برنامه ایجاد میکنند.
تعجب! حتماً فکر میکنید «اکنون که همه اینها را میدانم، همیشه خلق خواهم کرد parallel streams برای افزایش عملکرد برنامه هایم!”
آرام باش پاداوان جوان، مثل همیشه، بستگی دارد…
عملکرد بین Sequential ه Parallel
خطی وجود دارد که استفاده از موازی جالب نیست. جاوا در بهینه سازی کد برنامه عالی است، بنابراین اغلب، یک استریم متوالی ساده () عملکرد عالی خواهد داشت! بیایید یک مثال با کد را ببینیم:
// Imports removidos pra manter o código breve. Veja o repo para o código completo.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2)
public class SeqParallelBenchmark {
@Param({“100”, “1000000”})
private int size;
private ListInteger> data;
@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}
@Benchmark
public void test_sequential() {
data.stream().mapToInt(Integer::intValue).sum();
}
@Benchmark
public void test_parallel() {
data.stream().parallel().mapToInt(Integer::intValue).sum();
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(SeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
وقتی معیار بالا را روی دستگاهم اجرا کردم، نتیجه زیر را گرفتم:
Benchmark (size) Mode Cnt Score Error Units
SeqParallelBenchmark.test_sequential 100 avgt 10 ≈ 10⁻⁴ ms/op
SeqParallelBenchmark.test_parallel 100 avgt 10 0.022 ± 0.008 ms/op
SeqParallelBenchmark.test_sequential 1000000 avgt 10 0.482 ± 0.016 ms/op
SeqParallelBenchmark.test_parallel 1000000 avgt 10 0.117 ± 0.016 ms/op
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
توجه داشته باشید که با 100 عنصر، روش متوالی در 0.0001 ms اجرا می شود در حالی که موازی 0.022 میلی ثانیه طول می کشد، تقریبا 200 برابر کندتر. با این حال، وقتی برای میلیونها عنصر اجرا میکنیم، موازی 4 برابر سریعتر از نتیجه متوالی است.
من این بار معیار دیگری را با عملیات متفاوت اجرا کردم:
// Imports removidos pra manter o código breve. Veja o repo para o código completo.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class AnotherSeqParallelBenchmark {
@Param({“100”, “1000000”})
private int size;
private ListInteger> data;
@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}
@Benchmark
public ListDouble> testSequentialStream() {
return data.stream()
.map(Math::sin)
.collect(Collectors.toList());
}
@Benchmark
public ListDouble> testParallelStream() {
return data.parallelStream()
.map(Math::sin)
.collect(Collectors.toList());
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(AnotherSeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
و نتیجه بسیار مشابه بود:
Benchmark (size) Mode Cnt Score Error Units
AnotherSeqParallelBenchmark.testSequentialStream 100 avgt 10 0.001 ± 0.001 ms/op
AnotherSeqParallelBenchmark.testParallelStream 100 avgt 10 0.027 ± 0.005 ms/op
AnotherSeqParallelBenchmark.testParallelStream 1000000 avgt 10 2.234 ± 0.025 ms/op
AnotherSeqParallelBenchmark.testSequentialStream 1000000 avgt 10 16.164 ± 0.573 ms/op
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
توجه داشته باشید که چگونه، یک بار دیگر، در یک مجموعه کوچک، موازی بدتر از ترتیبی (~27 برابر بدتر) عمل کرد. با این حال، زمانی که با میلیونها محاسبه مواجه شدیم، عملکرد 8 برابر سریعتر از متوالی بود.
نتیجه
استفاده نکن parallel:
اگر عملیات شما محدود به ورودی/خروجی است: تماس شبکه با میکروسرویس باشد یا نوشتن فایل. چرا؟ شما از I/O بسیار بیشتر از CPU استفاده خواهید کرد، در واقع، می توانید عملیات های بسیار بیشتری را نسبت به تعداد هسته های برنامه خود باز کنید. به عنوان مثال، در سناریوی اخیر من ~ 500 اتصال را به یک میکروسرویس دیگر باز کردم، بسیار بالاتر از 8 هسته دستگاهم.
اگر حجم داده شما کم است: Parallel همانطور که در بالا دیدیم، سربار را با استفاده از Thread Pool، هماهنگی وظایف، سرقت کارهای متوقف شده و غیره اضافه می کند. با توجه به این موضوع، اگر حجم داده شما کوچک است، این احتمال وجود دارد که a sequential سریعتر از یک parallel. در صورت شک، آزمایش هایی را روی برنامه خود اجرا کنید تا ببینید آیا ارزش آن را دارد یا خیر.
استفاده کنید parallel:
اگر سناریوی شما محدود به CPU است: تبدیل داده ها، محاسبات و غیره چرا؟ اگر تعداد نخ های پیش فرض در parallel stream تعداد هسته های دستگاه است و عملکرد آن به CPU محدود می شود، به این معنی که شما می توانید از تمام هسته ها به صورت موازی استفاده کنید و از منابع موجود حداکثر استفاده را ببرید.
اگر وظایف شما مستقل است: همانطور که می خواهید از موازی سازی استفاده کنید، جالب است که وظایف شما مستقل و منزوی هستند، زیرا در زمینه های مختلف و به ترتیب غیرقابل پیش بینی اجرا می شوند.
اگر معیار شما افزایش عملکرد را نشان می دهد: برخی آزمایش ها را انجام دهید، به عنوان مثال با استفاده از ابزاری مانند JMH.
Parallel streams این یک راه آسان برای ورود به دنیای برنامه نویسی موازی است، زیرا عملکرد چندین نگرانی را انتزاعی می کند. برای همه موارد قابل اجرا نیست، اما زمانی که به خوبی اعمال شود می تواند کمک بزرگی در افزایش عملکرد برنامه های شما باشد.
تمام نمونه های این پست در مخزن من موجود است:
Repo برای ذخیره POC، آزمایش ها، آموزش ها.
Repo برای ذخیره POC، آزمایش ها، آموزش ها.
com.hugodesmarques.parallel – آزمایش برای مقاله “موازی و همزمانی 102: جریانهای موازی جاوا در عمل”.
در پست بعدی استفاده از نخ های سنتی و حوضچه های نخ با مجری را بیشتر بررسی خواهیم کرد. آنجا میبینمت!
در مقاله قبلی “موازی و همزمانی 101” به بررسی مفاهیم اساسی پیرامون این دو مبحث پرداختیم. ما بحث می کنیم که چگونه این مفاهیم به برنامه ها اجازه می دهد تا چندین کار را به طور همزمان انجام دهند و عملکرد و کارایی را بهبود بخشند.
در این مقاله دوم از این مجموعه، ما عمیقتر به استفاده از آن خواهیم پرداخت parallel stream
در جاوا معرفی شده در جاوا 8، parallel stream
ویژگیای است که پردازش موازی مجموعهها را تسهیل میکند و از هستههای متعدد CPU برای بهبود عملکرد عملیات بر روی حجم زیادی از دادهها بهره میبرد.
بیایید بررسی کنیم که چگونه parallel stream
کارها، مزایا و معایب آن، و نحوه سفارشی کردن استخر نخ مورد استفاده. ما همچنین در مورد تکنیک “سرقت کار” که توسط اجرا شده است صحبت خواهیم کرد ForkJoinPool
و اهمیت آن برای تعادل بار و کارایی.
بیا، ما مطالب زیادی برای پوشش داریم!
فهرست مطالب
جریان موازی: موازی سازی آسان شده است
جاوا 8 استریم ها را به عنوان روشی جدید برای تکرار و انجام عملیات بر روی مجموعه ها به صورت اعلامی معرفی کرد. Stream ها یک API غنی برای دستکاری داده ها ارائه می دهند که عملیات هایی مانند فیلتر کردن، نقشه برداری، کاهش و غیره را امکان پذیر می کند.
جریان ها را می توان در حالت ترتیبی یا اجرای موازی ایجاد کرد. بیایید به نحوه ایجاد و استفاده از هر دو نوع جریان با مثال نگاه کنیم.
متوالی
import java.util.Arrays;
import java.util.List;
public class SequentialStreamExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Criando um stream sequencial
numbers.stream()
.forEach(n -> System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n));
}
}
در این مثال، متد stream() یک جریان متوالی از لیست اعداد ایجاد می کند. عملیات forEach
روی هر عنصر در لیست تکرار می شود و شماره را همراه با نام رشته ای که عنصر را پردازش می کند چاپ می کند. از آنجایی که یک جریان متوالی است، همه عناصر توسط یک رشته پردازش می شوند.
اگر مثال بالا را اجرا کنیم، نتیجه زیر را خواهیم دید:
➜ sandbox java SequentialStreamExample
Thread: main - Número: 1
Thread: main - Número: 2
Thread: main - Número: 3
Thread: main - Número: 4
Thread: main - Número: 5
Thread: main - Número: 6
Thread: main - Número: 7
Thread: main - Número: 8
Thread: main - Número: 9
Thread: main - Número: 10
موازی
حالا بیایید یک مثال را اجرا کنیم parallel
. برای انجام این کار، فقط باید جریان خود را با آن ایجاد کنیم parallel stream
:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Criando um stream paralelo
numbers.parallelStream()
.forEach(n -> System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n));
}
}
اگر این مثال را چندین بار اجرا کنیم، ترتیب همیشه متفاوت خواهد بود:
➜ sandbox java ParallelStreamExample
Thread: ForkJoinPool.commonPool-worker-2 - Número: 2
Thread: main - Número: 7
Thread: ForkJoinPool.commonPool-worker-6 - Número: 4
Thread: ForkJoinPool.commonPool-worker-3 - Número: 5
Thread: ForkJoinPool.commonPool-worker-4 - Número: 9
Thread: ForkJoinPool.commonPool-worker-1 - Número: 3
Thread: ForkJoinPool.commonPool-worker-9 - Número: 10
Thread: ForkJoinPool.commonPool-worker-8 - Número: 6
Thread: ForkJoinPool.commonPool-worker-7 - Número: 8
Thread: ForkJoinPool.commonPool-worker-5 - Número: 1
راههای دیگری هم برای ایجاد استریم وجود دارد، هم متوالی و هم موازی، اما من این را به عنوان تکلیف شما میگذارم.
توازی و همزمانی
حال که یک مثال ساده با parallel
، چه ارتباطی با مفاهیمی که در مقاله قبلی توازی و همزمانی مطرح کردیم، دارد؟
چگونه parallel stream
آیا از رقابت استفاده می کند؟
وقتی یک استریم را به یک parallel stream
، در صورت امکان، جاوا وظیفه را به چند کار فرعی تقسیم می کند که می توانند به طور همزمان اجرا شوند. هر یک از وظایف فرعی به یک رشته جداگانه اختصاص داده می شود که می تواند بر روی یک هسته پردازنده متفاوت اجرا شود. مدیریت این رشته ها شامل مفاهیم همزمانی است، مانند:
- مجموعه موضوعات: O
ForkJoinPool
اغلب برای مدیریت موضوعات در استفاده می شودparallel stream
. - همگام سازی: اطمینان از ایمن بودن عملیات روی داده های مشترک.
- تغییر زمینه: از طریق سرقت کار
ForkJoinPool
وظایفی را از صف thread مشغول میگیرد تا روی رشتههایی که اشغال نشدهاند اجرا شوند.
چگونه parallel stream
آیا از موازی سازی استفاده می کند؟
O parallel stream
با اجرای همزمان این وظایف فرعی بر روی چندین هسته از موازی سازی بهره می برد. این می تواند منجر به اجرای سریعتر شود، به خصوص برای عملیاتی که مستقل هستند و می توانند به صورت موازی بدون تداخل با یکدیگر انجام شوند.
ForkJoinPool – homi چیست 😳؟
به طور پیش فرض، parallel stream
استفاده می کند ForkJoinPool.commonPool()
، که یک مخزن رشته مشترک است که برای همه وظایف فورک/پیوستن در دسترس است. این استخر به گونهای پیکربندی شده است که از تعدادی رشته برابر با تعداد هستههای موجود در پردازنده استفاده کند، که به کارها اجازه میدهد به طور موازی به طور موثر اجرا شوند.
اگر بخواهیم بررسی کنیم که چند موضوع داریم forkJoinPool
خواهد داشت، فقط تعداد رنگ های موجود در جاوا را چاپ کنید. به عنوان مثال، اگر خط زیر را در هر برنامه جاوا در مک خود اجرا کنم:
System.out.println("Number of cores available: "+ Runtime.getRuntime().availableProcessors());
Number of cores available: 12
توجه داشته باشید که عدد بالا می تواند کمی پیچیده تر باشد، به خصوص زمانی که ظروف درگیر هستند.
تغییر اندازه ForkJoinPool
استاندارد
🛑 نکته: این کار را نکنید!
اندازه استاندارد از ForkJoinPool.commonPool()
را می توان با تنظیم ویژگی سیستم تغییر داد java.util.concurrent.ForkJoinPool.common.parallelism
. این را می توان با راه اندازی JVM با گزینه -D انجام داد، به عنوان مثال:
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MinhaAplicacao
این دستور پیکربندی می کند ForkJoinPool.commonPool()
برای استفاده از 8 رشته
چون تو نه آیا باید اندازه پیش فرض ForkJoinPool را تغییر دهم؟
اندازه پیش فرض را تغییر دهید ForkJoinPool.commonPool()
ممکن است بر بخشهای دیگر برنامه یا کتابخانههایی که از استخر مشترک نیز استفاده میکنند، تأثیر منفی بگذارد.
O commonPool
این یک منبع مشترک است و تغییر رفتار آن می تواند مسائل مربوط به عملکرد و همزمانی را معرفی کند که تشخیص آنها دشوار است. در عوض، توصیه می شود که a را ایجاد و استفاده کنید ForkJoinPool
برای کارهای خاصی که نیاز به موازی سازی دقیق دارند، سفارشی شده است، و تضمین می کند که سایر بخش های برنامه پایدار و قابل پیش بینی باقی می مانند.
جایگزین بهتری چیست؟
✅ نکته: در صورت لزوم این کار را انجام دهید!
برای اصلاح thread pool به صورت محلی می توانید از روش استفاده کنید ForkJoinPool#submit
برای ارائه وظیفه ای که انجام می دهد parallel stream
در زمینه استخر سفارشی.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class CustomForkJoinPoolExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
توجه داشته باشید که وقتی چاپ قبلی را انجام دادم، 12 رنگ در دسترس داشتم، اما توجه داشته باشید که هنگام اجرای کد بالا، فقط 4 رشته خواهم داشت.
Thread: ForkJoinPool-1-worker-4 - Número: 6
Thread: ForkJoinPool-1-worker-4 - Número: 8
Thread: ForkJoinPool-1-worker-1 - Número: 7
Thread: ForkJoinPool-1-worker-2 - Número: 3
Thread: ForkJoinPool-1-worker-3 - Número: 9
Thread: ForkJoinPool-1-worker-3 - Número: 1
Thread: ForkJoinPool-1-worker-4 - Número: 10
Thread: ForkJoinPool-1-worker-1 - Número: 2
Thread: ForkJoinPool-1-worker-2 - Número: 5
Thread: ForkJoinPool-1-worker-3 - Número: 4
همچنین توجه داشته باشید که در مثال اول، وظایفی داریم که توسط main
نخ. با این حال، در custom
وظایف ما فقط توسط کارگران تعریف شده در استخر ما اجرا می شود.
اکنون که ما در مورد ایجاد می دانیم parallel stream
و جایی که آنها اجرا می شوند (ForkJoinPool
)، بیایید چند چیز جالب دیگر در مورد این عملکرد بحث کنیم.
Work-Stealing
بدون ForkJoinPool
همانطور که قبلا ذکر کردیم، ForkJoinPool
مخزن نخ پیش فرض استفاده شده توسط است parallel stream
. این استخر تکنیکی به نام «سرقت کار» را اجرا می کند. این تکنیک برای اطمینان از کارایی و تعادل بار بین رزوه ها ضروری است.
سرقت کار چگونه کار می کند؟
- بخش وظیفه: زمانی که وظیفه ای به
ForkJoinPool
، به وظایف فرعی کوچکتر تقسیم می شود که بین رشته های استخر توزیع شده است. هر رشته یک صف از وظایف را حفظ می کند. - اجرای محلی: هر رشته سعی می کند وظایف را از صف خود اجرا کند. اگر موضوعی وظایف خود را تمام کند یا بیکار شود، بررسی می کند که آیا کار بیشتری برای انجام وجود دارد یا خیر.
- Work Stealing: اگر کار یک رشته تمام شود، سعی می کند وظایف را از صف های رشته های دیگر بدزدد. این کار با گرفتن وظایف از انتهای صف یک رشته دیگر انجام می شود، در حالی که خود نخ قربانی از ابتدای صف خود به انجام وظایف ادامه می دهد.
این مکانیسم سرقت کار کمک می کند تا همه رشته ها مشغول نگه داشته شوند و حجم کار به طور موثر متعادل شود.
مثال مشاهده کار دزدی
بیایید یک ForkJoinPool سفارشی با تعداد مشخصی از رشته ها ایجاد کنیم و وظایف را با زمان های مختلف اجرا شبیه سازی کنیم. هدف مشاهده این است که چگونه نخ های بیکار از رشته هایی که هنوز مشغول هستند، کار می کنند.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class WorkStealingExample {
public static void main(String[] args) {
ListInteger> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
try {
if (n % 2 == 0) {
// Simulando uma tarefa que leva tempo
Thread.sleep(2000);
} else {
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
توجه داشته باشید که اگر کد بالا را اجرا کنید، این تمایل وجود دارد که کارهای با اعداد فرد ابتدا تمام شوند. به عنوان مثال، یکی از اجراهای من این بود:
Thread: ForkJoinPool-1-worker-2 - Número: 3
Thread: ForkJoinPool-1-worker-1 - Número: 7
Thread: ForkJoinPool-1-worker-3 - Número: 9
Thread: ForkJoinPool-1-worker-2 - Número: 5
Thread: ForkJoinPool-1-worker-4 - Número: 6
Thread: ForkJoinPool-1-worker-4 - Número: 1
Thread: ForkJoinPool-1-worker-3 - Número: 10
Thread: ForkJoinPool-1-worker-1 - Número: 2
Thread: ForkJoinPool-1-worker-2 - Número: 4
Thread: ForkJoinPool-1-worker-4 - Número: 8
و چه اتفاقی می افتد اگر forkJoinPool
نکرده بود work-stealing
? خوب، رشتههایی با وظایف کوچکتر ابتدا وظایف خود را به پایان میرسانند و بیحرکت میمانند، در حالی که رشتههایی با وظایف بزرگتر شلوغ میشوند و یک گلوگاه غیرضروری در برنامه ایجاد میکنند.
تعجب! حتماً فکر میکنید «اکنون که همه اینها را میدانم، همیشه خلق خواهم کرد parallel streams
برای افزایش عملکرد برنامه هایم!”
آرام باش پاداوان جوان، مثل همیشه، بستگی دارد…
عملکرد بین Sequential
ه Parallel
خطی وجود دارد که استفاده از موازی جالب نیست. جاوا در بهینه سازی کد برنامه عالی است، بنابراین اغلب، یک استریم متوالی ساده () عملکرد عالی خواهد داشت! بیایید یک مثال با کد را ببینیم:
// Imports removidos pra manter o código breve. Veja o repo para o código completo.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2)
public class SeqParallelBenchmark {
@Param({"100", "1000000"})
private int size;
private ListInteger> data;
@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}
@Benchmark
public void test_sequential() {
data.stream().mapToInt(Integer::intValue).sum();
}
@Benchmark
public void test_parallel() {
data.stream().parallel().mapToInt(Integer::intValue).sum();
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(SeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}
وقتی معیار بالا را روی دستگاهم اجرا کردم، نتیجه زیر را گرفتم:
Benchmark (size) Mode Cnt Score Error Units
SeqParallelBenchmark.test_sequential 100 avgt 10 ≈ 10⁻⁴ ms/op
SeqParallelBenchmark.test_parallel 100 avgt 10 0.022 ± 0.008 ms/op
SeqParallelBenchmark.test_sequential 1000000 avgt 10 0.482 ± 0.016 ms/op
SeqParallelBenchmark.test_parallel 1000000 avgt 10 0.117 ± 0.016 ms/op
توجه داشته باشید که با 100 عنصر، روش متوالی در 0.0001 ms اجرا می شود در حالی که موازی 0.022 میلی ثانیه طول می کشد، تقریبا 200 برابر کندتر. با این حال، وقتی برای میلیونها عنصر اجرا میکنیم، موازی 4 برابر سریعتر از نتیجه متوالی است.
من این بار معیار دیگری را با عملیات متفاوت اجرا کردم:
// Imports removidos pra manter o código breve. Veja o repo para o código completo.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class AnotherSeqParallelBenchmark {
@Param({"100", "1000000"})
private int size;
private ListInteger> data;
@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}
@Benchmark
public ListDouble> testSequentialStream() {
return data.stream()
.map(Math::sin)
.collect(Collectors.toList());
}
@Benchmark
public ListDouble> testParallelStream() {
return data.parallelStream()
.map(Math::sin)
.collect(Collectors.toList());
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(AnotherSeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}
و نتیجه بسیار مشابه بود:
Benchmark (size) Mode Cnt Score Error Units
AnotherSeqParallelBenchmark.testSequentialStream 100 avgt 10 0.001 ± 0.001 ms/op
AnotherSeqParallelBenchmark.testParallelStream 100 avgt 10 0.027 ± 0.005 ms/op
AnotherSeqParallelBenchmark.testParallelStream 1000000 avgt 10 2.234 ± 0.025 ms/op
AnotherSeqParallelBenchmark.testSequentialStream 1000000 avgt 10 16.164 ± 0.573 ms/op
توجه داشته باشید که چگونه، یک بار دیگر، در یک مجموعه کوچک، موازی بدتر از ترتیبی (~27 برابر بدتر) عمل کرد. با این حال، زمانی که با میلیونها محاسبه مواجه شدیم، عملکرد 8 برابر سریعتر از متوالی بود.
نتیجه
استفاده نکن parallel
:
- اگر عملیات شما محدود به ورودی/خروجی است: تماس شبکه با میکروسرویس باشد یا نوشتن فایل. چرا؟ شما از I/O بسیار بیشتر از CPU استفاده خواهید کرد، در واقع، می توانید عملیات های بسیار بیشتری را نسبت به تعداد هسته های برنامه خود باز کنید. به عنوان مثال، در سناریوی اخیر من ~ 500 اتصال را به یک میکروسرویس دیگر باز کردم، بسیار بالاتر از 8 هسته دستگاهم.
-
اگر حجم داده شما کم است:
Parallel
همانطور که در بالا دیدیم، سربار را با استفاده از Thread Pool، هماهنگی وظایف، سرقت کارهای متوقف شده و غیره اضافه می کند. با توجه به این موضوع، اگر حجم داده شما کوچک است، این احتمال وجود دارد که asequential
سریعتر از یکparallel
. در صورت شک، آزمایش هایی را روی برنامه خود اجرا کنید تا ببینید آیا ارزش آن را دارد یا خیر.
استفاده کنید parallel
:
-
اگر سناریوی شما محدود به CPU است: تبدیل داده ها، محاسبات و غیره چرا؟ اگر تعداد نخ های پیش فرض در
parallel stream
تعداد هسته های دستگاه است و عملکرد آن به CPU محدود می شود، به این معنی که شما می توانید از تمام هسته ها به صورت موازی استفاده کنید و از منابع موجود حداکثر استفاده را ببرید. - اگر وظایف شما مستقل است: همانطور که می خواهید از موازی سازی استفاده کنید، جالب است که وظایف شما مستقل و منزوی هستند، زیرا در زمینه های مختلف و به ترتیب غیرقابل پیش بینی اجرا می شوند.
- اگر معیار شما افزایش عملکرد را نشان می دهد: برخی آزمایش ها را انجام دهید، به عنوان مثال با استفاده از ابزاری مانند JMH.
Parallel streams
این یک راه آسان برای ورود به دنیای برنامه نویسی موازی است، زیرا عملکرد چندین نگرانی را انتزاعی می کند. برای همه موارد قابل اجرا نیست، اما زمانی که به خوبی اعمال شود می تواند کمک بزرگی در افزایش عملکرد برنامه های شما باشد.
تمام نمونه های این پست در مخزن من موجود است:
Repo برای ذخیره POC، آزمایش ها، آموزش ها.
Repo برای ذخیره POC، آزمایش ها، آموزش ها.
- com.hugodesmarques.parallel – آزمایش برای مقاله “موازی و همزمانی 102: جریانهای موازی جاوا در عمل”.
در پست بعدی استفاده از نخ های سنتی و حوضچه های نخ با مجری را بیشتر بررسی خواهیم کرد. آنجا میبینمت!