Архив Январь, 2012
sqlalchemy
Sqlalchemy — фреймворк для работы с базами данных с технолгией ORM. Почитать про него можно тут. Моей задачей было понять, как использовать этот фреймворк в потоках.
import sqlalchemy from sqlalchemy.orm import sessionmaker, scoped_session connection = "mysql://root:@localhost/mydatabase" # pool_size - указывает сколько запросов хранить в очереди, если будет больше, вылетит exception engine = sqlalchemy.create_engine(connection, pool_size=30) # Создаем объект Session и дальше во всех потоках используем только его. Session = scoped_session(sessionmaker(bind=engine)) #Accounts - моя таблица, к которой мне нужно делать запросы, код по настройке отображения я опустил. accounts = Session.query(Accounts) account = accounts[0]
Один интересный момент, account, созданный в одном потоке и переданный в другой, изменить не получится.
Если мы поменяем атрибуты account в новом потоке и выполним Session.commit() — таблица не обновится.
Threading и KeyboardInterrupt.
комментарии: 3
Как организовать пул потоков? Допустим у меня есть 100 данных (пусть они находятся в каком-либо списке) и все их нужно переработать в 10 потоков. Стартуем 10 потоков, в каждый передаем по 1 данному, следим за потоками, как только один отработал, стартуем новый и так пока не закончатся данные… В общем, это не то, чтобы бред, но есть способ поудобней.
За всё время работы скрипта, стартует только 10 потоков, просто каждый из них, отработав со своим 1 данным, не дохнет, а берет из списка следующее 1 данное и делает с ним эту же работу:
# -*- coding: utf-8 -*- import threading, Queue, time import traceback class Worker(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.__queue = queue def run(self): while True: try: item = self.__queue.get_nowait() # ждём данные except Queue.Empty: break # данные закончились, прекращаем работу try: self.work(item) # работа except Exception: traceback.print_exc() time.sleep(0.5) self.__queue.task_done() # задача завершена return def work(self,item): print item #pass def main(): # Выводим в 5 потоков цифры от 1 до 100. queue = Queue.Queue() num_threads = 5 # 5 потоков for x in xrange(100): queue.put(x) # заносим данные в очередь for i in xrange(num_threads): t = Worker(queue) # создаем поток t.start() # стартуем time.sleep(0.1) # чтобы в консоли друг на друга не накладывались queue.join() #блокируем выполнение программы, пока не будут израсходованы данные. print "Done!" if __name__ == '__main__': main()
И, в общем, хотелось бы еще, чтобы можно всё это в любой момент можно было бы остановить. Но поскольку мы выполнение программы блокируем с помощью queue.join(), то скрипт наши Ctr+C банально «не услышит». Я начал искать, что делать в таком случае, нашел вот такое решение, но у меня оно работало абы как. А потом, поразмыслив, я сам придумал решение, которое оказалось ужасно простым.
# -*- coding: utf-8 -*- import threading, Queue, time import traceback class Worker(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.__queue = queue self.kill_received = False # флаг прекращения работы def run(self): while not self.kill_received: try: item = self.__queue.get_nowait() # ждём данные except Queue.Empty: break try: self.work(item) except Exception: traceback.print_exc() time.sleep(0.5) self.__queue.task_done() # задача завершена self.__queue.put(item) # зациклим return def work(self,item): print item def main(): queue = Queue.Queue() num_threads = 5 # 5 потоков threads = [] for x in xrange(100): queue.put(x) # заносим данные в очередь for i in xrange(num_threads): t = Worker(queue) # создаем нить threads.append(t) t.start() # стартуем time.sleep(0.1) #Пока в "живых" не останется только главный поток, ждем. while threading.activeCount()>1: try: time.sleep(1) except KeyboardInterrupt: print "Ctrl-c received! Sending kill to threads..." for t in threads: t.kill_received = True # даем сигнал о завершении всем потокам print "Done!" if __name__ == '__main__': main()