diff --git a/dump_events.py b/dump_events.py
index 7240091..3294a55 100644
--- a/dump_events.py
+++ b/dump_events.py
@@ -2,10 +2,11 @@
import sys
from sseclient import SSEClient as EventSource
from dateutil import parser
-from store.stream import WikidataEditStream
+from store.stream import WikidataEditStream, RecentChangesStream
if __name__ == '__main__':
s = WikidataEditStream()
+ s = RecentChangesStream()
offset = None
if len(sys.argv) > 1:
offset = parser.parse(sys.argv[1])
diff --git a/store/stream.py b/store/stream.py
index a9c53b0..5cd0950 100644
--- a/store/stream.py
+++ b/store/stream.py
@@ -1,5 +1,9 @@
import json
+import dateutil
+import datetime
+from time import sleep
from sseclient import SSEClient as EventSource
+import requests
class WikidataEditStream(object):
def __init__(self):
@@ -20,3 +24,89 @@ def stream(self, from_time=None):
pass
+class RecentChangesStream(object):
+ """
+ Generates an edit stream by polling the recent changes feed
+ on the target wiki
+ """
+ def __init__(self, endpoint_url='https://www.wikidata.org/w/api.php', base='https://www.wikidata.org/entity/'):
+ self.url = endpoint_url
+ self.base = base
+ self.namespaces = [0, 120]
+
+ def stream(self, from_time=None):
+ params = {
+ 'action': 'query',
+ 'list': 'recentchanges',
+ 'format': 'json',
+ 'rcnamespace': '|'.join(str(n) for n in self.namespaces),
+ 'rcdir': 'newer',
+ 'rclimit': 500,
+ 'rctype': 'edit|new|log',
+ 'rcprop': 'user|comment|parsedcomment|title|ids|sizes|timestamp|flags|loginfo'
+ }
+ from_time = from_time or datetime.datetime.utcnow()
+ start_timestamp = from_time.isoformat().replace('+00:00', 'Z')
+ continue_token = None
+ previously_seen = set()
+ while True:
+ try:
+ full_params = dict(params)
+ if continue_token:
+ full_params['rccontinue'] = continue_token
+ elif start_timestamp:
+ full_params['rcstart'] = start_timestamp
+
+ sleep(1)
+ r = requests.get(self.url, full_params)
+ r.raise_for_status()
+ start_timestamp = None
+ try:
+ json_payload = r.json()
+ rcids = set()
+ for item in json_payload.get('query', {}).get('recentchanges') or []:
+ rcid = item.get('rcid')
+ rcids.add(rcid)
+ start_timestamp = item.get('timestamp')
+ if rcid not in previously_seen:
+ yield item # self.translate_to_eventstream_json(item)
+ else:
+ print('skipping {}'.format(rcid))
+ continue_token = json_payload.get('continue', {}).get('rccontinue')
+ except ValueError as e:
+ print(e)
+ pass
+ except requests.exceptions.RequestException as e:
+ print(e)
+ previously_seen = rcids
+
+ def translate_to_eventstream_json(self, rc_event):
+ """
+ Translates the representation of an edit obtained form recentchanges
+ to the one exposed by the WMF eventstream
+ """
+ return {
+ 'type': rc_event.get('type'),
+ 'meta': {
+ 'uri': self.base+rc_event.get('title')
+ },
+ 'namespace': rc_event.get('ns'),
+ 'title': rc_event.get('title'),
+ 'id': rc_event.get('pageid'),
+ 'revision': {
+ 'old': rc_event.get('old_revid'),
+ 'new': rc_event.get('revid')
+ },
+ 'user': rc_event.get('user'),
+ 'bot': 'bot' in rc_event,
+ 'minor': 'minor' in rc_event,
+ 'length': {
+ 'old': rc_event.get('oldlen'),
+ 'new': rc_event.get('newlen')
+ },
+ 'comment': rc_event.get('comment'),
+ 'parsedcomment': rc_event.get('parsedcomment'),
+ 'timestamp': int(dateutil.parser.parse(rc_event.get('timestamp')).timestamp()),
+ 'log_action': rc_event.get('logaction')
+ }
+
diff --git a/store/tests.py b/store/tests.py
index cc738a9..ea37663 100644
--- a/store/tests.py
+++ b/store/tests.py
@@ -15,6 +15,7 @@
from .models import Edit
from .models import Batch
from .stream import WikidataEditStream
+from .stream import RecentChangesStream
from tagging.utils import FileBasedDiffInspector
from tagging.utils import BatchInspectorStub
from revert.models import RevertTask
@@ -342,6 +343,131 @@ def test_stream(self):
break
self.assertEquals('wikidatawiki', edit['wiki'])
+
+class RecentChangesStreamTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.stream = RecentChangesStream()
+
+ def test_live(self):
+ for idx, edit in enumerate(self.stream.stream()):
+ if idx > 10:
+ break
+
+ def test_translate(self):
+ # Delete event
+ eventstream = self.stream.translate_to_eventstream_json(
+ {
+ "type": "log",
+ "ns": 0,
+ "title": "Q65133577",
+ "pageid": 64773771,
+ "revid": 0,
+ "old_revid": 0,
+ "rcid": 1213466409,
+ "user": "HakanIST",
+ "oldlen": 0,
+ "newlen": 0,
+ "timestamp": "2020-05-06T06:19:26Z",
+ "comment": "Empty item",
+ "parsedcomment": "Empty item",
+ "logid": 663320792,
+ "logtype": "delete",
+ "logaction": "delete",
+ "logparams": { }
+ }
+ )
+ self.assertEquals(eventstream,
+ {
+ 'type': 'log',
+ 'bot': False,
+ 'comment': 'Empty item',
+ 'id': 64773771,
+ 'length': {'new': 0, 'old': 0},
+ 'log_action': 'delete',
+ 'meta': {'uri': 'https://www.wikidata.org/entity/Q65133577'},
+ 'minor': False,
+ 'namespace': 0,
+ 'parsedcomment': 'Empty item',
+ 'revision': {'new': 0, 'old': 0},
+ 'timestamp': 1588745966,
+ 'title': 'Q65133577',
+ 'user': 'HakanIST'
+ }
+ )
+
+ # Simple edit
+ eventstream = self.stream.translate_to_eventstream_json(
+ {
+ "type": "edit",
+ "ns": 0,
+ "title": "Q2349542",
+ "pageid": 2269945,
+ "revid": 1174994291,
+ "old_revid": 1037245488,
+ "rcid": 1213544214,
+ "user": "SuccuBot",
+ "oldlen": 14935,
+ "newlen": 15006,
+ "timestamp": "2020-05-06T07:58:32Z",
+ "comment": "/* wbsetlabel-add:1|pt */ Efate raptor",
+ "parsedcomment": ' Efate raptor'
+ })
+ self.assertEquals(eventstream,
+ {
+ 'type': 'edit',
+ 'comment': '/* wbsetlabel-add:1|pt */ Efate raptor',
+ 'id': 2269945,
+ 'length': {'new': 15006, 'old': 14935},
+ 'log_action': None,
+ 'meta': {'uri': 'https://www.wikidata.org/entity/Q2349542'},
+ 'minor': False,
+ 'namespace': 0,
+ 'bot': False,
+ 'parsedcomment': ' Efate raptor',
+ 'revision': {'new': 1174994291, 'old': 1037245488},
+ 'timestamp': 1588751912,
+ 'title': 'Q2349542',
+ 'user': 'SuccuBot'
+ }
+ )
+ # Item creation
+ eventstream = self.stream.translate_to_eventstream_json(
+ {
+ "type": "new",
+ "ns": 0,
+ "title": "Q93422955",
+ "pageid": 92529989,
+ "revid": 1174999530,
+ "old_revid": 0,
+ "rcid": 1213549575,
+ "user": "Ghuron",
+ "oldlen": 0,
+ "newlen": 22996,
+ "timestamp": "2020-05-06T08:06:25Z",
+ "comment": '/* wbeditentity-create-item:0| */ batch import from [[Q654724|SIMBAD]] for object "TYC 3740-948-1"',
+ "parsedcomment": ' batch import from SIMBAD for object "TYC 3740-948-1"'
+ }
+ )
+ self.assertEquals(eventstream,
+ {
+ "type": "new",
+ "meta": { "uri": "https://www.wikidata.org/entity/Q93422955" },
+ "namespace": 0,
+ "title": "Q93422955",
+ "id": 92529989,
+ "revision": { "old": 0, "new": 1174999530 },
+ "user": "Ghuron",
+ "bot": False,
+ "minor": False,
+ "length": { "old": 0, "new": 22996 },
+ "comment": "/* wbeditentity-create-item:0| */ batch import from [[Q654724|SIMBAD]] for object \"TYC 3740-948-1\"",
+ "parsedcomment": " batch import from SIMBAD for object "TYC 3740-948-1"",
+ "timestamp": 1588752385,
+ "log_action": None
+ }
+ )
+
class PagesTest(TestCase):
@classmethod