diff --git a/requirements-install.txt b/requirements-install.txt
index f1f0fd2..669fbba 100644
--- a/requirements-install.txt
+++ b/requirements-install.txt
@@ -2,4 +2,5 @@ scrapy>=1.8.0
 pika>=1.0.0
 redisbloom>=0.2.0
 redis>=3.0.1
-kafka-python>=1.4.7
\ No newline at end of file
+kafka-python>=1.4.7
+rocketmq-client-python>=2.0.0
diff --git a/scrapy_distributed/common/queue_config.py b/scrapy_distributed/common/queue_config.py
index 6c1420f..6183a03 100644
--- a/scrapy_distributed/common/queue_config.py
+++ b/scrapy_distributed/common/queue_config.py
@@ -31,4 +31,13 @@ def __init__(
         self.topic = topic
         self.num_partitions = num_partitions
         self.replication_factor = replication_factor
+        self.arguments = arguments
+
+
+class RocketMQQueueConfig(object):
+    def __init__(self, topic, group="default", tags=None, keys=None, arguments=None):
+        self.topic = topic
+        self.group = group
+        self.tags = tags
+        self.keys = keys
         self.arguments = arguments
\ No newline at end of file
diff --git a/scrapy_distributed/pipelines/__init__.py b/scrapy_distributed/pipelines/__init__.py
index 37ee13a..4baed52 100644
--- a/scrapy_distributed/pipelines/__init__.py
+++ b/scrapy_distributed/pipelines/__init__.py
@@ -1,4 +1,4 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-__all__ = ["amqp", "kafka"]
+__all__ = ["amqp", "kafka", "rocketmq"]
diff --git a/scrapy_distributed/pipelines/rocketmq.py b/scrapy_distributed/pipelines/rocketmq.py
new file mode 100644
index 0000000..c8d98b1
--- /dev/null
+++ b/scrapy_distributed/pipelines/rocketmq.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""Scrapy item pipeline that publishes scraped items to a RocketMQ topic."""
+import logging
+
+from rocketmq.client import Producer, Message
+from scrapy.utils.serialize import ScrapyJSONEncoder
+from twisted.internet.threads import deferToThread
+
+from scrapy_distributed.common.queue_config import RocketMQQueueConfig
+
+
+default_serialize = ScrapyJSONEncoder().encode
+
+logger = logging.getLogger(__name__)
+
+
+class RocketMQPipeline(object):
+    """Publish every scraped item to RocketMQ as a JSON message.
+
+    The topic, producer group, tags and keys are read from ``item_conf``;
+    the producer is started as soon as the pipeline is constructed.
+    """
+
+    def __init__(self, item_conf: RocketMQQueueConfig, name_server: str):
+        self.item_conf = item_conf
+        self.name_server = name_server
+        self.serialize = default_serialize
+        self.producer = None
+        self.connect()
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        """Use the spider's ``item_conf`` or default to ``<spider>.items``."""
+        if hasattr(crawler.spider, "item_conf"):
+            item_conf = crawler.spider.item_conf
+        else:
+            item_conf = RocketMQQueueConfig(cls.item_key(None, crawler.spider))
+        return cls(
+            item_conf=item_conf,
+            name_server=crawler.settings.get("ROCKETMQ_NAME_SERVER"),
+        )
+
+    def process_item(self, item, spider):
+        """Run the blocking send in a thread; return the Deferred."""
+        return deferToThread(self._process_item, item, spider)
+
+    def _process_item(self, item, spider):
+        # dict(item) uses the public mapping interface, so plain dict items
+        # work as well as scrapy.Item (item._values is private to Item).
+        body = self.serialize(dict(item))
+        msg = Message(self.item_conf.topic)
+        if self.item_conf.tags:
+            msg.set_tags(self.item_conf.tags)
+        if self.item_conf.keys:
+            msg.set_keys(self.item_conf.keys)
+        msg.set_body(body.encode())
+        ret = self.producer.send_sync(msg)
+        spider.logger.info(f"produce: {body}, status: {ret.status}")
+        return item
+
+    @classmethod
+    def item_key(cls, item, spider):
+        """Return the default topic name for the spider's items."""
+        return f"{spider.name}.items"
+
+    def connect(self):
+        """(Re)create and start the producer, dropping any previous one."""
+        logger.info(f"connect rocketmq: {self.name_server}")
+        self._shutdown_producer()
+        self.producer = Producer(self.item_conf.group)
+        self.producer.set_name_server_address(self.name_server)
+        self.producer.start()
+
+    def close(self):
+        """Shut the producer down; safe to call more than once."""
+        self._shutdown_producer()
+        # A normal shutdown is not an error condition.
+        logger.info("rocketmq pipeline producer is closed")
+
+    def close_spider(self, spider):
+        """Scrapy hook: release the producer when the spider closes."""
+        self.close()
+
+    def _shutdown_producer(self):
+        """Best-effort shutdown of the current producer, if there is one."""
+        if not self.producer:
+            return
+        try:
+            self.producer.shutdown()
+        except Exception:
+            # Keep shutdown best-effort, but leave a trace of the failure.
+            logger.warning("rocketmq producer shutdown failed", exc_info=True)
+        finally:
+            self.producer = None
diff --git a/setup.py b/setup.py
index 4035764..47f383a 100644
--- a/setup.py
+++ b/setup.py
@@ -59,6 +59,7 @@ def read_rst(filename):
         "pika>=1.0.0",
         "redisbloom>=0.2.0",
         "redis>=3.0.1",
-        "kafka-python>=1.4.7"
+        "kafka-python>=1.4.7",
+        "rocketmq-client-python>=2.0.0"
     ],
 )