~~NOCACHE~~ ~~REVEAL theme=simple&disableLayout=0&transition=none&controls=1&show_progress_bar=1&build_all_lists=0&show_image_borders=0&horizontal_slide_level=2&enlarge_vertical_slide_headers=0&show_slide_details=1&open_in_new_window=1&size=1024x768~~ ====== Współbieżność w języku Java ====== ===== Wielozadaniowość a współbieżność ===== * **Wielozadaniowość** - równoczesne wykonanie wielu zadań (współbieżnie lub równolegle). * **Współbieżność** - jednoczesne wykonywanie wielu zadań w tym samym czasie. Mogą być wykonywane na jednym lub wielu rdzeniach. * **Równoległość** - faktyczne wykonywanie na wielu rdzeniach w tej samej chwili. * W praktyce: kod współbieżny może działać szybciej, ale przede wszystkim bywa bardziej responsywny. ===== Proces vs wątek ===== * **Proces** ma własną przestrzeń adresową i zasoby systemowe (pamięć, pliki, itp.). * procesy są izolowane, komunikacja między nimi jest kosztowna (IPC). * procesy konkurują o zasoby systemowe * **Wątek** to jednostka wykonawcza w ramach procesu, * współdzieli pamięć i zasoby z innymi wątkami tego samego procesu. * łatwa komunikacja między wątkami * szybsze tworzenie i przełączanie * Problemy: ryzyko wyścigów, zakleszczeń i trudnych błędów. ===== Gdzie współbieżność ma sens ===== * Operacje I/O: pliki, sieć, baza danych. * Aplikacje GUI: wątek UI nie może być blokowany długą operacją. * Serwery: równoległa obsługa wielu klientów. * Obliczenia dzielone na niezależne podzadania. * Nie każde zadanie przyspieszy - koszt tworzenia i synchronizacji wątków też istnieje. ===== Wątki w Javie - dwa podstawowe podejścia ===== * Dziedziczenie po klasie ''Thread''. * Implementacja interfejsu ''Runnable'' i przekazanie do ''new Thread(runnable)''. * Częściej preferujemy ''Runnable'' (lepsza separacja logiki zadania od mechanizmu wykonania). ===== Przykład: Thread ===== class CounterThread extends Thread { @Override public void run() { for (int i = 1; i <= 5; i++) { System.out.println(getName() + " -> " + i); try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } } } // użycie CounterThread t1 = new CounterThread(); t1.start(); ===== Przykład: Runnable ===== class PrintTask implements Runnable { private final String label; PrintTask(String label) { this.label = label; } @Override public void run() { System.out.println("Start: " + label + ", thread=" + Thread.currentThread().getName()); } } // użycie Thread t = new Thread(new PrintTask("Import danych")); t.start(); ===== Metody klasy Thread ===== * ''start()'' - uruchomienie nowego wątku. * ''run()'' - ciało zadania (wywoływane w nowym wątku po ''start()''). * ''sleep(ms)'' - pauza. * ''join()'' - blokuje wątek wywołujący, czekając na zakończenie tego wątku. * ''isAlive()'' - czy wątek jeszcze działa. * ''interrupt()'' - sygnał przerwania. * ''setName()'', ''getName()'', ''setPriority()'', ''getPriority()'', ''isInterrupted()'', ''getState()'', itd. Dokumentacja: [[https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Thread.html|Thread (Java SE 17 & JDK 17)]] ===== Cykl życia i stany wątku ===== * ''NEW'' - utworzony (nie uruchomiony). * ''RUNNABLE'' - gotowy lub wykonywany. * ''BLOCKED'' - zablokowany, czeka na monitor (''synchronized''). * ''WAITING'' - czeka na powiadomienie bez limitu (np. ''wait()'', ''join()''). * ''TIMED_WAITING'' - czeka z limitem (''sleep()'', ''wait(timeout)''). * ''TERMINATED'' - zakończony. {{ zajecia:java_2026_1:java_thraed_states.jpg?500 |Cykl życia wątku (źródło: https://howtodoinjava.com/java/multi-threading/java-thread-life-cycle-and-thread-states/)}} ===== Przykład ===== Thread worker = new Thread(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); worker.start(); // uruchomienie wątku System.out.println(worker.isAlive()); // zwykle true worker.join(); // czeka aż worker zakończy System.out.println(worker.isAlive()); // false ===== Przestarzałe metody Thread ===== * metody ''stop()'', ''suspend()'' i ''resume()'' są przestarzałe (deprecated) i niebezpieczne. * ''stop()'' może przerwać wątek w środku sekcji krytycznej. * ''suspend()'' może zatrzymać wątek trzymający blokadę i zablokować cały system. * ''resume()'' nie naprawia problemów projektowych i utrudnia przewidywalność. ===== Poprawny wzorzec zatrzymywania wątku ===== * Użyj ''interrupt()'' i współpracy kodu wątku. * W pętli sprawdzaj ''Thread.currentThread().isInterrupted()''. * Przy ''InterruptedException'' ustaw flagę ponownie i zakończ pracę. class Worker implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { // porcja pracy try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } ===== Monitor w Javie ===== * Monitor to mechanizm synchronizacji, który zapewnia wzajemne wykluczanie. * Każdy obiekt w Javie ma monitor. * Klasa ''Object'' implementuje metody ''wait()'', ''notify()'' i ''notifyAll()''. * ''synchronized'' na metodzie lub bloku kodu = wejście do monitora obiektu (sekcja krytyczna). * Tylko jeden wątek może posiadać dany monitor naraz. ===== synchronized - metoda i blok ===== class SafeCounter { private int value; // metoda synchronizowana na całym obiekcie public synchronized void inc() { value++; } public int get() { // blok synchronizowany na tym obiekcie synchronized (this) { return value; } } } ===== Komunikacja między wątkami ===== * ''wait()'' zwalnia monitor i usypia wątek do czasu notyfikacji * ''notify()'' budzi jeden czekający wątek. * ''notifyAll()'' budzi wszystkie czekające. * Wywołujemy je tylko wewnątrz sekcji zsynchronizowanej na tym samym obiekcie. * Warunek blokady zawsze sprawdzamy w pętli ''while'', nie w ''if'', ponieważ po obudzeniu warunek może być już nieprawdziwy (np. inny wątek mógł przejąć monitor i zmienić stan). ===== Producent-konsument (monitor) ===== * Klasyczny problem synchronizacji. * Producent tworzy dane i umieszcza je w buforze. * Konsument pobiera dane z bufora i przetwarza. * Bufor może być ograniczony (np. jednoelementowy), co wymaga synchronizacji. class OneSlotBuffer { private Integer value = null; public synchronized void put(int v) throws InterruptedException { while (value != null) { wait(); // czeka aż konsument pobierze wartość } value = v; notifyAll(); } public synchronized int take() throws InterruptedException { while (value == null) { wait(); // czeka aż producent wstawi wartość } int result = value; value = null; notifyAll(); return result; } } ===== Priorytety wątków ===== * Zakres: ''Thread.MIN_PRIORITY'' (1) do ''Thread.MAX_PRIORITY'' (10). * Domyślnie ''Thread.NORM_PRIORITY'' (5). * Priorytet to tylko sugestia dla planisty systemu. * Nie opieramy poprawności programu na priorytetach. ===== Framework współbieżności: java.util.concurrent ===== * Zamiast ręcznie zarządzać wątkami i synchronizacją, można użyć gotowych narzędzi z pakietu [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html|java.util.concurrent]]. * Zapewniają lepszą wydajność, bezpieczeństwo i czytelność kodu. * ''ExecutorService'' - zarządzanie pulą wątków i zadaniami. * operacje i klasy atomowe, np. ''AtomicInteger'' * ''Fork/JoinPool'' - efektywne dzielenie zadań na mniejsze kawałki. * ''Lock'' i ''ReentrantLock'' - bardziej elastyczne blokady niż ''synchronized''. * synchronizacja wątków: ''Semaphore'', ''CountDownLatch'', ''CyclicBarrier'' * kolekcje współbieżne: ''ConcurrentHashMap'', ''CopyOnWriteArrayList'', itp. ===== Problem: race condition (wyścig danych) ===== * Dwa wątki modyfikują ten sam stan bez synchronizacji. * Operacje typu ''x++'' nie są atomowe. * Skutek: utrata aktualizacji i niepoprawne wyniki. class BrokenCounter { int value = 0; void inc() { value++; } } ===== Rozwiązania wyścigu danych ===== * ''synchronized'' - prosty i bezpieczny punkt startowy. * ''AtomicInteger'' - atomowe operacje bez jawnych blokad. * ''Lock'' (np. ''ReentrantLock'') - większa kontrola. * Dodatkowo: niemutowalność (niezmienność obiektów) i minimalizacja stanu współdzielonego. ===== Obiekty Lock (ReentrantLock) ===== import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class LockCounter { private final Lock lock = new ReentrantLock(); private int value = 0; void inc() { lock.lock(); try { value++; } finally { lock.unlock(); } } } ===== Semafory ===== * ''Semaphore'' kontroluje liczbę jednoczesnych wejść do zasobu. * Przykłady: pula połączeń, limit zapytań, dostęp do ograniczonego zasobu. import java.util.concurrent.Semaphore; Semaphore semaphore = new Semaphore(3); // max 3 równolegle semaphore.acquire(); try { // sekcja z ograniczonym dostępem } finally { semaphore.release(); } ===== Narzędzia wysokiego poziomu: ExecutorService ===== * Zamiast ręcznie tworzyć wiele ''Thread'', używamy puli wątków. * Lepsza kontrola nad zasobami i prostszy kod. * Typowe metody: ''submit()'', ''invokeAll()'', ''shutdown()'', ''awaitTermination()''. import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; ExecutorService pool = Executors.newFixedThreadPool(4); pool.submit(() -> System.out.println("zadanie A")); pool.shutdown(); ===== Callable i Future ===== * ''Runnable'' nie zwraca wyniku i nie deklaruje wyjątku. * ''Callable'' zwraca wynik typu ''T'' i może rzucać wyjątek. * ''Future'' reprezentuje wynik obliczenia w tle. import java.util.concurrent.*; ExecutorService pool = Executors.newSingleThreadExecutor(); Future f = pool.submit(() -> 40 + 2); Integer result = f.get(); // może blokować pool.shutdown(); ===== Fork/JoinPool ===== * Model dziel i zwyciężaj (''divide and conquer'') * Duże zadanie dzielimy rekurencyjnie na mniejsze. * Klasy: ''RecursiveTask'' oraz ''RecursiveAction''. * Dobre dla obliczeń CPU-bound. ===== Deadlock, starvation, livelock ===== * **Deadlock** - dwa (lub więcej) wątków czeka cyklicznie na swoje blokady. * **Starvation** - wątek stale pomijany przez planowanie. * **Livelock** - wątki aktywne, ale bez postępu. ===== Jak unikać problemów współbieżności ===== * Stała kolejność zakładania blokad. * Małe sekcje krytyczne. * Timeouty i ''tryLock()'' dla krytycznych operacji. * Ograniczanie współdzielonego stanu. * Preferowanie gotowych struktur z ''java.util.concurrent''. ===== Częste wyjątki i pułapki ===== Nieprzechwycony i nieobsłużony wyjątek spowoduje przejście wątku do stanu TERMINATED Wyjątki, które mogą wystąpić w kodzie współbieżnym: * ''InterruptedException'' - rzucany, gdy wątek jest przerwany podczas oczekiwania, snu lub innej operacji blokującej * ''IllegalMonitorStateException'' - ''wait/notify'' wołany, gdy wątek nie posiada monitora obiektu * ''IllegalThreadStateException'' - np. próba uruchomienia wątku, który już został uruchomiony * ''SecurityException'' - próba operacji, do której wątek nie ma uprawnień * Nie ignoruj przerwań i nie zostawiaj blokad bez ''finally''. ===== Ćwiczenia praktyczne ===== * Napisz program, który tworzy wątek odliczający czas od N do 0, co sekundę wypisując aktualną wartość. Po odliczeniu do 0, wątek powinien wypisać "Czas minął!" i zakończyć działanie. * Uruchom wiele wątków odliczających * Zrealizuj wątek dziedzicząc po klasie ''Thread'' * Napisz program demonstrujący zjawisko wyścigu danych na przykładzie licznika, który jest inkrementowany przez kilka wątków bez synchronizacji. * zrealizuj wątki implementując ''Runnable''. * zaimplementuj bezpieczny licznik współbieżny za pomocą ''synchronized'' i ''AtomicInteger''. * Stwórz prosty producent-konsument z jednoelementowym buforem, używając ''wait()'' i ''notify()''. * Do programu odliczającego czas dodaj możliwość wstrzymania, wznowienia oraz przerwania odliczania, bez użycia przestarzałych metod ''suspend()'' i ''resume()''. * wstrzymanie zatrzymuje wątek (''wait()''), * wznowienie budzi wątek (''notify()''), * przerwanie przerywa odliczanie i kończy wątek. Zauważ, że przerwanie może nastąpić, gdy wątek jest wstrzymany. * przetestuj działanie i wypisz stan wątku po każdej operacji (''getState()''). ====== Zadanie 5: Liczby pierwsze ====== Napisz program, który wyznacza liczbę liczb pierwszych w zakresie od 2 do N wielowątkowo. Program sporządza porównanie szybkości działania w zależności od liczby wątków i pokazuje przyspieszenie względem wersji jednowątkowej. Wymagania: * Przyjmij dużą wartość N (np. 100_000_000) lub pozwól użytkownikowi podać N z klawiatury. * Obliczenia zrealizuj za pomocą K workerów: * dzielimy zakres wartości od 2 do N na K fragmentów * uruchamiamy K wątków (workerów), każdy z nich wyznacza liczbę liczb pierwszych w swoim fragmencie testując dzielniki dla każdej liczby w swoim fragmencie. Worker przechowuje wyniki w liczniku lokalnym. * liczbę pierwszą sprawdzaj przez test dzielników do sqrt(x). * po zakończeniu pracy wszystkich wątków sumujemy wyniki * Program przeprowadza pomiar czasu dla wersji jednowątkowej i wielowątkowej (dla różnych K). * Proponowane wartości K: 1, 2, 4, ..., P, 2P, gdzie P to liczba rdzeni procesora (tj. dostosuj zakres do liczby rdzeni, tak aby zobaczyć efekt przyspieszenia). * W programie wypisz liczbę rdzeni: ''Runtime.getRuntime().availableProcessors()''. * Zadbaj o poprawność operacji na współdzielonych danych (brak wyścigów). Program powinien wypisać, najlepiej w formie tabeli, dla każdej liczby wątków K=1,2,4,...: * liczbę liczb pierwszych, * czas wykonania, * przyspieszenie względem rozwiązania jednowątkowego T1 / Tk * informację, czy wynik jest poprawny (zgodny z jednowątkowym). ===== Materiały dodatkowe ===== * ''Thread'' API (Java 17): [[https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Thread.html|Thread]] * ''java.util.concurrent'' API: [[https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/package-summary.html|java.util.concurrent]] * ''ExecutorService'' API: [[https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html|ExecutorService]] * ''Lock'' API: [[https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/locks/Lock.html|Lock]]