Adam Woźniak

Adam Woźniak software architect
and developer

Temat: framework pomagający synchronizować wielowątkowe...

Witam

Chciałbym Was zapytać, czy znacie jakieś gotowe rozwiązanie problemu:

Mamy pulę wątków, nazwałem ją ThreadPool, z N wątkami roboczymi.
Chcę do tego pool'a wrzucać taski do wykonania, które przez pool'a będą buforowane i, sukcesywnie, wykonywane (każdy task ma być wykonany przez jeden wątek roboczy).

Każdy task posiada (między innymi) property - queueId. Może (i jest) wiele tasków z tymi samymi queueId.

Dodatkowym wymaganiem jest, aby, dla zadanego queueId, taski z tymże queueId, były koniecznie wykonane w kolejności, w jakiej przyszły do ThreadPool'a.
Tym samym, dla tasków z różnymi queueId, nie ma wymagań co do kolejności ich wykonania (mogą być przetwarzane niezależnie).

Czy ktoś z Was zna jakieś gotowe rozwiązania podobnej synchronizacji?

Pozdrawiam,
Adam Woźniak

PS.
Jako, że nie znalazłem "gotowca", to sam to zaimplementowałem. Ale jestem ciekaw, czy ... kolejny raz nie wyłamywałem otwartych drzwi ;]

konto usunięte

Temat: framework pomagający synchronizować wielowątkowe...

Nie wiem czy dobrze zrozumiałem Twoje wymagania, ale wygląda na to, że gotową implementację masz już w JDK (od wersji 1.5). Zobacz np. na klasę ThreadPoolExecutor.
Sebastian Kolski

Sebastian Kolski programista/DBA

Temat: framework pomagający synchronizować wielowątkowe...

Adam Woźniak:
Witam

Chciałbym Was zapytać, czy znacie jakieś gotowe rozwiązanie problemu:

Mamy pulę wątków, nazwałem ją ThreadPool, z N wątkami roboczymi.
Chcę do tego pool'a wrzucać taski do wykonania, które przez pool'a będą buforowane i, sukcesywnie, wykonywane (każdy task ma być wykonany przez jeden wątek roboczy).

java.util.concurrent.ThreadPoolExecutor

Każdy task posiada (między innymi) property - queueId. Może (i jest) wiele tasków z tymi samymi queueId.

Dodatkowym wymaganiem jest, aby, dla zadanego queueId, taski z tymże queueId, były koniecznie wykonane w kolejności, w jakiej przyszły do ThreadPool'a.
Tym samym, dla tasków z różnymi queueId, nie ma wymagań co do kolejności ich wykonania (mogą być przetwarzane niezależnie).

Mam wrażenie, że dbanie o kolejność/logikę wrzucania tasków do wykonania jest to osobna odpowiedzialność do wyrzucenia do osobnej klasy. W każdym razie nie wydaje mi się, aby dopisywanie tej funkcjonalności bezpośrednio do ThreadPool'a było dobrym pomysłem, a odniosłem wrażenie, że tego oczekujesz. Oczywiście nie znam szczegółów implementacji i w sumie tylko strzelam.
Czy ktoś z Was zna jakieś gotowe rozwiązania podobnej synchronizacji?

Pozdrawiam,
Adam Woźniak

PS.
Jako, że nie znalazłem "gotowca", to sam to zaimplementowałem. Ale jestem ciekaw, czy ... kolejny raz nie wyłamywałem otwartych drzwi ;]

Pozdrawiam
Sebastian Kolski
Krzysztof K.

Krzysztof K. Experienced Software
Engineer

Temat: framework pomagający synchronizować wielowątkowe...

java.util.concurrent.ThreadPoolExecutor

Każdy task posiada (między innymi) property - queueId. Może (i jest) wiele tasków z tymi samymi queueId.

Dodatkowym wymaganiem jest, aby, dla zadanego queueId, taski z tymże queueId, były koniecznie wykonane w kolejności, w jakiej przyszły do ThreadPool'a.
Tym samym, dla tasków z różnymi queueId, nie ma wymagań co do kolejności ich wykonania (mogą być przetwarzane niezależnie).

Nie ma zadnej gwarancji w jakiej kolejnosci wykonaja sie taski (watki) wrzucone do ThreadPoolExecutor'a. Musisz to sam oprogramowac.
Sebastian Kolski

Sebastian Kolski programista/DBA

Temat: framework pomagający synchronizować wielowątkowe...

Krzysztof K.:
java.util.concurrent.ThreadPoolExecutor

Każdy task posiada (między innymi) property - queueId. Może (i jest) wiele tasków z tymi samymi queueId.

Dodatkowym wymaganiem jest, aby, dla zadanego queueId, taski z tymże queueId, były koniecznie wykonane w kolejności, w jakiej przyszły do ThreadPool'a.
Tym samym, dla tasków z różnymi queueId, nie ma wymagań co do kolejności ich wykonania (mogą być przetwarzane niezależnie).

Nie ma zadnej gwarancji w jakiej kolejnosci wykonaja sie taski (watki) wrzucone do ThreadPoolExecutor'a. Musisz to sam oprogramowac.

Dlatego, tak jak pisałem, potrzebny jest extra manager, który zadba o wrzucanie odpowiednich tasków do ThreadPool'a.
Czytam sobie właśnie "Clean Code A Handbook of Agile Software Craftsmanship" Robert C. Martin (według mnie lektura obowiązkowa dla programisty) i w ramach ćwiczeń praktycznych stworzyłem sobie na szybko takiego potworka, który powinien spełniać założenia


public interface RunnableWithId extends Runnable {
public Integer getId();
}

public class ThreadPoolQueueManager implements PropertyChangeListener {

private ThreadPoolExecutorWhichSignalTaskFinish threadPool;
private Set<Integer> runningJobsIds = new HashSet<Integer>();
private Map<Integer, Queue<RunnableWithId>> jobsToRun = new HashMap<Integer, Queue<RunnableWithId>>();

<ciach> konstruktor/metoda do inicjacj threadPool </ciach>

public void propertyChange(PropertyChangeEvent evt) {
if (TASK_FINISHED_PROPERTY.equals(evt.getPropertyName())) {
startNewJobOrRemoveIdFromRunningJobs((Integer) evt.getNewValue());
}
}

private synchronized void startNewJobOrRemoveIdFromRunningJobs(Integer jobId) {
Queue<RunnableWithId> jobsWithSameId = jobsWithSameId(jobId);
if (jobsWithSameId.peek() == null) {
runningJobsIds.remove(jobId);
} else {
Runnable job = jobsWithSameId.poll();
threadPool.execute(job);
}
}

public synchronized void submitForExecution(RunnableWithId job) {
Integer jobId = job.getId();
if (runningJobsIds.contains(jobId)) {
Queue<RunnableWithId> jobQueue = jobsWithSameId(jobId);
jobQueue.offer(job);
} else {
runningJobsIds.add(jobId);
threadPool.execute(job);
}
}

private Queue<RunnableWithId> jobsWithSameId(Integer jobId) {
Queue<RunnableWithId> queue = jobsToRun.get(jobId);
if (queue == null) {
queue = new LinkedList<RunnableWithId>();
jobsToRun.put(jobId, queue);
}
return queue;
}
}

public class ThreadPoolExecutorWhichSignalTaskFinish extends ThreadPoolExecutor {

public static final String TASK_FINISHED_PROPERTY = "TASK_FINISHED";
private PropertyChangeSupport support;

<ciach> kontruktory </ciach>

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (r instanceof RunnableWithId) {
support.firePropertyChange(TASK_FINISHED_PROPERTY, null, ((RunnableWithId) r).getId());
}
}

public void addPropertyChangeListener(PropertyChangeListener listener) {
support.addPropertyChangeListener(listener);
}

public void removePropertyChangeListener(PropertyChangeListener listener) {
support.removePropertyChangeListener(listener);
}
}
Adam Woźniak

Adam Woźniak software architect
and developer

Temat: framework pomagający synchronizować wielowątkowe...

Sebastian Kolski:
Dlatego, tak jak pisałem, potrzebny jest extra manager, który zadba o wrzucanie odpowiednich tasków do ThreadPool'a.
Czytam sobie właśnie "Clean Code A Handbook of Agile Software Craftsmanship" Robert C. Martin (według mnie lektura obowiązkowa dla programisty) i w ramach ćwiczeń praktycznych stworzyłem sobie na szybko takiego potworka, który powinien spełniać założenia

Dziękuję Sebastian za kod.

btw:
Możesz ten kod udostępnić gdzieś (pokazywarka.pl?) w całości, jeśli jeszcze ten kod masz?
Łatwiej by mi się to analizowało.

Pozdrawiam, Adam
Adam Woźniak

Adam Woźniak software architect
and developer

Temat: framework pomagający synchronizować wielowątkowe...

Sebastian Kolski:

Taaa, widzę, że bez pełnego kodu chyba nie dam rady do końca zrozumieć algorytmu ;]

btw:
z tego co zdążyłem jednak z kodu zrozumieć, to mam wrażenie, że metoda startNewJobOrRemoveIdFromRunningJobs nie zadziała zgodnie z oczekiwaniami, kiedy zostaje wywołana po powrocie taska z queueId=A, a dla queueId=B są zakolejkowane jakieś zadania. Osobiście oczekiwałbym, że skoro nie ma więcej zadań dla queueId=A, to zostanie do pracy przekazany jakiś inny oczekujący task.
Do tego dochodzi kolejne wymaganie, że jeśli dla queueId=A nie ma więcej zadań, to do przetwarzania ma trafić task, który najdłużej oczekuje zakolejkowany do wykonania (tak aby zachować warunek sprawiedliwości i, przy okazji, nie spowodować, aby doszło do zagłodzenia kolejek dla niektórych queueId).

Ale, jak wspomniałem, mogę się mylić lub nie do końca rozumieć algorytmu.

Pozdrowienia,
Adam WoźniakAdam Woźniak edytował(a) ten post dnia 08.09.09 o godzinie 15:14
Sebastian Kolski

Sebastian Kolski programista/DBA

Temat: framework pomagający synchronizować wielowątkowe...

Adam Woźniak:
Sebastian Kolski:

Taaa, widzę, że bez pełnego kodu chyba nie dam rady do końca zrozumieć algorytmu ;]

btw:
z tego co zdążyłem jednak z kodu zrozumieć, to mam wrażenie, że metoda startNewJobOrRemoveIdFromRunningJobs nie zadziała zgodnie z oczekiwaniami, kiedy zostaje wywołana po powrocie taska z queueId=A, a dla queueId=B są zakolejkowane jakieś zadania. Osobiście oczekiwałbym, że skoro nie ma więcej zadań dla queueId=A, to zostanie do pracy przekazany jakiś inny oczekujący task.
Do tego dochodzi kolejne wymaganie, że jeśli dla queueId=A nie ma więcej zadań, to do przetwarzania ma trafić task, który najdłużej oczekuje zakolejkowany do wykonania (tak aby zachować warunek sprawiedliwości i, przy okazji, nie spowodować, aby doszło do zagłodzenia kolejek dla niektórych queueId).

Ale, jak wspomniałem, mogę się mylić lub nie do końca rozumieć algorytmu.

Pozdrowienia,
Adam WoźniakAdam Woźniak edytował(a) ten post dnia 08.09.09 o godzinie 15:14

Ten kod jest prawie cały z wyjątkiem konstruktorów/importów. Całość wrzuciłem http://pokazywarka.pl/threadpool/
Od razu dodam, że ja go nie uruchamiałem, a jedynie próbowałem napisać coś zgodnie z zasadami z "Clean Code ...."
Aby zadziałał trzebao stworzyć ThreadPoolExecutorWhichSignalTaskFinish i ThreadPoolQueueManager i wywołać na managerze setThreadPool

Jak to powinno działać:

ThreadPoolExecutorWhichSignalTaskFinish jest to standardowy ThreadPoolExecutor, czyli pula wątków, która posiada własną kolejkę zadań. Odpowiada on za realizowanie w wątkach zadań z kolejki i powiadamianiu o ich zakończeniu.

ThreadPoolQueueManager to jest klasa, która obsługuje logikę, czyli dba o to aby do kolejki ThreadPoolExecutor'a trafiło na raz tylko jedno zadanie o danym id.

Robi to w ten sposób, że w trzyma Set'a z Id zadań, które są w ThreadPoolExecutorze (zarówno tych, które się wykonują, jak i tych, które są w kolejce executora). W momencie gdy przychodzi nowe zadanie do wykonania, sprawdza czy zadanie o takim id jest w executorze (wykonuje się lub jest w jego kolejce). Jeśli tak to kolejkuje u siebie to zadanie, jeśli nie to pcha do executora (co nie znaczy, że zacznie się ono wykonywać).
W momencie, gdy zadanie kończy się, z executora przychodzi o tym sygnał zawierający id skończonego zadania. Jeśli w managerze jest kolejka zadań o tym id i nie jest ona pusta, to pierwsze zadanie z tej kolejki jest wysyłane do executora, w przeciwnym przypadku z listy id zadań, które są w executorze usuwane jest to id.

Metody submitForExecution i startNewJobOrRemoveIdFromRunningJobs są synchronizowane aby uniknąć sytuacji gdy wykonają się jednocześnie dla jakiegoś id i np startNewJobOrRemoveIdFromRunningJobs sprawdzi, że kolejka X jest pusta, następnie submitForExecution doda do tej kolejki nowego job'a, a wtedy startNewJobOrRemoveIdFromRunningJobs usunie X z listy Id jobów wysłanych do executora. Efektem było by to, że kolejka X by rosła i kolejne joby z niej nie były by startowane.

Zakładając, że nasz executor wykonuje jedno zadanie na raz i przychodzą nam do wykonania joby o id (1a, 2a, 3a, 1b, 3b), gdzie id to pierwsza cyfra. W executorze mamy wykonujący się job 1a i w kolejce executora 2a 3a. W kolejkach managera 1b, 3b i w liście id jobów wysłanych do executora (1, 2, 3). Po zakończeniu się joba 1a executor wysyła sygnał. Manager sprawdza, że kolejka dla 1 zawiera job'a (1b) i pcha go do executora. W executorze wykonuje się 2a i w kolejce jest (3a, 1b). Executor kończy joba 2a wysyła sygnał. Manager sprawdza, że w kolejce dla id 2 nie ma jobów więc usuwa 2 z listy jobów wysłanych do executora (zostaje 1, 3) i w liście jobów do wysłania 3b. Jeśli teraz przyszedł by job 2b to ponieważ 2 nie jest na liście jobów wysłanych do executora, to zostanie on od razu do niego wysłany. I tak dalej
Oczywiście w dużym uproszczeniu, executor pewnie nie będzie wykonywał jednego joba na raz i nie koniecznie musi wykonywać je po kolei.

Pozdrawiam



Wyślij zaproszenie do