Temat: framework pomagający synchronizować wielowątkowe...
Adam Woźniak:
Sebastian Kolski:
Taaa, widzę, że bez pełnego kodu chyba nie dam rady do końca zrozumieć algorytmu ;]
btw:
z tego co zdążyłem jednak z kodu zrozumieć, to mam wrażenie, że metoda startNewJobOrRemoveIdFromRunningJobs nie zadziała zgodnie z oczekiwaniami, kiedy zostaje wywołana po powrocie taska z queueId=A, a dla queueId=B są zakolejkowane jakieś zadania. Osobiście oczekiwałbym, że skoro nie ma więcej zadań dla queueId=A, to zostanie do pracy przekazany jakiś inny oczekujący task.
Do tego dochodzi kolejne wymaganie, że jeśli dla queueId=A nie ma więcej zadań, to do przetwarzania ma trafić task, który najdłużej oczekuje zakolejkowany do wykonania (tak aby zachować warunek sprawiedliwości i, przy okazji, nie spowodować, aby doszło do zagłodzenia kolejek dla niektórych queueId).
Ale, jak wspomniałem, mogę się mylić lub nie do końca rozumieć algorytmu.
Pozdrowienia,
Adam WoźniakAdam Woźniak edytował(a) ten post dnia 08.09.09 o godzinie 15:14
Ten kod jest prawie cały z wyjątkiem konstruktorów/importów. Całość wrzuciłem
http://pokazywarka.pl/threadpool/
Od razu dodam, że ja go nie uruchamiałem, a jedynie próbowałem napisać coś zgodnie z zasadami z "Clean Code ...."
Aby zadziałał trzebao stworzyć ThreadPoolExecutorWhichSignalTaskFinish i ThreadPoolQueueManager i wywołać na managerze setThreadPool
Jak to powinno działać:
ThreadPoolExecutorWhichSignalTaskFinish jest to standardowy ThreadPoolExecutor, czyli pula wątków, która posiada własną kolejkę zadań. Odpowiada on za realizowanie w wątkach zadań z kolejki i powiadamianiu o ich zakończeniu.
ThreadPoolQueueManager to jest klasa, która obsługuje logikę, czyli dba o to aby do kolejki ThreadPoolExecutor'a trafiło na raz tylko jedno zadanie o danym id.
Robi to w ten sposób, że w trzyma Set'a z Id zadań, które są w ThreadPoolExecutorze (zarówno tych, które się wykonują, jak i tych, które są w kolejce executora). W momencie gdy przychodzi nowe zadanie do wykonania, sprawdza czy zadanie o takim id jest w executorze (wykonuje się lub jest w jego kolejce). Jeśli tak to kolejkuje u siebie to zadanie, jeśli nie to pcha do executora (co nie znaczy, że zacznie się ono wykonywać).
W momencie, gdy zadanie kończy się, z executora przychodzi o tym sygnał zawierający id skończonego zadania. Jeśli w managerze jest kolejka zadań o tym id i nie jest ona pusta, to pierwsze zadanie z tej kolejki jest wysyłane do executora, w przeciwnym przypadku z listy id zadań, które są w executorze usuwane jest to id.
Metody submitForExecution i startNewJobOrRemoveIdFromRunningJobs są synchronizowane aby uniknąć sytuacji gdy wykonają się jednocześnie dla jakiegoś id i np startNewJobOrRemoveIdFromRunningJobs sprawdzi, że kolejka X jest pusta, następnie submitForExecution doda do tej kolejki nowego job'a, a wtedy startNewJobOrRemoveIdFromRunningJobs usunie X z listy Id jobów wysłanych do executora. Efektem było by to, że kolejka X by rosła i kolejne joby z niej nie były by startowane.
Zakładając, że nasz executor wykonuje jedno zadanie na raz i przychodzą nam do wykonania joby o id (1a, 2a, 3a, 1b, 3b), gdzie id to pierwsza cyfra. W executorze mamy wykonujący się job 1a i w kolejce executora 2a 3a. W kolejkach managera 1b, 3b i w liście id jobów wysłanych do executora (1, 2, 3). Po zakończeniu się joba 1a executor wysyła sygnał. Manager sprawdza, że kolejka dla 1 zawiera job'a (1b) i pcha go do executora. W executorze wykonuje się 2a i w kolejce jest (3a, 1b). Executor kończy joba 2a wysyła sygnał. Manager sprawdza, że w kolejce dla id 2 nie ma jobów więc usuwa 2 z listy jobów wysłanych do executora (zostaje 1, 3) i w liście jobów do wysłania 3b. Jeśli teraz przyszedł by job 2b to ponieważ 2 nie jest na liście jobów wysłanych do executora, to zostanie on od razu do niego wysłany. I tak dalej
Oczywiście w dużym uproszczeniu, executor pewnie nie będzie wykonywał jednego joba na raz i nie koniecznie musi wykonywać je po kolei.
Pozdrawiam