Python как получить данные из потока

Многопоточность в Python: модуль threading

Современное программное обеспечение проектируется так, что его функции и задачи могут выполняться параллельно. Python предоставляет программисту мощный набор инструментов для работы с потоками в библиотеке threading.

Как работает многопоточность

Многопоточность — это выполнение программы сразу в нескольких потоках, которые выполняют её функции одновременно.

Многопоточное программирование можно спутать с мультипроцессорным. На самом деле их концепции очень похожи, но если в первом случае программа работает с потоками, то в другом — с процессами. Разница между потоками и процессами проста: потоки имеют общую память, поэтому изменения в одном потоке видны в других, а процессы используют разные области памяти.

На самом деле, если рассмотреть одноядерный процессор, операции из разных потоков не выполняются параллельно. Одно ядро может выполнить только одну операцию в единицу времени, но так как операции выполняются очень быстро, создается ощущение параллельного выполнения, псевдопараллельность. По-настоящему параллельно программы могут работать только на многоядерных процессорах, где каждое ядро может выполнять операции независимо от других.

Отличным примером использования многопоточности является программа, где отрисовка графического интерфейса и обработка ввода пользователя управляются разными потоками. Если бы обе задачи были помещены в один поток, отрисовка интерфейса прерывалась бы каждый раз, когда программа получает ввод от пользователя. Использование двух потоков позволяет сделать выполнение этих функций независимым друг от друга.

Однако при выполнении многопоточной программы на одноядерном процессоре, её производительность будет ниже, чем если бы она была написана в один поток. Это происходит потому, что на реализацию и управление потоками тратится дополнительная память.

Можно ли считать threading многопоточным?

В Python используется GIL (Global Interpreter Lock), который однопоточный. Все потоки, которые создаются с помощью threading будут работать внутри потока GIL. В связи с этим они будут обрабатываться только одним ядром. Ни о какой работе одновременно на нескольких физических ядрах процессора не может быть и речи.

Но без него никуда не деться, если вам нужно выполнять несколько задач одновременно:

  • Обрабатывать нажатие кнопки в графическом интерфейсе, например с помощью Tkinter. Если по нажатию кнопки надо производить много действий, которые требуют времени, то эти действия надо выполнять в другом потоке, чтобы графический интерфейс не подвис на это время. Соответственно кнопки надо блокировать, а как поток завершит вычисления — обратно разблокировать.
  • Если наша программа работает одновременно с несколькими подключенными устройствами. Они могут быть подключены к разным COM-портам.
  • Если мы загружаем файлы из сети и одновременно обрабатываем уже загруженные.
  • И так далее…

В чём преимущества тогда модуля Threading по сравнению с Multiprocessing? Рассмотрим их:

  • Простота использования.
  • Проще передавать данные из потока в основную программу. Вообще можно даже использовать глобальные переменные. Но при этом надо правильно проектировать программу, чтобы не было ошибок, связанных с «Состоянием гонки», которые мы рассмотрим ниже.

Подключение библиотеки threading

Threading – это стандартный модуль, который поставляется вместе с интерпретатором. Программисту не нужно устанавливать его, достаточно просто подключить модуль с помощью команды:

Работать с потоками можно, создавая экземпляры класса Thread. Чтобы создать отдельный, поток нужно создать экземпляр класса и применить к нему метод start() . Вот пример:

Здесь мы функцию mydef запустили в отдельном потоке. В качестве аргументов функции передали числа 1 и 2.

threading.Thread()

Эта конструкция позволяет создать новый поток, создав экземпляр класса Thread. Вот как выглядят её аргументы:

Она принимает аргументы:

Рассмотрим их подробнее:

  • group. Имеет значение None, зарезервирована для будущего расширения при реализации класса ThreadGroup.
  • target. Это функция, которая выполняется в потоке с помощью метода run(), если передано значение None, ничего не вызывается.
  • name. Это имя потока, по умолчанию оно принимает значение «Thread-X», где X – десятичное число. Программист может задать имя вручную.
  • args. Это кортеж, в котором хранятся аргументы, передаваемые в вызываемую функцию.
  • kwargs. Это словарь, в котором хранятся аргументы, передаваемые в функцию.
  • daemon. Это параметр, который устанавливает, является ли поток демоническим. По умолчанию имеет значение None, тогда свойство daemonic наследуется от текущего потока. Программист может самостоятельно установить значение параметра.

Демоны

Демонами называют процессы, которые работают в фоновом режиме. В Python для демона есть более конкретное значение: демонический поток или поток демона. В отличие от обычных потоков поток демона автоматически завершает свою работу при закрытии программы. Иными словами, программа не будет ожидать завершения демонического потока, при её закрытии эти потоки уничтожаются, в каком бы состоянии они не находились.

Демонические потоки используют для выполнения операций, выполняемых в бесконечном цикле. В других случаях обычно используют простые потоки, которые задерживают закрытие программы, пока не завершат выполнение всех операций. Использование демонических потоков позволяет операции в фоновом режиме, которые обычно не связаны с изменением и сохранением долгосрочных данных.

Например, если программа полностью перезаписывает содержимое файла, и механизм перезаписи реализован в демоническом потоке, то при неожиданном выходе из программы данные потеряются.

В демонические потоки часто помещают функции по рисованию графического интерфейса. Рисование интерфейса — бесконечный процесс, который завершается сразу после выхода из программы, если просто поместить его в обычный поток, это будет препятствовать закрытию программы.

Методы для работы с потоками

Для создания и управления потоками используются различные методы класса Thread. С их помощью можно легко манипулировать сразу несколькими потоками и определять их поведение.

start()

Он используется для запуска созданного потока. После использования threading.Thread() создаётся новый поток, однако он неактивен. Для того чтобы он начал работу, используется метод start().

Здесь пока мы не вызвали метод start , функция myfunc не будет запущена.

Этот метод блокирует выполнение потока, который его вызвал, до тех пор пока не завершится поток, метод которого был вызван. То есть если в потоке thread1 был вызван метод потока thread2: thread2.join(), то поток thread1 будет приостановлен до тех пор, пока выполнение thread2 не завершится.

С помощью этого метода можно заставить программу дождаться завершения демонического потока. Например, если вызвать метод в основном потоке, то программа не завершится, пока не выполнится демонический поток.

У метода join() есть аргумент timeout . По умолчанию он имеет значение None, но программист может передать в него число с плавающей точкой.

Если передать в качестве аргумента число, то для метода join() установится время ожидания, когда оно истечёт, поток продолжит свою работу.

Например, thr1.join(100) означает, что будет ожидаться завершение выполнения потока thr1 не более 100 секунд.

Так как метод join() всегда возвращает None, чтобы проверить, успел ли полностью выполниться поток за указанный timeout, нужно проверить, выполняется ли поток с помощью метода is_alive().

Здесь мы делаем поток демоническим, чтобы программа не дожидалась окончания выполнения функции. Подключаем модуль time, для того, чтобы сделать задержку в функции на 2.5 секунд. После старта потока, мы приостанавливаем основной поток на 0.125 секунд. Потом выполняем проверку is_alive(). Если выведет True, значит поток не закончил выполнение за 0.125 секунды.

В этом методе описываются операции, выполняемые потоком. Он используется, когда явно создается экземпляр класса. Пример:

Остановка потока

Бывают ситуации, когда требуется остановить поток, который работает в фоне. Допустим у нас поток у которого в функции run бесконечный цикл. В основной программе нам нужно его остановить. Тут самое простое — это создать некую переменную stop:

  • В бесконечном цикле делать постоянно её проверку и если она True, то завершать его.
  • Не использовать функции, которые могут блокировать выполнение на длительное время. Всегда использовать timeout.

Вот пример такой программы:

Здесь используем глобальную переменную stop. Когда нам нужно остановить поток, мы ей присваиваем значение True, а дальше просто ждём его завершения.

Состояние гонки

Состояние гонки или race condition – это ошибка, возникающая при неправильном проектировании многопоточной программы. Она возникает тогда, когда несколько потоков обращаются к одним и тем же данным. Например, переменная хранит число, которое пытаются одновременно изменить потоки thread1 и thread2, что приводит к непредсказуемым результатам или ошибке.

Распространена ситуация, когда один поток проверяет значение переменной на выполнение условия, чтобы совершить какое-то действие, но между проверкой условия и выполнением действия вмешивается второй поток, который изменяет значение переменной, что приводит к получению неправильных результатов, например:

Здесь мы создаём объект lock, с его помощью мы будем безопасно считывать и изменять данные. В качестве данных, которые мы будем блокировать в данном примере это одна переменная x. Далее показано безопасное изменение данных: вначале с помощью acquire дожидаемся своей очереди доступа к ним. Затем изменяем их (в нашем примере перезаписываем значение переменной с «Python 2» на «Python 3»). Далее выводим значение в консоль. После этого освобождаем доступ для других потоков. Если все потоки, которым нужен будет доступ к данным x будут использовать lock, то можно избежать «Состояния гонки».

deadlock

При использовании Lock возникает серьезная проблема, которая приводит к полной остановки работы программы. Если вызвать метод acquire(), а объект Lock уже заблокирован, то вызвавший acquire() поток будет ждать, пока заблокировавший объект поток не вызовет release().

Если один поток вызывает метод блокировки несколько раз подряд, то выполнение потока приостанавливается, пока он сам не вызовет release(). Однако он не может вызвать release, потому что его выполнение приостановлено, что означает бесконечную блокировку программы.

Самоблокировку можно предотвратить, если удалить лишний вызов acquire(), но это не всегда возможно. Самоблокировка может происходить из-за следующий вещей:

  • Возникновение ошибок, когда Lock остаётся заблокированным.
  • Неправильное проектирование программы, когда одна функция вызывается другой функцией, у которой отсутствует блокировка.

В случае возникновения ошибок достаточно воспользоваться конструкцией try-finally или оператором with.

Вот пример с with:

Конструкция try-finally позволяет удалять блокировку даже в случае возникновения ошибок, что позволяет избежать deadblock. Пример:

Конструкция try-finally гарантирует, что код в finally будет исполнен всегда, независимо от ошибок и результатов блока try.

Однако это не работает в случае самоблокировки из-за неправильного проектирования программы. Для этого был создан объект RLock.

RLock

Если Lock заблокирован, он блокирует любой поток, попытавшийся сделать то же самое, даже если этот поток и является владельцем блокировки в данный момент. Например, программист написал код:

Данный код будет работать, но его проблема заключается в том, что при вызове функции both_parts , в ней вызываются функции part1 и part2 . Между вызовами этих функций может получить доступ к данным какой-нибудь другой поток и их поменять. А что делать, если нужно избежать изменения другим потоком?

Чтобы решить проблему, нужно заблокировать lock1 и в both_parts , перепишем её:

Идея проста: внешняя both_parts блокирует поток на время выполнения функций part1 и part1 . Каждая из функций также блокирует поток для суммирования своей части объекта. Однако объект Lock не позволит этого сделать, этот код приведет к полному зависанию программы, потому что для Lock нет разницы, где в потоке был вызван acquire().

RLock блокирует поток только в том случае, если объект заблокирован другим потоком. Используя RLock, поток никогда не сможет заблокировать сам себя.

Использовать RLock нужно для управления вложенным доступом к разделяемым объектам. Чтобы решить возникшую проблему с Lock в коде выше, достаточно заменить строчку « lock1 = threading.Lock() » на « lock1 = threading.RLock() ».

Передача данных с помощью очередей (Queue)

Для передачи данных с помощью очередей используется класс Queue из библиотеки queue, который импортируется командной: «from queue import Queue».

Библиотеке queue содержит все необходимые инструменты для передачи данных между потоками и реализует нужные механизмы блокировки.

Класс Queue реализует очередь FIFO, который работает так: первый элемент, который пошел в очередь, первым и выйдет из неё. Эту очередь можно сравнить с вертикальной полой трубой, в которую сверху бросают элементы.

Queue имеет параметр maxsize, принимающий только целочисленные значения. Он указывает максимальное количество элементов, которое можно поместить в очередь. Когда максимум достигается, добавление в очередь элементов блокируется, пока в ней не освободится место. Если maxsize принимает значение <= 0, то очередь является бесконечной.

Для взаимодействия с очередями используется Event, объект модуля threading. С его помощью поток может выполнить нужные операции тогда, когда получит сигнал от другого потока. Кроме того, поток не обязательно должен приостанавливать свою работу на время ожидания сигнала.

Для передачи данных и работы с очередями используются методы (работают со всеми видами очередей, а не только с Queue):

qsize()

Возвращает примерный размер очереди. Важно понимать две вещи:

  • Если qsize() больше нуля, следующий метод get() всё равно может быть заблокирован.
  • Если qsize() меньше maxsize, следующий метод put() может быть заблокирован.

Это может возникнуть из-за того что к очереди могут обратиться другие потоки и получить/записать данные сразу после того как вы получили её размер.

empty()

Метод проверяет, содержится ли что-то в очереди. Если очередь пуста, возвращается True, если очередь содержит элементы, возвращается False.

Как и с методом qsize(), возврат True или False не гарантирует, что следующий метод put() или get() не будут заблокированы.

Проверяет, заполнена ли очередь. Если очередь заполнена, возвращает True, иначе возвращает False.

Как и в предыдущих методах, возврат True или False не даёт гарантий, что put() и get() не будут заблокированы.

Метод помещает новый объект в очередь, имеет обязательный аргумент item и два необязательных аргумента: block = True и timeout = None.

В зависимости от указанных аргументов, ожидание места в очереди будет вести себя по-разному:

  • Если аргумент block имеет значение True, а timeout — None, объект, который нужно загрузить в очередь, будет бесконечно ждать свободного места.
  • При timeout больше нуля, ожидание свободного места будет длиться не дольше указанного числа секунд, если за это время свободного места так и не появилось, возбудится исключение.
  • Если block имеет значение False, аргумент timeout игнорируется, и элемент можно поместить в очередь, только если есть свободное место, иначе сразу же возбуждается исключение.

Вот пример создания очереди на Python и добавления в неё элемента:

put_nowait()

Эквивалентно вызову put(item, False). То есть помещает элемент в очередь, только если есть место, иначе вызывает исключение.

Удаляет и возвращает элемент из очереди. Имеет два необязательных аргумента: block = True и timeout = None.

В зависимости от значений аргументов ожидание объекта ведёт себя по разному:

  • Если аргументы имеют значению по умолчанию, метод ожидает объект из очереди до тех пор, пока тот не станет доступен.
  • При timeout — положительное число, то объект из очереди ожидается определенное время, по истечении которого вызовется исключение.
  • Если block имеет значение False, элемент возвращается, только если он доступен, иначе вызывается исключение (аргумент timeout игнорируется).

Вот пример. Здесь мы добавляем строку в очередь. Затем мы её получаем и выводим в консоль:

get_nowait()

Эквивалентно вызову get(False).

task_done()

Этот метод работает в связке с методом join().

Метод показывает, что ранее поставленная задача была выполнена. После получения каждого элемента из очереди, допустим с помощью get(), нужно вызывать task_done(), чтобы уменьшить счётчик задач. Ниже описан метод join с примером

Если task_done() вызывается больше раз, чем количество элементов, помещенное в очередь, то возбуждается исключение ValueError.

Блокирует поток, пока все элементы очереди не будут получены и обработаны.

Каждый раз, когда в очередь добавляется новый элемент, увеличивается счетчик незавершённых задач. При вызове task_done(), счетчик уменьшается, показывая, что обработка элемента в очереди завершена и можно переходить к следующему. Когда счетчик равен нулю, с потока снимается блокировка.

Этот пример для того чтобы показать работу join и task_done. Здесь всё происходит в одном потоке. Обычно пишет в очередь один поток данные, потом ждёт когда их обработают с помощью join. Другой же поток при получении каждого нового значения вызывает task_done.

Пример программы

Суть программы проста: студенты должны сдать зачетную работу. Работа сдаётся двум преподавателям, время, за которое проверят работу, зависит от рейтинга студента (чем больше рейтинг, тем лучше работа, тем меньше проверяют).

Как видно из результатов, студенты обрабатывались в два потока, двумя преподавателями. q.join() заблокировало выполнение основного потока, так что текст распечатался только после завершения всех заданий очереди.

Полезные инструменты модуля threading

Threading имеет ещё несколько полезных инструментов, которые могут пригодиться для решения более специализированных задач.

Semaphore

Это один из старейших примитивов для синхронизации в истории информатики. Семафор использует внутренний счётчик, который уменьшается при каждом вызове acquire() и увеличивается при каждом вызове release(). Счётчик не может стать меньше нуля, когда он становится равным нулём, acquire() блокирует поток.

Здесь привёл два варианта получения доступа к данным:

  • C помощью acquire и release.
  • Используя with.

Timer

Этот класс позволяет контролировать время запуска какого-либо действия. Timer является подклассом Thread.

Вот его аргументы:

Таймеры запускаются также, как и потоки, с помощью метода start(). Их можно остановить, используя метод cancel().

С помощью таймера программист может вызывать функцию, присвоить значение переменной или, например, запустить поток в определенное время.

Пример использования:

Здесь функция myfunc выполнится через 4 секунды после вызова метода start().

Barrier

Этот класс позволяет реализовать простой механизм синхронизации потоков. Его можно использовать для фиксированного числа потоков, когда необходимо, чтобы каждый поток ждал выполнения какого-либо действия всеми.

Для того чтобы продолжить выполнения, все потоки должны вызвать метод wait(), если хоть один поток не сделал этого, остальные блокируются до тех пор, пока метод не будет вызван.

Так выглядят его аргументы:

Рассмотрим пример использования:

Здесь выставляю barrier на 2 вызова wait. То есть, для того, чтобы выполнился код после wait, wait должен быть вызван в 2 потоках. В данном случае функция myfunc сразу запускается в потоке, но она сразу не выведет ‘отработал barrier’ в консоль, а дождётся когда в основном потоке будет вызван wait тоже.

Event

Event представляет собой простой механизм реализации связи между потоками: один поток даёт сигнал о событии, другие ожидают этого сигнала.

Объект события управляет внутренним флагом, который может быть установлен в True или False с помощью методов set() и clear(). Также есть методы is_set(), которым можно проверить состояние внутреннего флага. С помощью метода wait(timeout=None) можно ждать пока не выставлен флаг в True. Так же при необходимости можно задать время ожидания.

Вот пример:

Заключение

Возможность управления потоками в Python – это мощный инструмент в разработке больших программ. Для работы с ними используется модуль Threading и библиотека queue в связке с ним.

Каждый программист Python должен уметь работать с потоками, очередями и понимать, как устроена блокировка, доступ к данным и их передача между потоками.

Основы работы с потоками в языке Python


Статья писалась в то время когда последними версиями CPython были: 2.6.2 для второй ветки и 3.1.1 для третьей ветки. В статье используются новоявленные в CPython 2.6 with statements, следовательно при желании ипользования этого кода в более ранних версиях придется переделывать код на своё усмотрение. В процессе написания данной статьи использовались только стандартные модули, доступные «из коробки». Также, исходя из того, что я являюсь не профессиональным программистом, а самоучкой, то прошу извинения у уважаемой аудитории за возможные неточности относительно трактования тех или иных понятий. Поэтому, приглашаю вас задавать вопросы, на которые я по возможности буду отвечать.

Итак, приступим, фактически, то что я собрался описывать впервые было порекомендовано уважаемым lorien с python.su (правда в его примере Queue вообще в отдельном потоке обрабатывалось :)), не уверен что он автор продемонстрированного им концепта, но впервые я увидел это опубликованным именно от него, и являет собой даже скорее не Thread Pool, а Task Pool (хотя возможно я и не прав в трактовании сего термина).
Что представляет собой многопоточное приложение? Это приложение, в котором определенное количество потоков выполняют некие задачи. Беда многих в том, что они не до конца улавливают то, что потоки действуют отдельно друг от друга до тех пор, пока активен главный поток. Лично я стараюсь писать таким образом, чтобы это им не мешало, но об этом позже. Также их проблемой является так называемый «индусский» код, который просто и бездумно откуда-то копируется, а программа доводится до уровня «лишь бы работало». Господа, усвойте раз и навсегда: если вы не понимаете, как работает тот или иной участок вашей программы, то перепишите его так, чтобы это было понятно ВАМ, если в будущем вы дорастете до понимания тех вещей, которые вы предполагали бездумно скопировать, то вам без проблем можно будет использовать этот код. Главным является именно ВАШЕ понимание того, как работает ваше творение.
Затронем проблему отдельной работы потоков. Господа, взаимодействие потоков стоит продумывать до того как вы начинаете писать приложение, а не когда вы его уже написали. В принципе, если придерживаться некоторых правил работы с исходным кодом приложения, то переделывание программы из однопоточной в многопоточную происходит легко, безболезненно, и быстро.
Касательно активности главного потока. Когда, как вам кажется, вы запускаете ОДИН поток, фактически работает уже ДВА потока. Нужно понимать, что количество потоков, активных в данный момент равняется количеству потоков, запущенных в данный момент вами +1 поток, в котором работает основное тело приложения. Лично я стараюсь писать таким образом, чтобы четко отделять основной поток от запущенных мной. Если этого не делать, то возможно преждевременное (как вам кажется) завершение работы приложения, хотя на самом деле приложение отработает именно так, как вы его написали.
Вроде на словах понятно, теперь приступаем к практике. На практике в CPython есть такое понятние как GIL (Global Interpreter Lock). Под сим подразумевается глобальная блокировка интерпритатора в тот момент когда потоки вашего приложения обращаются к процессору. Фактически, в каждый отдельно взятый момент с процессором работает только один поток. В связи с этим максимальное количество потоков, которое вообще можно запустить в стандартном CPython колеблется в районе 350 штук.
В качестве примера будет сделана попытка реализовать многопоточный парсер www.google.com. Как я уже написал выше, для работы будут использованы исключительно стандартные модули, для выполнения задачи понадобятся модули urllib2, urllib, queue, threading, re.

#==================<Имортирование необходимых модулей>==================
import urllib2
#Модуль для работы с протоколом HTTP, высокоуровневый
import urllib
#Модуль для работы с протоколом HTTP, более низкоуровневый чем urllib2,
#фактически из него необходима одна функция — urllib.urlquote
from Queue import Queue
#Модуль, который представляет собой «Pool», фактически это список, в
#котором на нужных местах вставлены замки таким образом, чтобы к нему
#одновременно мог обращаться только один поток
import threading
#Модуль для работы с потоками, из него понадобится только
#threading.active_count, threading.Thread, threading.Thread.start,
#threading.Rlock
import re
#Модуль для работы с регулярными выражениями, его использование выходит
#за пределы статьи
import time
#Модуль для работы со временем, из него нужна только функция sleep
queue = Queue()
#Обязательное присваивание, нужно делать именно так (т.е. импортировать
#класс Queue из модуля Queue и инициализировать его)
#==================</Имортирование необходимых модулей>=================

#==============================<Настройки>==============================
PROXY = «10.10.31.103:3128»
#Во время написания статьи сижу за прокси-сервером, поэтому в статье
#затрагивается и этот вопрос, этой строкой обьявляется глобальная
#переменная PROXY, в которой находится адрес прокси-сервера. Для работы
#напрямую необходимо указать значение None
HEADERS = «Accept» : «text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1» ,
«Accept-Language» : «ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7» ,
«Accept-Charset» : «iso-8859-1, utf-8, utf-16, *;q=0.1» ,
«Accept-Encoding» : «identity, *;q=0» ,
«Connection» : «Keep-Alive» >
#Для того чтобы получить страницу с www.google.com НЕОБХОДИМО использовать
#заголовки браузера, они представлены выше в ассоциативном массиве HEADERS,
#соответствуют реальным заголовкам браузера Opera с маленько модификацией, эти
#заголовки означают что клиент не может принимать zlib compressed data, т.е.
#сжатые данные — не хотел я заморачиваться еще и с разархивироанием страниц, тем
#более что не все сайты их сжимают.
THREADS_COUNT = 10
#В принципе это все настройки приложения, это-количество потоков
DEEP = 30
#Это — значение, которое отвечает за глубину страниц поиска, которые
#нужно просматривать, фактически же определяет собой количество ссылок,
#которые будут собраны сборщиком.
ENCODING = «UTF-8»
#Кодировка ваших файлов (для загрузки данных из файла с запросами и
#последующего их перевода в юникод)
#==============================</Настройки>===================================

LOCK = threading . RLock()
# Вот тут то впервые и затрагивается модуль threading
#создается обьект LOCK, который представляет собой класс threading.RLock из
#модуля threading, это -простейший замок, который запрещает исполнение
#несколькими потоками участка кода который идет после вызова его метода
#acquire() Основным отличием threading.RLock от threading.Lock (тоже класс из
#модуля threading) является то, что каждый поток может обращаться к обьекту
#threading.RLock неограниченное количество раз, обьект threading.Lock может
#вызываться каждым потоком только единожды.

Основным принципом для беспроблемной реализации многопоточности я считаю модульность кода. Не обязательно выносить используемые Вами функции в отдельные файлы или классы, достаточно чтобы хотя бы это были отдельные функции (Прошу простить за каламбур).

Сейчас я выделю работу потока в отдельную функцию, пускай она будет представлять собой некое абстрактное понятие работы. На данном этапе пока что некоторые моменты будут непонятны, но позже все это выстроится в понятный алгоритм.

Главное, что нужно четко усвоить — это алгоритм работы самого потока, и что именно потоки должны обрабатывать независимо друг от друга. Итого, задачи потока очень просты — получить ссылку на страницу поиска, передать ее в функцию-обработчик, из которой вернутся ссылки на найденные сайты а также title этих сайтов, после записать ссылки и title в файл (все это будет находиться в parsed_data).

По мере работы реализовываются задачи от простейшей к сложной, поэтому на этих этапах практически не затронуты вопросы многопоточности. Далее настала очередь реализации функции, которая будет отвечать за запись в файл.

Далее идет реализация функции get_and_parse_page:

В итоге получаем нормально работающий многопоточный парсер. Естественно с многими минусами, но красиво написанное я задолбусь комментировать.

как получить возвращаемое значение из потока в Python?

как получить значение ‘foo’ , который возвращается из потока?

один очевидный способ сделать это, как показано выше, возвращает None .

20 ответов:

FWIW, the multiprocessing модуль имеет хороший интерфейс для этого с помощью Pool класса. И если вы хотите придерживаться темы, а не процессов, вы можете просто использовать multiprocessing.pool.ThreadPool класс как падени-в замене.

один из способов, который я видел, — передать изменяемый объект, такой как список или словарь, конструктору потока вместе с индексом или другим идентификатором какого-либо рода. Затем поток может хранить свои результаты в своем выделенном слоте в этом объекте. Например:

если вы действительно хотите join() вернуть возвращаемое значение вызываемой функции, вы можете сделать это с помощью Thread подкласс следующим образом:

это становится немного волосатым из-за некоторых имя искажения, и он обращается к» частным » структурам данных, которые являются специфичными для Thread реализация. но это работает.

для python3

Python в три ручья: работаем с потоками (часть 1)

Из этой статьи вы узнаете, как с Python выполнять несколько операций одновременно и распределять нагрузку между ядрами процессора, какие особенности языка учитывать. Но главное — поймете, когда многопоточность в Python нужна, а когда только мешает.

Небольшое предупреждение для тех, кто впервые слышит о параллельных вычислениях. Что такое поток и чем он отличается от процесса, мы выяснили в статье «Внутри процесса: многопоточность и пинг-понг mutex’ом». Тогда мы приводили примеры на Java, но теоретические основы многопоточности верны и для Python. Совпадают, в том числе, механизмы синхронизации потоков: семафоры, взаимные исключения (mutex), условия, события. Поэтому сегодня сделаем акцент на особенностях Python, его механизмах и инструментах, связанных с многопоточностью.

Организовать параллельные вычисления в Python без внешних библиотек можно с помощью модулей:

  • threading — для управления потоками.
  • queue — для организации очередей.
  • multiprocessing — для управления процессами.

Пока нас интересует только первый пункт списка.

Как создавать потоки в Python

Метод 1 — «функциональный»

Для работы с потоками из модуля threading импортируем класс Thread. В начале кода пишем:

После этого нам будет доступна функция Thread() — с ней легко создавать потоки. Синтаксис такой:

Первый параметр target — это «целевая» функция, которая определяет поведение потока и создаётся заранее. Следом идёт список аргументов. Если судьбу аргументов (например, кто будет делимым, а кто делителем в уравнении) определяет их позиция, их записывают как args=(x,y). Если же вам нужны аргументы в виде пар «ключ-значение», используйте запись вида kwargs=.

Ради удобства отладки можно также дать новому потоку имя. Для этого среди параметров функции прописывают name=«Имя потока». По умолчанию name хранит значение null. А ещё потоки можно группировать с помощью параметра group, который по умолчанию — None.

За дело! Пусть два потока параллельно выводят каждый в свой файл заданное число строк. Для начала нам понадобится функция, которая выполнит задуманный нами сценарий. Аргументами целевой функции будут число строк и имя текстового файла для записи.

Что start() запускает ранее созданный поток, вы уже догадались. Метод join() останавливает поток, когда тот выполнит свои задачи. Ведь нужно закрыть открытые файлы и освободить занятые ресурсы. Это называется «Уходя, гасите свет». Завершать потоки в предсказуемый момент и явно — надёжнее, чем снаружи и неизвестно когда. Меньше риск, что вмешаются случайные факторы. В качестве параметра в скобках можно указать, на сколько секунд блокировать поток перед продолжением его работы.

Метод 2 — «классовый»

Для потока со сложным поведением обычно пишут отдельный класс, который наследуют от Thread из модуля threading. В этом случае программу действий потока прописывают в методе run() созданного класса. Ту же петрушку мы видели и в Java.

Стандартные методы работы с потоками

Чтобы управлять потоками, нужно следить, как они себя ведут. И для этого в threading есть специальные методы:

current_thread() — смотрим, какой поток вызвал функцию;

active_count() — считаем работающие в данный момент экземпляры класса Thread;

enumerate() — получаем список работающих потоков.

Ещё можно управлять потоком через методы класса:

is_alive() — спрашиваем поток: «Жив ещё, курилка?» — получаем true или false;

getName() — узнаём имя потока;

setName(any_name) — даём потоку имя;

У каждого потока, пока он работает, есть уникальный идентификационный номер, который хранится в переменной ident.

Отсрочить операции в вызываемых потоком функциях можно с помощью таймера. В инициализаторе объектов класса Timer всего два аргумента — время ожидания в секундах и функция, которую нужно в итоге выполнить:

Таймер можно один раз создать, а затем запускать в разных частях кода.

Потусторонние потоки

Обычно Python-приложение не завершается, пока работает хоть один его поток. Но есть особые потоки, которые не мешают закрытию программы и останавливается вместе с ней. Их называют демонами (daemons). Проверить, является ли поток демоном, можно методом isDaemon(). Если является, метод вернёт истину.

Назначить поток демоном можно при создании — через параметр “daemon=True” или аргумент в инициализаторе класса.

Не поздно демонизировать и уже существующий поток методом setDaemon(daemonic).

Всё бы ничего, но это даже не верхушка айсберга, потому что прямо сейчас нас ждут великие открытия.

Приключение начинается. У древнего шлюза

Питон слывёт дружелюбным и простым в общении, но есть у него причуды. Нельзя просто взять и воспользоваться всеми преимуществами многопоточности в Python! Дорогу вам преградит огромный шлюз… Даже так — глобальный шлюз (Global Interpreter Lock, он же GIL), который ограничивает многопоточность на уровне интерпретатора. Технически, это один на всех mutex, созданный по умолчанию. Такого нет ни в C, ни в Java.

Задача шлюза — пропускать потоки строго по одному, чтоб не летали наперегонки, как печально известные стритрейсеры, и не создавали угрозу работе интерпретатора.

Без шлюза потоки подрезали бы друг друга, чтобы первыми добраться до памяти, но это еще не всё. Они имеют обыкновение внезапно засыпать за рулём! Операционная система не спрашивает, вовремя или невовремя — просто усыпляет их в ей одной известный момент. Из-за этого неупорядоченные потоки могут неожиданно перехватывать друг у друга инициативу в работе с общими ресурсами.

Дезориентированный спросонок поток, который видит перед собой совсем не ту ситуацию, при которой засыпал, рискует разбиться и повалить интерпретатор, либо попасть в тупиковую ситуацию (deadlock). Например, перед сном Поток 1 начал работу со списком, а после пробуждения не нашёл в этом списке элементов, т.к. их удалил или перезаписал Поток 2.

Чтобы такого не было, GIL в предсказуемый момент (по умолчанию раз в 5 миллисекунд для Python 3.2+) командует отработавшему потоку: «СПАААТЬ!» — тот отключается и не мешает проезжать следующему желающему. Даже если желающего нет, блокировщик всё равно подождёт, прежде чем вернуться к предыдущему активному потоку.

Благодаря шлюзу однопоточные приложения работают быстро, а потоки не конфликтуют. Но, к сожалению, многопоточные программы при таком подходе выполняются медленнее — слишком много времени уходит на регулировку «дорожного движения». А значит обработка графики, расчет математических моделей и поиск по большим массивам данных c GIL идут неприемлемо долго.

В статье «Understanding Python GIL»технический директор компании Gaglers Inc. и разработчик со стажем Chetan Giridhar приводит такой пример:

Код вычисляет факториал числа 100 000 и показывает, сколько времени ушло у машины на эту задачу. При тестировании на одном ядре и с одним потоком вычисления заняли 3,4 секунды. Тогда Четан создал и запустил второй поток. Расчет факториала на двух ядрах длился 6,2 секунды. А ведь по логике скорость вычислений не должна была существенно измениться! Повторите этот эксперимент на своей машине и посмотрите, насколько медленнее будет решена задача, если вы добавите thread2. Я получила замедление ровно вдвое.

Глобальный шлюз — наследие времён, когда программисты боролись за достойную реализацию многозадачности и у них не очень получалось. Но зачем он сегодня, когда есть много- и очень многоядерные процессоры? Как объяснил Гвидо ван Россум, без GIL не будут нормально работать C-расширения для Python. Ещё упадёт производительность однопоточных приложений: Python 3 станет медленнее, чем Python 2, а это никому не нужно.

«Нормальные герои всегда идут в обход»

Шлюз можно временно отключить. Для этого интерпретатор Python нужно отвлечь вызовом функции из внешней библиотеки или обращением к операционной системе. Например, шлюз выключится на время сохранения или открытия файла. Помните наш пример с записью строк в файлы? Как только вызванная функция возвратит управление коду Python или интерфейсу Python C API, GIL снова включается.

Как вариант, для параллельных вычислений можно использовать процессы, которые работают изолированно и неподвластны GIL. Но это большая отдельная тема. Сейчас нам важнее найти решение для многопоточности.

Если вы собираетесь использовать Python для сложных научных расчётов, обойти скоростную проблему GIL помогут библиотеки Numba, NumPy, SciPy и др. Опишу некоторые из них в двух словах, чтобы вы поняли, стоит ли разведывать это направление дальше.

Numba для математики

Numba — динамически, «на лету» компилирует Python-код, превращая его в машинный код для исполнения на CPU и GPU. Такая технология компиляции называется JIT — “Just in time”. Она помогает оптимизировать производительность программ за счет ускорения работы циклов и компиляции функций при первом запуске.

Суть в том, что вы ставите аннотации (декораторы) в узких местах кода, где вам нужно ускорить работу функций.

Для математических расчётов библиотеку удобно использовать в связке c NumPy. Допустим, нужно сложить одномерные массивы — элемент за элементом.

Метод nupmy.empty_like() принимает массив и возвращает (но не инициализирует!) другой — соответствующий исходному по форме и типу. Чтобы ускорить выполнение кода, импортируем класс jit из модуля numba и добавляем в начало кода аннотацию @jit:

Это скромное дополнение способно ускорить выполнение операции более чем в 100 раз! Если интересно, посмотрите замеры скорости математических расчётов при использовании разных библиотек для Python.

PyCUDA и Numba для графики

В графических вычислениях Numba тоже кое-что может. Она умеет работать с программной моделью CUDA, чтобы визуализировать научные данные и работу алгоритмов, выдавать информацию о GPU и др. Подробнее о том, как работают графический процессор и CUDA — здесь. И снова мы встретимся с многопоточностью.

При работе с многомерными массивами в CUDA, чтобы понять, какой поток сейчас работает с элементами массива, нужно отследить, кто и когда вызывает функцию ядра. Например, поток может определять свою позицию в сетке блоков и рассчитать соответствующий элемент массива:

Главный плюс этого кода даже не в скорости исполнения, а в прозрачности и простоте. Снова сошлюсь на Хабр, где есть сравнение скорости GPU-расчетов при использовании Numba, PyCUDA и эталонного С CUDA. Небольшой спойлер: PyCUDA позволяет достичь скорости вычислений, сопоставимой с Cи, а Numba подходит для небольших задач.

Когда многопоточность в Python оправдана

Стоит ли преодолевать связанные c GIL сложности и тратить время на реализацию многопоточности? Вот примеры ситуаций, когда многопоточность несёт с собой больше плюсов, чем минусов.

  • Для длительных и несвязанных друг с другом операций ввода-вывода. Например, нужно обрабатывать ворох разрозненных запросов с большой задержкой на ожидание. В режиме «живой очереди» это долго — лучше распараллелить задачу.
  • Вычисления занимают более миллисекунды и вы хотите сэкономить время за счёт их параллельного выполнения. Если операции укладываются в 1 мс, многопоточность не оправдает себя из-за высоких накладных расходов.
  • Число потоков не превышает количество ядер. В противном случае параллельной работы всех потоков не получается и мы больше теряем, чем выигрываем.

Когда лучше с одним потоком

  • При взаимозависимых вычислениях. Считать что-то в одном потоке и передавать для дальнейшей обработки второму — плохая идея. Возникает лишняя зависимость, которая приводит к снижению производительности, а в случае ошибки — к ступору и краху программы.
  • При работе через GIL. Это мы уже выяснили выше.
  • Когда важна хорошая переносимость на разных устройствах. Правильно подобрать число потоков для машины пользователя — задача не из легких. Если вы пишете под известное вам «железо», всё можно решить тестированием. Если же нет — понадобится дополнительно создавать гибкую систему подстройки под аппаратную часть, что потребует времени и умения.

Анонс — взаимные блокировки в Python

Самое смешное, что по умолчанию GIL защищает только интерпретатор и не предохраняет наш код от взаимных блокировок (deadlock) и других логических ошибок синхронизации. Поэтому разводить потоки по углам, как и в Java, нужно принудительно — с помощью блокирующих механизмов. Об этом и о не упомянутых в статье компонентах модуля threading мы поговорим в следующий раз.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *