Wątki

Jest to swobodne tłumaczenie moim zdaniem najciekawszych fragmentów tego artykułu. Co nie zmienia faktu, iż zgadzam się z tezami tu przedstawionymi. ;)

Ostrzeżenie

W większości przypadków należy wystrzegać się używania wątków. Są bardzo skomplikowane w użyciu. Wiele bibliotek nie jest przystosowanych do wielowątkowości. Nawet sam CPython nie jest przystosowany do używania wielu wątków, stąd posiada coś takiego jak GIL (Global Interpreter Lock), czyli tylko jeden wątek ma dostęp do interpretera w danym czasie. Rozszerzenia Pythona napisane w C mogą zwalniać tę blokadę używając dedykowanych funkcji API, jak również ponownie ją zakładać przy ponownym odwołaniu do interpretera. Wątki generalnie nie skalują się dobrze (mam na myśli ilości rzędu 50-100 wątków). Każdy z wątków alokuje własną stertę, która zajmuje około 1 MB. Wątki są podatne na różne subtelne, trudne do zdiagnozowania problemy, ponieważ każdy z wątków może dokonywać zmian w danych innego. Np. załóżmy iż w tym przykładzie:

if key in some_dictionary:
    some_dictionary[key] += 1
else:
    some_dictionary[key] = 1

drugi wątek przejął kontrolę zaraz za wykonaniem sprawdzenia if key in. Jeśli klucz był w słowniku, to test ten zwróci True i sterowanie przejdzie do pierwszej gałęzi if-a. Inny wątek w międzyczasie może usunąć klucz ze słownika powodując, iż += 1 się nie wykona poprawnie. Można ten problem rozwiązać blokując dostęp do danych, ale wtedy mamy dodatkowy narzut na blokowanie i zwalnianie, nawet gdy kolizje nie są zbyt częste. Poza tym ciężko jest przetestować czy taki kod działa prawidłowo. Najlepszym podejściem w używaniu wątków jest nie współdzielenie danych w ogóle, ewentualnie współdzielenie danych tylko do odczytu lub ostatecznie korzystanie z blokad, co w Pythonie znacznie ułatwia klasa Queue (o tym za chwilę).

Reaktory i wielozadaniowość kooperacyjna

Innym rozwiązaniem, bardziej adekwatnym dla wielozadaniowości I/O, jest użycie dyspozytora (ang. dispatcher), który sprawdza czy jakieś dane dotarły z sieci czy np. zostały wprowadzone poprzez GUI. Jeśli tak, to przekazuje te dane do handlera oczekującego na zdarzenie. Gdy zdarzenie zostanie obsłużone handler zwraca sterowanie do dyspozytora, który pobiera następne zdarzenie. Takie podejście zwykle wymaga sporych zmian w kodzie, ponieważ handler musi jednoznacznie określić, kiedy zwraca sterowanie. Jest to nazywane właśnie "wielozadaniowością kooperacyjną" [1] w przeciwieństwie do "wielozadaniowości z wywłaszczaniem" (ang. preemptive multitasking), gdzie sterowanie może zostać przekazane do innego wątku w dowolnym momencie. Python w bibliotece standardowej zawiera moduły asyncore i asynchat, które implementują właśnie wielozadaniowość kooperacyjną, znaną również jako wzorzec reaktora. "async" jest to skrót od "asynchroniczny" (ang. asynchronous), ponieważ odseparowany jest moment pobierania danych, od wykonania na nich jakiejś akcji. Allegra [2] jest względnie nową biblioteką, ciągle rozwijaną, która ma rozszerzyć funkcjonalność biblioteki standardowej Pythona. Twisted [3] jest najbardziej znaną asynchroniczną biblioteką w świecie Pythona. Generalnie ludzie uważają, że użycie bibliotek asynchronicznych jest trudne. Kolejnym rozwiązaniem jest Stackless Python [4]. Korzysta on z zupełnie innego podejścia do wątków, które pozwala na uruchomienie nawet dziesiątek tysięcy wątków, lecz z pewnymi ograniczeniami, co do możliwości ich wykorzystania. Ograniczenia te przeważnie dotyczą rozszerzeń w C, a nie programów pythonowych. Pamiętaj, że Stackless nie zmienia podstawowych zasad problemu. Jeśli funkcja zostanie zablokowana (oczekując na I/O lub wykonując jakieś obliczenia) wtedy nadal musimy użyć wątku systemowego lub asynchronicznego I/O. Stakless nam to jedynie ułatwia. Poza potrzebą zmiany podejścia do programowania, jest jeszcze problem małej ilości kodu zaprojektowanego dla takich systemów. A zatem może się to skończyć dużymi przeróbkami istniejących rozwiązań, aby zaadaptować je do naszych potrzeb.

Tworzenie wątków

Oto funkcja, którą chcemy uruchomić w osobnym wątku:

import time

def oblicz(x):
    time.sleep(x)
    return x * x

W prawdziwym programie mogłoby to być wysłanie żądania http, uruchomienie innego programu, połączenie z bazą danych itp. Powszechnym sposobem na uruchomienie nowego wątku jest dziedziczenie po klasie threading.Thread. Ten sposób wymaga dwóch kroków: utworzenie obiektu i wywołanie jego metody start. Metoda start tworzy nowy wątek i wywołuje swoją własną metodę run. Każda klasa musi implementować swoją własną wersję metody run. W poniższym przykładzie uruchamiamy trzy wątki, każdy z inną wartością do obliczenia. Zauważ, że wątek główny kończy się zanim wątki "obliczeniowe" skończą swoją pracę. O ile wątki te nie zostały oznaczone jako "demony" (ang. daemons) Python nie zakończy pracy dopóki wszystkie wątki nie zakończą działania.

import time
import datetime
import threading

def oblicz(x):
    time.sleep(x)
    return x * x

# dodajemy stempelek czasowy do komunikatu
def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

# kazde zadanie obliczeniowe uruchamiane jest w osobnym watku
class WatekOblicz(threading.Thread):
    def __init__(self, value):
        threading.Thread.__init__(self)
        self.value = value
    def run(self):
        result = oblicz(self.value)
        log("%s -> %s" % (self.value, result))

def main():
    log("uruchamiam watek glowny")
    WatekOblicz(3).start()
    WatekOblicz(2.5).start()
    WatekOblicz(1).start()
    log("koniec watku glownego")

if __name__ == "__main__":
    main()

Oto wyjście tego programu:

14:48:25 uruchamiam watek glowny
14:48:25 koniec watku glownego
14:48:26 1 -> 1
14:48:27 2.5 -> 6.25
14:48:28 3 -> 9

Pule wątków i kolejki

Przy uruchamianiu nowych wątków zawsze występuje pewien narzut, który oczywiście się z czasem sumuje. Często należy uwzględniać sytuację ograniczonych zasobów. Gdy np. wątek łączy się ze zdalnym serwerem, możesz chcieć ograniczyć program do wykonywania tylko trzech równoległych żądań. Zwykle rozwiązuje się ten problem poprzez wykorzystanie puli wątków. Są to wątki oczekujące na żądania. Inny wątek wysyła żądanie do puli wątków. Jeden z wątków w puli pobiera je i wykonuje. Po zakończeniu oczekuje na nowe żądanie. Kluczową częścią tego rozwiązania jest oczekiwanie na żądanie. Realizuje się to przy pomocy obiektu Queue.Queue, który pozwala na uzyskanie bezpiecznej komunikacji pomiędzy wątkami. Powtórzę to jeszcze raz: używaj Queue do komunikacji między wątkami. Nie musisz się wtedy przejmować blokadami i innymi niskopoziomowymi technikami. Queue wykona całą robotę za Ciebie. Queue implementuje metody put i get. Są one bezpieczne dla wątków, co oznacza, że każdy wątek może dodawać i pobierać dane z tej kolejki nie powodując żadnych konfliktów. Domyślnie kolejka ma nieograniczoną wielkość i wątek może po prostu dodać coś i kontynuować pracę. Jeśli kolejka jest pusta, wtedy wywołanie metody get w wątku spowoduje jego zablokowanie, aż coś zostanie wstawione do kolejki. Jeśli kolejka zawiera jakiś obiekt, wtedy wywołanie get powoduje pobranie go. Tylko i wyłącznie jeden wątek będzie mógł pobrać ten obiekt. Koncepcja Queue opiera się na tym, iż występują producenci i konsumenci. Producenci wstawiają coś do kolejki, a konsumenci coś pobierają. Konsumenci zawsze czekają, aż coś będzie gotowe i tylko jeden z nich może to coś pobrać z kolejki. Jest tu tylko jeden skomplikowany punkt. Skąd wątek ma wiedzieć kiedy zakończyć pracę? Powiedziałem "gdy zakończy pracę [wątek], to czeka na następne żądanie", ale jednocześnie powiedziałem "Python nie zakończy pracy, dopóki wszystkie wątki nie zakończą swojego działania". Moglibyśmy oznaczyć wątek jako demon, ale to by spowodowało, iż zakończyłby działanie natychmiast z momentem zakończenia działania wszystkich wątków nie będących demonami. Mogłoby to nastąpić nawet w trakcie wykonywania jakiegoś zadania. Jak rozwiążemy ten problem? Powiadomimy wszystkie wątki, aby zakończyły swe działanie. Wykorzystamy do tego specjalny rodzaj żądania. Kiedy wątek otrzyma je, wtedy będzie wiedział, że ma zakończyć pracę. Wykorzystam obiekt None do tego celu, ale może to być coś zupełnie innego. Wybór należy do Ciebie. Musimy wysłać tyle obiektów kończących wątki ile wątków uruchomiliśmy, czyli dla każdego wątku oddzielny.

Oto przykład puli trzech wątków obsługujących pięć żądań.

import time
import datetime
import threading
import Queue

def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def oblicz(x):
    time.sleep(x)
    return x * x

# Watki w puli oczekujace na zadania w kolejce ``kolejka_zadan``
class WatekOblicz(threading.Thread):
    def __init__(self, id, kolejka_zadan):
        threading.Thread.__init__(self, name="WatekOblicz-%d" % (id,))
        self.kolejka_zadan = kolejka_zadan
    def run(self):
        while True:
            # watek sie blokuje w oczekiwaniu az cos trafi do kolejki
            req = self.kolejka_zadan.get()
            if req is None:
                # Nie ma nic wiecej do przetwarzania, wiec konczymy
                break
            result = oblicz(req)
            log("%s %s -> %s" % (self.getName(), req, result))

def main():
    log("uruchamiam watek glowny")
    kolejka_zadan = Queue.Queue()
    # inicjalizujemy pule watkow z trzema watkami "obliczeniowymi"
    N_liczba_watkow = 3
    for i in range(N_liczba_watkow):
        WatekOblicz(i, kolejka_zadan).start()

    # wrzucamy 5 zadan
    kolejka_zadan.put(4)
    kolejka_zadan.put(5)
    kolejka_zadan.put(3)
    kolejka_zadan.put(1.5)
    kolejka_zadan.put(2.2)

    # wysylamy zadania zakonczenia przetwarzania do wszystkich watkow
    for i in range(N_liczba_watkow):
        kolejka_zadan.put(None)
    log("koniec watku glownego.")

if __name__ == "__main__":
    main()

i po uruchomieniu mamy:

14:49:57 uruchamiam watek glowny
14:49:57 koniec watku glownego.
14:50:00 WatekOblicz-2 3 -> 9
14:50:01 WatekOblicz-0 4 -> 16
14:50:01 WatekOblicz-2 1.5 -> 2.25
14:50:02 WatekOblicz-1 5 -> 25
14:50:03 WatekOblicz-0 2.2 -> 4.84

W Pythonie 2.5 klasa Queue została wzbogacona o dwie dodatkowe metody: task_done oraz join, które pozwalają nam nieco uzupełnić powyższe rozwiązanie. Czasem przed kontynuowaniem pracy program musi mieć pewność, iż wątki całkowicie zakończyły swoją działalność. Metoda task_done pozwala nam explicit sygnalizować zakończenie bieżącego zadania, natomiast metoda join blokuje się aż do momentu zakończenia wszystkich zadań wstawionych do kolejki. Czyli ostatecznie mamy:

import time
import datetime
import threading
import Queue

def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def oblicz(x):
    time.sleep(x)
    return x * x

# Watki w puli oczekujace na zadania w kolejce ``kolejka_zadan``
class WatekOblicz(threading.Thread):
    def __init__(self, id, kolejka_zadan):
        threading.Thread.__init__(self, name="WatekOblicz-%d" % (id,))
        self.kolejka_zadan = kolejka_zadan
    def run(self):
        while True:
            # watek sie blokuje w oczekiwaniu az cos trafi do kolejki
            req = self.kolejka_zadan.get()
            if req is None:
                # Nie ma nic wiecej do przetwarzania, wiec konczymy
                self.kolejka_zadan.task_done()
                break
            value, kolejka_rezultatow = req
            result = oblicz(value)
            log("%s %s -> %s" % (self.getName(), value, result))
            kolejka_rezultatow.put(result)
            self.kolejka_zadan.task_done()

kolejka_zadan = Queue.Queue()

def threaded_sum(values):
    nsum = 0.0
    kolejka_rezultatow = Queue.Queue()
    for value in values:
        kolejka_zadan.put((value, kolejka_rezultatow))
    # pobieramy wyniki; kolejnosc odpowiedzi nie musi byc identyczna jak zadan!
    # uzycie "_" jest konwencja oznaczajaca "wartosc tej zmiennej mnie nie interesuje"
    for _ in values:
        nsum += kolejka_rezultatow.get()
    return nsum

def main():
    log("uruchamiam watek glowny")
    # inicjalizujemy pule watkow z trzema watkami "obliczeniowymi"
    N_liczba_watkow = 3
    for i in range(N_liczba_watkow):
        WatekOblicz(i, kolejka_zadan).start()

    # wrzucamy 5 zadan
    result = threaded_sum( (4, 5, 3, 1.5, 2.2) )
    log("suma wynosi: %f" % (result,))

    # wysylamy zadania zakonczenia przetwarzania do wszystkich watkow
    for i in range(N_liczba_watkow):
        kolejka_zadan.put(None)
    kolejka_zadan.join()
    log("koniec watku glownego.")

if __name__ == "__main__":
    main()

Dwustronna komunikacja

W poprzednim przykładzie dane zostały przekazane z głównego wątku do wątków "obliczeniowych", ale nic nie zostało zwrócone do wątku głównego. W prawdziwym kodzie jest to raczej mało użyteczne. Przeważnie wątek wywołujący chce zrobić jakiś użytek z rezultatu przetwarzania innych wątków. Możemy to naprawić używając drugiej kolejki, czyli drugiej instancji klasy Queue. Wysyłając żądanie wysyłamy wartość do obliczeń, a także kolejkę, którą wątek może wykorzystać do zwrócenia rezultatu. Gdy obliczenia zostaną wykonane odpowiedź zostaje przekazana poprzez tę drugą kolejkę zwrotną.

Łatwej jest przedstawić to w działającym kodzie, niż dalej tłumaczyć:

import time
import datetime
import threading
import Queue

def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def oblicz(x):
    time.sleep(x)
    return x * x

# Watki w puli oczekujace na zadania w kolejce ``kolejka_zadan``
class WatekOblicz(threading.Thread):
    def __init__(self, id, kolejka_zadan):
        threading.Thread.__init__(self, name="WatekOblicz-%d" % (id,))
        self.kolejka_zadan = kolejka_zadan
    def run(self):
        while True:
            # watek sie blokuje w oczekiwaniu az cos trafi do kolejki
            req = self.kolejka_zadan.get()
            if req is None:
                # Nie ma nic wiecej do przetwarzania, wiec konczymy
                break
            value, kolejka_rezultatow = req
            result = oblicz(value)
            log("%s %s -> %s" % (self.getName(), value, result))
            kolejka_rezultatow.put(result)

kolejka_zadan = Queue.Queue()

def threaded_sum(values):
    nsum = 0.0
    kolejka_rezultatow = Queue.Queue()
    for value in values:
        kolejka_zadan.put((value, kolejka_rezultatow))
    # pobieramy wyniki; kolejnosc odpowiedzi nie musi byc identyczna jak zadan!
    # uzycie "_" jest konwencja oznaczajaca "wartosc tej zmiennej mnie nie interesuje"
    for _ in values:
        nsum += kolejka_rezultatow.get()
    return nsum

def main():
    log("uruchamiam watek glowny")
    # inicjalizujemy pule watkow z trzema watkami "obliczeniowymi"
    N_liczba_watkow = 3
    for i in range(N_liczba_watkow):
        WatekOblicz(i, kolejka_zadan).start()

    # wrzucamy 5 zadan
    result = threaded_sum( (4, 5, 3, 1.5, 2.2) )
    log("suma wynosi: %f" % (result,))

    # wysylamy zadania zakonczenia przetwarzania do wszystkich watkow
    for i in range(N_liczba_watkow):
        kolejka_zadan.put(None)
    log("koniec watku glownego.")

if __name__ == "__main__":
    main()

a oto wyjście:

14:53:57 uruchamiam watek glowny
14:54:00 WatekOblicz-2 3 -> 9
14:54:01 WatekOblicz-0 4 -> 16
14:54:01 WatekOblicz-2 1.5 -> 2.25
14:54:02 WatekOblicz-1 5 -> 25
14:54:03 WatekOblicz-0 2.2 -> 4.84
14:54:03 suma wynosi: 57.090000
14:54:03 koniec watku glownego.

Pracując nad takim kodem często zdarzają się jakieś błędy. W przypadku wątków denerwujące jest, że są trudne do ubicia. Wciskanie Ctrl-C nie zawsze pozwala na ubicie programu Pythona z wieloma wątkami. Jeśli tak się dzieje, to wtedy wciskam Ctrl-Z aby przenieść bieżący proces w tło, a następnie wpisuję kill %%, aby wysłać łagodny sygnał kończący. Następnie fg, aby powrócić do procesu Pythona i tym razem Ctrl-C powinno zakończyć proces. Mógłbym też od razu wpisać kill -KILL %%, ale to zawsze wydaje się takie brutalne.

Zmienne wątków

Czasami potrzebne są zmienne lokalne dla danego wątku. Dla przykładu, zmienna errno w C jest zmienną lokalną wątku, często zaimplementowana jako makro przy użyciu sztuczek z #define. Wartość errno odzwierciedla ostatnią wartość błędu w danym wątku, a nie wartość błędu z jakiegoś innego wątku. Innym przykładem jest CherryPy, bazująca na wątkach biblioteka implementująca m. in. serwer http. Każde żądanie obsługiwane jest przez osobny wątek. Informacje związane z żądaniem i odpowiedzią przechowywane są w zmiennych lokalnych wątków udostępnianych poprzez cherrypy.request i cherrypy.response. Na przykład aby zmienić nagłówek content-type odpowiedzi, handler CherryPy mógłby wyglądać tak:

@expose
def spam(self, x):
    cherrypy.response.headers["Content-Type"] = "text/plain"
    return "Prosiles o %r" % (x,)

Utworzenie nowej zmiennej lokalnej wątku jest łatwe. Zanim przejdziemy do tego jak, przypominam że podobnie jak zmienne globalne (lub modułowe) są przeważnie złym rozwiązaniem. Bardzo rzadko są potrzebne, ale zdarza się że jednak są użyteczne. Co oznacza, że pokazanie sensownego przykładu też nie jest łatwe. Załóżmy, że nasze zadanie obliczeniowe ma błąd, który pojawia się, gdy wartość żądania jest większa od 3 i chcemy zdiagnozować ten problem poprzez odpisy do logów (lub komunikaty na standardowe wyjście). Jest kilka sposobów na zaimplementowanie tego. Zdecydowałem się na rozwiązanie z użyciem zmiennych lokalnych wątku. Uzależniłem działanie funkcji odpisującej do logu od zmiennej log wątku i dodałem kilka wywołań funkcji log w funkcji obliczeniowej. W wątku głównym funkcja log wyświetla komunikaty na standardowe wyjście.

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def log_nothing(message):
    pass

local = threading.local()
# watek glowny wypisuje na standardowe wyjscie
local.log = log_to_stdout

def log(message):
    local.log(message)

def oblicz(x):
    log("obliczam %r" % (x,))
    time.sleep(x)
    result = x * x
    log("koncze obliczanie %r; wynik wynosi %r" % (x, result))
    return result

podczas gdy WatekOblicz domyślnie korzysta z funkcji log_nothing.

class WatekOblicz(threading.Thread):
    ...
    def run(self):
        local.log = log_nothing
        while True:
            ...

Wołający może zmienić domyślne zachowanie używając nowej opcji "debug", która aktywuje funkcję odpisującą do logu dla danego wątku z puli.

def run(self):
    local.log = log_nothing
    while True:
        # watek sie blokuje w oczekiwaniu az cos trafi do kolejki
        req = self.kolejka_zadan.get()
        if req is None:
            # Nie ma nic wiecej do przetwarzania, wiec konczymy
            break
        value, kolejka_rezultatow, debug = req
        if debug:
            local.log = log_to_stdout
        else:
            local.log = log_nothing
        result = oblicz(value)
        log("%s %s -> %s" % (self.getName(), value, result))
        kolejka_rezultatow.put(result)

Aby aktywować komunikaty debugujące, gdy żądanie jest większe od 3, dodaję flagę "debug" jako trzecią wartość wstawianą do kolejki żądań poprzez metodę put (Od tego momentu zacząłem myśleć o użyciu obiektu klasy z atrybutami value, kolejka_zadan i debug i przesyłania go do kolejki zamiast krotki, lecz pozostawiam to jako zadanie dla czytelnika).

def threaded_sum(values):
    nsum = 0.0
    kolejka_rezultatow = Queue.Queue()
    for value in values:
        debug = (value > 3)
        kolejka_zadan.put((value, kolejka_rezultatow, debug))
    # pobieramy wyniki; kolejnosc odpowiedzi nie musi byc identyczna jak zadan!
    # uzycie "_" jest konwencja oznaczajaca "wartosc tej zmiennej mnie nie interesuje"
    for _ in values:
        nsum += kolejka_rezultatow.get()
    return nsum

Zbierzmy wszystkie zmiany w jeden program:

import time
import datetime
import threading
import Queue

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def log_nothing(message):
    pass

local = threading.local()
# watek glowny wypisuje na standardowe wyjscie
local.log = log_to_stdout

def log(message):
    local.log(message)

def oblicz(x):
    log("obliczam %r" % (x,))
    time.sleep(x)
    result = x * x
    log("koncze obliczanie %r; wynik wynosi %r" % (x, result))
    return result

# Watki w puli oczekujace na zadania w kolejce ``kolejka_zadan``
class WatekOblicz(threading.Thread):
    def __init__(self, id, kolejka_zadan):
        threading.Thread.__init__(self, name="WatekOblicz-%d" % (id,))
        self.kolejka_zadan = kolejka_zadan
    def run(self):
        local.log = log_nothing
        while True:
            # watek sie blokuje w oczekiwaniu az cos trafi do kolejki
            req = self.kolejka_zadan.get()
            if req is None:
                # Nie ma nic wiecej do przetwarzania, wiec konczymy
                break
            value, kolejka_rezultatow, debug = req
            if debug:
                local.log = log_to_stdout
            else:
                local.log = log_nothing
            result = oblicz(value)
            log("%s %s -> %s" % (self.getName(), value, result))
            kolejka_rezultatow.put(result)

kolejka_zadan = Queue.Queue()

def threaded_sum(values):
    nsum = 0.0
    kolejka_rezultatow = Queue.Queue()
    for value in values:
        debug = (value > 3)
        kolejka_zadan.put((value, kolejka_rezultatow, debug))
    # pobieramy wyniki; kolejnosc odpowiedzi nie musi byc identyczna jak zadan!
    # uzycie "_" jest konwencja oznaczajaca "wartosc tej zmiennej mnie nie interesuje"
    for _ in values:
        nsum += kolejka_rezultatow.get()
    return nsum

def main():
    log("uruchamiam watek glowny")
    # inicjalizujemy pule watkow z trzema watkami "obliczeniowymi"
    N_liczba_watkow = 3
    for i in range(N_liczba_watkow):
        WatekOblicz(i, kolejka_zadan).start()

    # wrzucamy 5 zadan
    result = threaded_sum( (4, 5, 3, 1.5, 2.2) )
    log("suma wynosi: %f" % (result,))

    # wysylamy zadania zakonczenia przetwarzania do wszystkich watkow
    for i in range(N_liczba_watkow):
        kolejka_zadan.put(None)
    log("koniec watku glownego.")

if __name__ == "__main__":
    main()

Po uruchomieniu otrzymujemy:

14:56:49 uruchamiam watek glowny
14:56:49 obliczam 4
14:56:49 obliczam 5
14:56:53 koncze obliczanie 4; wynik wynosi 16
14:56:53 WatekOblicz-0 4 -> 16
14:56:54 koncze obliczanie 5; wynik wynosi 25
14:56:54 WatekOblicz-1 5 -> 25
14:56:56 suma wynosi: 57.090000
14:56:56 koniec watku glownego.

Blokady

Czasami użycie Queue wiąże się ze zbyt dużym narzutem. Queue używa blokad do synchronizacji i jest sposobem na umożliwienie komunikacji między wątkami. Ale co jeśli nie potrzebujemy tego wszystkiego? Na przykład instrukcja print wysyła komunikaty na standardowe wyjście, które jakby nie było jest zasobem współdzielonym. Możliwa jest sytuacja, w której dwa wątki będą wysyłały komunikaty w tym samym czasie, co spowoduje ich całkowite wymieszanie. Przy wysłaniu pojedynczego stringa to się nie może zdarzyć dzięki GIL-owi, ale już może się zdarzyć w przypadku komunikatów oddzielonych przecinkami, jak np.:

print "a=", a(), "b=", b()

ponieważ inny wątek może przejąć kontrolę nad standardowym wyjściem pomiędzy wywołaniem a(), a momentem wysłania komunikatu na standardowe wyjście. Nasza funkcja log_to_stdout została napisana tak, iż wyjście zawsze stanowi jeden string. W takim wypadku nie musieliśmy się martwić o blokowanie standardowego wyjścia.

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

Jednak załóżmy, że chcielibyśmy tę funkcję napisać tak:

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print now, message

Zmiana wątku mogłaby nastąpić pomiędzy wypisaniem "now" i "message". Moglibyśmy użyć Queue i utworzyć dedykowany wątek do pisania na standardowe wyjście, ale jak już wspomniałem wydaje się to być zbyt duży i raczej zbędny narzut. W zamian możemy zastosować blokadę, która zagwarantuje, iż dwa wywołania log_to_stdout nie spowodują wymieszania komunikatów. Blokada jest zamknięta lub otwarta. Jeśli jest otwarta, wówczas wywołanie metody acquire zmienia jej stan na zamknięty. Jeśli jest zamknięta, wtedy wywołanie metody acquire blokuje wywołujący wątek, aż blokada zostanie zwolniona (i nikt inny nie zablokuje jej przed bieżącym wątkiem). Wywołanie metody release zwalnia blokadę.

stdout_lock = threading.Lock()
def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    stdout_lock.acquire()
    try:
        print now, message
    finally:
        stdout_lock.release()

Od Pythona 2.5 dostępna jest instrukcja with, która została zaprojektowana do takiego właśnie zarządzania zasobami. Specjalna metoda __enter__ obiektu jest wywoływana na początku bloku kodu z with, a jego metoda __exit__ jest wywoływana na końcu bloku, nawet gdy w trakcie zostanie rzucony jakiś wyjątek. W Pythonie 2.5 obiekt blokady implementuje protokół with. Metoda __enter__ zamyka blokadę a __exit__ ją zwalnia. Czyli ostatecznie możemy to zapisać tak:

from __future__ import with_statement

stdout_lock = threading.Lock()
def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    with stdout_lock:
        print now, message

Użycie bibliotek jednowątkowych

Niektóre biblioteki nie są przystosowane do wątków. Istnieje kilka rozwiązań tego problemu w zależności od tego co owo "nieprzystosowanie" oznacza. Jeśli biblioteka używa zmiennych globalnych (modułowych), to jest ona po prostu źle napisana. Natomiast biblioteki takie jak Tk czy SQLite nie są przystosowane do wątków w tym sensie, iż dwa wątki nie mogą wywoływać funkcji bibliotecznych w tym samym czasie. Taki problem możemy rozwiązać na dwa sposoby: 1) otoczyć blokadami wszystkie wywołania biblioteczne lub 2) wysyłać komunikaty do specjalnego wątku dedykowanego do obsługi danej biblioteki. Zamiast używać, którejś z tych skomplikowanych bibliotek, zaprezentuję przykład "nieodpornej" na wątki biblioteki, która zlicza ile razy została wywołana.

call_count = 0
def double(x):
    global call_count
    call_count += 1
    return x * 2

Chciałbym wywołać ją w ten sposób:

print "5 doubled is", call_in_thread(the_thread, double, 5)

A oto jak zaimplementować to używając Queue. W poniższym przykładzie powołamy do życia kilka wątków, które realizując swoje zadania muszą skorzystać z "nieodpornej" na wątki funkcji double.

from __future__ import with_statement

import threading
import Queue
import time
import datetime
import functools

stdout_lock = threading.Lock()
def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    with stdout_lock:
        print now, message

class SingleThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = Queue.Queue()
    def run(self):
        while True:
            req = self.queue.get()
            if req is None:
                break
            f, kolejka_rezultatow = req
            kolejka_rezultatow.put(f())

    def quit(self):
        self.queue.put(None)

single_thread = SingleThread()
single_thread.start()

# Cala magia wystepuje tutaj. Zakladamy, ze wszystkie watki maja atrybut "queue",
# ktory uzywamy do przekazywania nowych zadan.
# Zadania maja postac (callable, kolejka_rezultatow). Callable jest obiektem
# wykonywalnym uruchamianym w podanym watku.
def call_in_thread(T, f):
    kolejka_rezultatow = Queue.Queue()
    T.queue.put( (f, kolejka_rezultatow) )
    return kolejka_rezultatow.get()

call_count = 0
def double(x):
    global call_count
    call_count += 1
    return x * 2

class WatekOblicz(threading.Thread):
    def __init__(self, value):
        threading.Thread.__init__(self)
        self.value = value
    def run(self):
        # functools.partial jest dostepna od Pythona 2.5.
        # W Pythonie 2.4 mozemy to zapisac tak:  lambda : double(self.value)
        response = call_in_thread(single_thread,
                                  functools.partial(double, self.value))
        assert response == self.value * 2
        log("success with %s" % self.value)

def main():
    for i in (5, 2, 3.4, 1):
        WatekOblicz(i).start()

    time.sleep(2)
    print "Double bylo wywolane", call_count, "razy"
    single_thread.quit()

if __name__ == "__main__":
    main()

A oto jak możemy to samo zaimplementować używając blokad. Ten kod jest ewidentnie prostszy:

from __future__ import with_statement

import threading
import Queue
import time
import datetime
import functools

stdout_lock = threading.Lock()
def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    with stdout_lock:
        print now, message

single_thread = threading.Lock()

def call_in_thread(T, f):
    # T musi byc obiektem blokady, a f obiektem wywolywalnym (callable).
    # Zamykamy bloade i wywolujemy podana funkcje. Uruchamiana jest ona
    # w biezacym watku.
    with T:
        return f()

call_count = 0
def double(x):
    global call_count
    call_count += 1
    return x * 2

class WatekOblicz(threading.Thread):
    def __init__(self, value):
        threading.Thread.__init__(self)
        self.value = value
    def run(self):
        # functools.partial jest dostepna od Pythona 2.5.
        # W Pythonie 2.4 mozemy to zapisac tak:  lambda : double(self.value)
        response = call_in_thread(single_thread, functools.partial(double, self.value))
        assert response == self.value * 2
        log("success with %s" % self.value)

def main():
    for i in (5, 2, 3.4, 1):
        WatekOblicz(i).start()

    time.sleep(2)
    print "Double bylo wywolane", call_count, "razy"

if __name__ == "__main__":
    main()

[1]ang. collaborative lub cooperative lub non-preemptive multitasking
[2]http://pypi.python.org/pypi/Allegra
[3]http://twistedmatrix.com/trac/
[4]http://www.stackless.com/