Skip to content

Draft of recent changes polling #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dump_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import sys
from sseclient import SSEClient as EventSource
from dateutil import parser
from store.stream import WikidataEditStream
from store.stream import WikidataEditStream, RecentChangesStream

if __name__ == '__main__':
s = WikidataEditStream()
s = RecentChangesStream()
offset = None
if len(sys.argv) > 1:
offset = parser.parse(sys.argv[1])
Expand Down
90 changes: 90 additions & 0 deletions store/stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import json
import dateutil
import datetime
from time import sleep
from sseclient import SSEClient as EventSource
import requests

class WikidataEditStream(object):
def __init__(self):
Expand All @@ -20,3 +24,89 @@ def stream(self, from_time=None):
pass


class RecentChangesStream(object):
"""
Generates an edit stream by polling the recent changes feed
on the target wiki
"""
def __init__(self, endpoint_url='https://www.wikidata.org/w/api.php', base='https://www.wikidata.org/entity/'):
self.url = endpoint_url
self.base = base
self.namespaces = [0, 120]

def stream(self, from_time=None):
params = {
'action': 'query',
'list': 'recentchanges',
'format': 'json',
'rcnamespace': '|'.join(str(n) for n in self.namespaces),
'rcdir': 'newer',
'rclimit': 500,
'rctype': 'edit|new|log',
'rcprop': 'user|comment|parsedcomment|title|ids|sizes|timestamp|flags|loginfo'
}
from_time = from_time or datetime.datetime.utcnow()
start_timestamp = from_time.isoformat().replace('+00:00', 'Z')
continue_token = None
previously_seen = set()
while True:
try:
full_params = dict(params)
if continue_token:
full_params['rccontinue'] = continue_token
elif start_timestamp:
full_params['rcstart'] = start_timestamp

sleep(1)
r = requests.get(self.url, full_params)
r.raise_for_status()
start_timestamp = None
try:
json_payload = r.json()
rcids = set()
for item in json_payload.get('query', {}).get('recentchanges') or []:
rcid = item.get('rcid')
rcids.add(rcid)
start_timestamp = item.get('timestamp')
if rcid not in previously_seen:
yield item # self.translate_to_eventstream_json(item)
else:
print('skipping {}'.format(rcid))
continue_token = json_payload.get('continue', {}).get('rccontinue')
except ValueError as e:
print(e)
pass
except requests.exceptions.RequestException as e:
print(e)
previously_seen = rcids

def translate_to_eventstream_json(self, rc_event):
"""
Translates the representation of an edit obtained form recentchanges
to the one exposed by the WMF eventstream
"""
return {
'type': rc_event.get('type'),
'meta': {
'uri': self.base+rc_event.get('title')
},
'namespace': rc_event.get('ns'),
'title': rc_event.get('title'),
'id': rc_event.get('pageid'),
'revision': {
'old': rc_event.get('old_revid'),
'new': rc_event.get('revid')
},
'user': rc_event.get('user'),
'bot': 'bot' in rc_event,
'minor': 'minor' in rc_event,
'length': {
'old': rc_event.get('oldlen'),
'new': rc_event.get('newlen')
},
'comment': rc_event.get('comment'),
'parsedcomment': rc_event.get('parsedcomment'),
'timestamp': int(dateutil.parser.parse(rc_event.get('timestamp')).timestamp()),
'log_action': rc_event.get('logaction')
}

126 changes: 126 additions & 0 deletions store/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .models import Edit
from .models import Batch
from .stream import WikidataEditStream
from .stream import RecentChangesStream
from tagging.utils import FileBasedDiffInspector
from tagging.utils import BatchInspectorStub
from revert.models import RevertTask
Expand Down Expand Up @@ -342,6 +343,131 @@ def test_stream(self):
break
self.assertEquals('wikidatawiki', edit['wiki'])


class RecentChangesStreamTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.stream = RecentChangesStream()

def test_live(self):
for idx, edit in enumerate(self.stream.stream()):
if idx > 10:
break

def test_translate(self):
# Delete event
eventstream = self.stream.translate_to_eventstream_json(
{
"type": "log",
"ns": 0,
"title": "Q65133577",
"pageid": 64773771,
"revid": 0,
"old_revid": 0,
"rcid": 1213466409,
"user": "HakanIST",
"oldlen": 0,
"newlen": 0,
"timestamp": "2020-05-06T06:19:26Z",
"comment": "Empty item",
"parsedcomment": "Empty item",
"logid": 663320792,
"logtype": "delete",
"logaction": "delete",
"logparams": { }
}
)
self.assertEquals(eventstream,
{
'type': 'log',
'bot': False,
'comment': 'Empty item',
'id': 64773771,
'length': {'new': 0, 'old': 0},
'log_action': 'delete',
'meta': {'uri': 'https://www.wikidata.org/entity/Q65133577'},
'minor': False,
'namespace': 0,
'parsedcomment': 'Empty item',
'revision': {'new': 0, 'old': 0},
'timestamp': 1588745966,
'title': 'Q65133577',
'user': 'HakanIST'
}
)

# Simple edit
eventstream = self.stream.translate_to_eventstream_json(
{
"type": "edit",
"ns": 0,
"title": "Q2349542",
"pageid": 2269945,
"revid": 1174994291,
"old_revid": 1037245488,
"rcid": 1213544214,
"user": "SuccuBot",
"oldlen": 14935,
"newlen": 15006,
"timestamp": "2020-05-06T07:58:32Z",
"comment": "/* wbsetlabel-add:1|pt */ Efate raptor",
"parsedcomment": '<span dir="auto"><span class="autocomment">Added [pt] label: </span></span> Efate raptor'
})
self.assertEquals(eventstream,
{
'type': 'edit',
'comment': '/* wbsetlabel-add:1|pt */ Efate raptor',
'id': 2269945,
'length': {'new': 15006, 'old': 14935},
'log_action': None,
'meta': {'uri': 'https://www.wikidata.org/entity/Q2349542'},
'minor': False,
'namespace': 0,
'bot': False,
'parsedcomment': '<span dir="auto"><span class="autocomment">Added [pt] label: </span></span> Efate raptor',
'revision': {'new': 1174994291, 'old': 1037245488},
'timestamp': 1588751912,
'title': 'Q2349542',
'user': 'SuccuBot'
}
)
# Item creation
eventstream = self.stream.translate_to_eventstream_json(
{
"type": "new",
"ns": 0,
"title": "Q93422955",
"pageid": 92529989,
"revid": 1174999530,
"old_revid": 0,
"rcid": 1213549575,
"user": "Ghuron",
"oldlen": 0,
"newlen": 22996,
"timestamp": "2020-05-06T08:06:25Z",
"comment": '/* wbeditentity-create-item:0| */ batch import from [[Q654724|SIMBAD]] for object "TYC 3740-948-1"',
"parsedcomment": '<span dir="auto"><span class="autocomment">Created a new Item: </span></span> batch import from <a href="/wiki/Q654724" title="Q654724">SIMBAD</a> for object &quot;TYC 3740-948-1&quot;'
}
)
self.assertEquals(eventstream,
{
"type": "new",
"meta": { "uri": "https://www.wikidata.org/entity/Q93422955" },
"namespace": 0,
"title": "Q93422955",
"id": 92529989,
"revision": { "old": 0, "new": 1174999530 },
"user": "Ghuron",
"bot": False,
"minor": False,
"length": { "old": 0, "new": 22996 },
"comment": "/* wbeditentity-create-item:0| */ batch import from [[Q654724|SIMBAD]] for object \"TYC 3740-948-1\"",
"parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\">Created a new Item: </span></span> batch import from <a href=\"/wiki/Q654724\" title=\"Q654724\">SIMBAD</a> for object &quot;TYC 3740-948-1&quot;",
"timestamp": 1588752385,
"log_action": None
}
)

class PagesTest(TestCase):

@classmethod
Expand Down