Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-install.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ scrapy>=1.8.0
pika>=1.0.0
redisbloom>=0.2.0
redis>=3.0.1
kafka-python>=1.4.7
kafka-python>=1.4.7
rocketmq-client-python>=2.0.0
9 changes: 9 additions & 0 deletions scrapy_distributed/common/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ def __init__(
self.topic = topic
self.num_partitions = num_partitions
self.replication_factor = replication_factor
self.arguments = arguments


class RocketMQQueueConfig(object):
    """Value holder describing a RocketMQ destination.

    Every constructor argument is stored unchanged on the instance under
    the attribute of the same name.
    """

    def __init__(
        self,
        topic,
        group="default",
        tags=None,
        keys=None,
        arguments=None,
    ):
        """Record the queue settings.

        Args:
            topic: Topic name.
            group: Group name; defaults to ``"default"``.
            tags: Optional message tags.
            keys: Optional message keys.
            arguments: Optional extra options, kept as given.
        """
        # Paired assignments keep the attribute creation order of the
        # original one-per-line form.
        self.topic, self.group = topic, group
        self.tags, self.keys = tags, keys
        self.arguments = arguments
2 changes: 1 addition & 1 deletion scrapy_distributed/pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# NOTE(review): this line and the next appear to be the removed/added pair of
# the diff shown here; if both are kept, the second assignment wins.
__all__ = ["amqp", "kafka"]
# Submodule names exported by a star-import of this package.
__all__ = ["amqp", "kafka", "rocketmq"]
72 changes: 72 additions & 0 deletions scrapy_distributed/pipelines/rocketmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging

from rocketmq.client import Producer, Message
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from scrapy_distributed.common.queue_config import RocketMQQueueConfig


# Default item serializer: the bound ``encode`` method of a single shared
# ScrapyJSONEncoder instance. The pipeline stores it as ``self.serialize``.
default_serialize = ScrapyJSONEncoder().encode

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class RocketMQPipeline(object):
    """Scrapy item pipeline that publishes each item to a RocketMQ topic.

    The producer is started when the pipeline is constructed and shut down
    when Scrapy calls ``close_spider`` (or when ``close`` is called directly).
    """

    def __init__(self, item_conf: "RocketMQQueueConfig", name_server: str):
        """Store the configuration and start the producer.

        Args:
            item_conf: Supplies the topic, producer group and the optional
                tags/keys applied to every message.
            name_server: Address passed to
                ``Producer.set_name_server_address``.
        """
        self.item_conf = item_conf
        self.name_server = name_server
        self.serialize = default_serialize
        self.producer = None
        self.connect()

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from a crawler.

        Uses ``crawler.spider.item_conf`` when the spider defines it;
        otherwise falls back to a default config whose topic is
        ``"<spider.name>.items"``. The name server address is read from the
        ``ROCKETMQ_NAME_SERVER`` setting.
        """
        spider = crawler.spider
        if hasattr(spider, "item_conf"):
            item_conf = spider.item_conf
        else:
            item_conf = RocketMQQueueConfig(cls.item_key(None, spider))
        return cls(
            item_conf=item_conf,
            name_server=crawler.settings.get("ROCKETMQ_NAME_SERVER"),
        )

    def process_item(self, item, spider):
        """Hand the item to ``_process_item`` via ``deferToThread``.

        Returns:
            The Deferred produced by ``deferToThread``; it fires with the
            item returned by ``_process_item``.
        """
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        """Serialize ``item``, send it synchronously and return the item."""
        # dict(item) works for both scrapy.Item and plain dict items and
        # avoids reaching into the private ``_values`` attribute, which
        # plain dicts do not have.
        body = self.serialize(dict(item))
        conf = self.item_conf
        msg = Message(conf.topic)
        if conf.tags:
            msg.set_tags(conf.tags)
        if conf.keys:
            msg.set_keys(conf.keys)
        msg.set_body(body.encode())
        ret = self.producer.send_sync(msg)
        spider.logger.info(f"produce: {body}, status: {ret.status}")
        return item

    @classmethod
    def item_key(cls, item, spider):
        """Return the default topic name for ``spider``: ``"<name>.items"``."""
        return f"{spider.name}.items"

    def _shutdown_producer(self):
        """Shut down the current producer, if any, without raising.

        Shutdown stays best-effort, but a failure is logged instead of being
        silently dropped. The reference is cleared so a repeated call is a
        no-op.
        """
        producer = self.producer
        if not producer:
            return
        try:
            producer.shutdown()
        except Exception:
            logger.warning("failed to shut down rocketmq producer", exc_info=True)
        self.producer = None

    def connect(self):
        """(Re)create and start the producer for the configured group."""
        logger.info(f"connect rocketmq: {self.name_server}")
        # Drop any previous producer before starting a new one.
        self._shutdown_producer()
        self.producer = Producer(self.item_conf.group)
        self.producer.set_name_server_address(self.name_server)
        self.producer.start()

    def close(self):
        """Shut down the producer; safe to call more than once."""
        self._shutdown_producer()
        # A normal shutdown is not an error condition.
        logger.info("rocketmq pipeline producer is closed")

    def close_spider(self, spider):
        """Scrapy hook: release the producer when the spider is closed.

        Scrapy invokes ``close_spider`` (not ``close``) on item pipelines,
        so without this hook the producer would never be shut down by the
        framework.
        """
        self.close()
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def read_rst(filename):
"pika>=1.0.0",
"redisbloom>=0.2.0",
"redis>=3.0.1",
"kafka-python>=1.4.7"
"kafka-python>=1.4.7",
"rocketmq-client-python>=2.0.0"
],
)