Threading и KeyboardInterrupt.
Как организовать пул потоков? Допустим у меня есть 100 данных (пусть они находятся в каком-либо списке) и все их нужно переработать в 10 потоков. Стартуем 10 потоков, в каждый передаем по 1 данному, следим за потоками, как только один отработал, стартуем новый и так пока не закончатся данные… В общем, это не то, чтобы бред, но есть способ поудобней.
За всё время работы скрипта, стартует только 10 потоков, просто каждый из них, отработав со своим 1 данным, не дохнет, а берет из списка следующее 1 данное и делает с ним эту же работу:
# -*- coding: utf-8 -*-
import threading, Queue, time
import traceback
class Worker(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.__queue = queue
def run(self):
while True:
try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break # данные закончились, прекращаем работу
try: self.work(item) # работа
except Exception: traceback.print_exc()
time.sleep(0.5)
self.__queue.task_done() # задача завершена
return
def work(self,item):
print item
#pass
def main():
# Выводим в 5 потоков цифры от 1 до 100.
queue = Queue.Queue()
num_threads = 5 # 5 потоков
for x in xrange(100):
queue.put(x) # заносим данные в очередь
for i in xrange(num_threads):
t = Worker(queue) # создаем поток
t.start() # стартуем
time.sleep(0.1) # чтобы в консоли друг на друга не накладывались
queue.join() #блокируем выполнение программы, пока не будут израсходованы данные.
print "Done!"
if __name__ == '__main__':
main()
И, в общем, хотелось бы еще, чтобы можно всё это в любой момент можно было бы остановить. Но поскольку мы выполнение программы блокируем с помощью queue.join(), то скрипт наши Ctr+C банально «не услышит». Я начал искать, что делать в таком случае, нашел вот такое решение, но у меня оно работало абы как. А потом, поразмыслив, я сам придумал решение, которое оказалось ужасно простым.
# -*- coding: utf-8 -*-
import threading, Queue, time
import traceback
class Worker(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.__queue = queue
self.kill_received = False # флаг прекращения работы
def run(self):
while not self.kill_received:
try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break
try: self.work(item)
except Exception: traceback.print_exc()
time.sleep(0.5)
self.__queue.task_done() # задача завершена
self.__queue.put(item) # зациклим
return
def work(self,item):
print item
def main():
queue = Queue.Queue()
num_threads = 5 # 5 потоков
threads = []
for x in xrange(100):
queue.put(x) # заносим данные в очередь
for i in xrange(num_threads):
t = Worker(queue) # создаем нить
threads.append(t)
t.start() # стартуем
time.sleep(0.1)
#Пока в "живых" не останется только главный поток, ждем.
while threading.activeCount()>1:
try:
time.sleep(1)
except KeyboardInterrupt:
print "Ctrl-c received! Sending kill to threads..."
for t in threads:
t.kill_received = True # даем сигнал о завершении всем потокам
print "Done!"
if __name__ == '__main__':
main()
Multiprocessing
Узнал про модуль spynner, это такой управляемый браузер. Если на страничке много трудноподдающегося JavaScript-а, то вместо urllib2 или curl можно попробовать натравить на него spynner. Основы работы с ним можно почитать здесь. В общем был он нужен мне, но проблема в том, что он достаточно медленный, а работать в несколько потоков с ним не представляется возможным, так он берет своё начало от GUI-класса библиотеки Qt, которые в свою очередь можно использовать только в main-потоке. Поэтому вместо многопоточности пришлось думать о многопроцессовости. Ну это тоже самое если сделать однопоточный скрипт и
запустить одновременно N его копий. Про многопоточность я уже написал один пост, который полностью состоит из быдлокода=) С тех пор, я чутка поумнел и для организации потоков наследуюсь от класса Thread. С многопроцессовостью всё аналогично, только классы другие, а методы и структура вся та же.
# coding: utf8
import spynner
from spynner import browser
import multiprocessing
from multiprocessing import Process
class Serfer(multiprocessing.Process):
def __init__(self,queue):
multiprocessing.Process.__init__(self)
self.__queue = queue # наша очередь заданий
self.kill_received = False # На всякий случай переменная, вдруг надо будет всё остановить
def run(self):
while not self.kill_received:
try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break
error = True
try: error = self.serfing(item)
except: traceback.print_exc()
time.sleep(0.1)
self.__queue.task_done() # задача завершена
if error: self.__queue.put(item) # Если была ошибка, то еще раз с этими данными
return
def serfing(self,url):
br = browser.Browser() # запускаем браузер
br.create_webview()
br.show() # отображаем
br.load(url) # загружаем страницу
br.wait_a_little(75)
br.destroy_webview()
br.close() # закрываем браузер
def main():
queue = multiprocessing.JoinableQueue() # создаем очередь заданий
processes = 5
# Урлы по которым надо будет перейти в браузере
urls = ["http://ya.ru","http://google.ru","http://yahoo.com",
"http://bing.com","http://rambler.ru"]
for url in urls:
queue.put(url) # заносим данные в очередь
for i in xrange(processes):
t = Serfer(queue) # создаем процесс
t.start() # стартуем
time.sleep(0.1)
queue.join() # приостанавливаем дальнейшее выполнение кода, пока очередь не опустошится
print "Done"
if __name__ == '__main__':
main()
Декодировка писем.
Узнал на форуме модуль для декодировки MIME. Когда в письме, которое мы тянем скриптом с какого-либо ящика, находится вот такая абракадабра:
=80=D0=BE=D0=B2=D0=B0=D0=BB=D0=B8 =D1=83=D1=87=D1=91=D1=82=D0=BD=D1=83=D1=
=8E =D0=B7=D0=B0=D0=BF=D0=B8=D1=81=D1=8C =D0=B2 =D0=A2=D0=B2=D0=B8=D1=82=D1=
нужно использовать модуль quopri
import quopri
s = '''=D0=95=D1=81=D0=BB=D0=B8 =D0=B2=D1=8B =D0=BF=D0=BE=D0=BB=D1=83=D1=87=D0=B8=
=D0=BB=D0=B8 =D1=8D=D1=82=D0=BE =D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=
=D0=B8=D0=B5 =D0=BF=D0=BE =D0=BE=D1=88=D0=B8=D0=B1=D0=BA=D0=B5, =D0=B8 =D0=
=B2=D1=8B =D0=BD=D0=B5 =D1=80=D0=B5=D0=B3=D0=B8=D1=81=D1=82=D1=80=D0=B8=D1=
=80=D0=BE=D0=B2=D0=B0=D0=BB=D0=B8 =D1=83=D1=87=D1=91=D1=82=D0=BD=D1=83=D1=
=8E =D0=B7=D0=B0=D0=BF=D0=B8=D1=81=D1=8C =D0=B2 =D0=A2=D0=B2=D0=B8=D1=82=D1=
=82=D0=B5=D1=80=D0=B5, =D0=BF=D1=80=D0=BE=D0=B9=D0=B4=D0=B8=D1=82=D0=B5 =D0=
=BF=D0=BE =D1=81=D1=81=D1=8B=D0=BB=D0=BA=D0=B5:
http://twitter.com/account/not_my_account/sdfaasdfldfjlas/65558-5ED38-131=
666'''
print quopri.decodestring(s).decode('utf8')
Получаем:
Если вы получили это сообщение по ошибке, и вы не регистрировали учётную запись в Твиттере, пройдите по ссылке:
http://twitter.com/account/not_my_account/sdfaasdfldfjlas/65558-5ED38-131666
Еще письмо может быть закодировано в base64. Среди стандартных модулей Python есть один для работы с этой кодировкой:
import base64 body = base64.b64decode(body)
Использование TOR’а в pycurl
Писать много неохота. Поэтому коротенечко: нужно установить Vidalia Bundle. Затем модуль для Python — TorCtl. Затем проксей курлу указываем 127.0.0.1:9050 с типом Socks5. Всё, таким образом мы лазим серферим через какую-то проксю тора. Для смены этой прокси нам надо приконнектиться к тору и сбросить соединение с помощью модуля TorCtl.
conn = TorCtl.connect()
conn.sendAndRecv('signal newnym\r\n')
Динамический импорт модуля.
Нужно было реализовать что-то типа плагинов: есть скрипт, представляющий из себя основу, и набор модулей для этой основы, для разных сервисов. То есть появился новый сервис — написал модуль, закинул в папку к скрипту — всё работает. Делается это довольно просто с помощью функции __import__:
module = "common_module" // название файла модуля dyn_module = __import__(module) Class = dyn_module.Dyn_Class // Dyn_Class - класс из модуля
