Revision: 59314
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at September 1, 2012 07:14 by scrapy
Initial Code
# This pipeline enqueues scraped items to a message queue. It depends on the [carrot](http://ask.github.com/carrot/genindex.html) library. [Gist](https://gist.github.com/1574231)
from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
from scrapy.exceptions import DropItem
from scrapy.utils.serialize import ScrapyJSONEncoder
from carrot.connection import BrokerConnection
from carrot.messaging import Publisher
from twisted.internet.threads import deferToThread
class MessageQueuePipeline(object):
    """Item pipeline that publishes every scraped item to an AMQP broker.

    One carrot ``Publisher`` is kept per open spider, using the spider's name
    as the routing key on the default ("") exchange.  Each item is converted
    to a ``dict``, serialized with the configured encoder and sent as
    ``{"scraped_data": <encoder output>}``.

    Settings read by :meth:`from_settings` (defaults in parentheses):
    ``BROKER_HOST`` ("localhost"), ``BROKER_PORT`` (5672),
    ``BROKER_USERID`` ("guest"), ``BROKER_PASSWORD`` ("guest"),
    ``BROKER_VIRTUAL_HOST`` ("/") and ``MESSAGE_Q_SERIALIZER``
    (``ScrapyJSONEncoder``; either a class or a dotted path to one).
    """

    def __init__(self, host_name, port, userid, password, virtual_host, encoder_class):
        """Create the broker connection, the encoder and hook spider signals.

        ``encoder_class`` is called with no arguments; the resulting object
        must provide an ``encode(obj)`` method.
        """
        self.q_connection = BrokerConnection(hostname=host_name, port=port,
                                             userid=userid, password=password,
                                             virtual_host=virtual_host)
        self.encoder = encoder_class()
        # One publisher per open spider.  A single shared attribute would be
        # overwritten by the most recently opened spider, so items would be
        # routed to the wrong queue and closing one spider would close the
        # publisher another spider is still using.
        self.publishers = {}
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the ``BROKER_*`` / ``MESSAGE_Q_SERIALIZER`` settings."""
        host_name = settings.get('BROKER_HOST', 'localhost')
        port = settings.get('BROKER_PORT', 5672)
        userid = settings.get('BROKER_USERID', "guest")
        password = settings.get('BROKER_PASSWORD', "guest")
        virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")
        encoder_class = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)
        if not callable(encoder_class):
            # Settings usually name classes by dotted path; resolve it so a
            # string value does not fail later with "'str' is not callable".
            from scrapy.utils.misc import load_object
            encoder_class = load_object(encoder_class)
        return cls(host_name, port, userid, password, virtual_host, encoder_class)

    def spider_opened(self, spider):
        """Open a publisher whose routing key is the spider's name."""
        self.publishers[spider] = Publisher(connection=self.q_connection,
                                            exchange="",
                                            routing_key=spider.name)

    def spider_closed(self, spider):
        """Close and forget the publisher that belongs to ``spider``, if any."""
        publisher = self.publishers.pop(spider, None)
        if publisher is not None:
            publisher.close()

    def process_item(self, item, spider):
        """Publish ``item`` via ``deferToThread`` and return the resulting Deferred.

        The Deferred fires with the item returned by :meth:`_process_item`.
        """
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        """Encode ``item``, send it through the spider's publisher, return ``item``."""
        payload = {"scraped_data": self.encoder.encode(dict(item))}
        self.publishers[spider].send(payload)
        return item
# Snippet imported from snippets.scrapy.org (which no longer works)
# author: zsquare
# date : Jan 07, 2012
Initial URL
Initial Description
Initial Title
Submit scraped items to Message Queue (amqp)
Initial Tags
Initial Language
Python