-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmrlin_import.py
175 lines (143 loc) · 6.58 KB
/
mrlin_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python
"""
mrlin - import
Imports an RDF/NTriples document concerning a graph URI into an HBase table.
See https://github.com/mhausenblas/mrlin/wiki/RDF-in-HBase for details.
Usage: python mrlin_import.py path/to/file | URL
Examples:
python mrlin_import.py data/Galway.ntriples http://example.org/
python mrlin_import.py http://dbpedia.org/data/Galway.ntriples http://example.org/
Copyright (c) 2012 The Apache Software Foundation, Licensed under the Apache License, Version 2.0.
@author: Michael Hausenblas, http://mhausenblas.info/#i
@since: 2012-10-27
@status: init
"""
import sys, os, logging, datetime, time, urllib, urllib2, json, requests, urlparse, ntriples, base64, happybase
from mrlin_utils import *
###############
# Configuration
DEBUG = False
if DEBUG:
FORMAT = '%(asctime)-0s %(levelname)s %(message)s [at line %(lineno)d]'
logging.basicConfig(level=logging.DEBUG, format=FORMAT, datefmt='%Y-%m-%dT%I:%M:%S')
else:
FORMAT = '%(asctime)-0s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt='%Y-%m-%dT%I:%M:%S')
#################################
# mrlin HBase interfacing classes
HBASE_BATCH_SIZE = 100
# patch the ntriples.Literal class
class SimpleLiteral(ntriples.Node):
"""Represents a simple, stripped RDF literal object."""
def __new__(cls, lit, lang=None, dtype=None):
# Note that the parsed object value in the default implementation is
# encoded as follows: '@LANGUAGE ^^DATATYPE" VALUE'
# For example:
# http://dbpedia.org/resource/Hildegarde_Naughton ... URI
# fi None Galway ... 'Galway'@fi
# None http://www.w3.org/2001/XMLSchema#int 14 ... '14'^^<http://www.w3.org/2001/XMLSchema#int>
# n = str(lang) + ' ' + str(dtype) + ' ' + lit
return unicode.__new__(cls, lit)
ntriples.Literal = SimpleLiteral
# END OF patch the ntriples.Literal class
class HBaseSink(ntriples.Sink):
"""Represents a sink for HBase."""
def __init__(self, server_port, graph_uri):
"""Inits the HBase sink. The server_port must be set to the port the Thrift server is listening.
See http://wiki.apache.org/hadoop/Hbase/ThriftApi for details.
"""
self.length = 0
self.server_port = server_port # Thrift server port
self.graph_uri = graph_uri # the target graph URI for the document
self.property_counter = {}
self.starttime = time.time()
self.time_delta = 0
# prepare the RDF table in HBase using Thrift interface:
self.hbm = HBaseThriftManager(host='localhost', server_port=self.server_port)
self.hbm.init()
self.batch = self.hbm.connection.table('rdf').batch()
def triple(self, s, p, o):
"""Processes one triple as arriving in the sink."""
if self.length == 0 : # we're starting the import task, source is ready
self.time_delta = time.time() - self.starttime
self.starttime = time.time()
logging.info('== STATUS ==')
logging.info(' Time to retrieve source: %.2f sec' %(self.time_delta))
self.length += 1
if DEBUG: logging.debug('Adding triple #%s: %s %s %s' %(self.length, s, p, o))
if self.length % HBASE_BATCH_SIZE == 0: # we have $batch_size triples processed, send batch and show stats
self.batch.send()
self.batch = self.hbm.connection.table('rdf').batch()
self.time_delta = time.time() - self.starttime
self.starttime = time.time()
logging.info('== STATUS ==')
logging.info(' Time elapsed since last checkpoint: %.2f sec' %(self.time_delta))
logging.info(' Import speed: %.2f triples per sec' %(HBASE_BATCH_SIZE/self.time_delta))
self.add_row_thrift(g=self.graph_uri,s=s,p=p,o=o)
def wrapup(self):
self.batch.send()
self.time_delta = time.time() - self.starttime
self.starttime = time.time()
logging.info('== FINAL STATUS ==')
logging.info(' Time elapsed since last checkpoint: %.2f sec' %(self.time_delta))
logging.info(' Import speed: %.2f triples per sec' %(HBASE_BATCH_SIZE/self.time_delta))
def add_row_thrift(self, g, s, p, o):
"""Inserts an RDF triple as a row with subject as key using the Thrift interface via Happybase."""
# make sure to store each property-object pair in its own column -
# for details see https://github.com/mhausenblas/mrlin/wiki/RDF-in-HBase
if s in self.property_counter:
self.property_counter[s] += 1
else:
self.property_counter[s] = 1
self.batch.put(s, { 'G:': g,
'P:' + str(self.property_counter[s]) : p,
'O:' + str(self.property_counter[s]) : repr(o) })
#######################
# CLI auxilary methods
def import_ntriples(source, graph_uri='http://example.org'):
"""Imports RDF/NTriples from directory, single file or URL."""
starttime = time.time()
imported_triples = 0
if os.path.isdir(source): # we have a directory in the local file system with RDF/NTriples files
logging.info('Importing RDF/NTriples from directory %s into graph %s' %(os.path.abspath(source), graph_uri))
logging.info('='*12)
imported_triples = _import_directory(source, graph_uri=graph_uri)
elif source[:5] == 'http:': # we have a URL where we get the RDF/NTriples file from
logging.info('Importing RDF/NTriples from URL %s into graph %s' %(source, graph_uri))
logging.info('='*12)
imported_triples = _import_data(src = urllib.urlopen(source), graph_uri=graph_uri)
else: # we have a single RDF/NTriples file from the local file system
logging.info('Importing RDF/NTriples from file %s into graph %s' %(source, graph_uri))
logging.info('='*12)
imported_triples = _import_data(src = open(source), graph_uri=graph_uri)
deltatime = time.time() - starttime
logging.info('='*12)
logging.info('Imported %d triples in %.2f seconds.' %(imported_triples, deltatime))
def _import_directory(src_dir, graph_uri):
"""Imports RDF/NTriples from directory."""
imported_triples = 0
for dirname, dirnames, filenames in os.walk(src_dir):
for filename in filenames:
if filename.endswith(('nt','ntriples')):
logging.info('Importing RDF/NTriples from file %s into graph %s' %(filename, graph_uri))
imported_triples += _import_data(src = urllib.urlopen(os.path.join(src_dir, filename)), graph_uri=graph_uri)
return imported_triples
def _import_data(src, graph_uri):
"""Imports RDF/NTriples from a single source, either local file or via URL."""
nt_parser = ntriples.NTriplesParser(sink=HBaseSink(server_port=HBASE_THRIFT_PORT, graph_uri=graph_uri))
sink = nt_parser.parse(src)
sink.wrapup() # needed as the number of triples can be smaller than batch size (!)
src.close()
return sink.length
#############
# Main script
if __name__ == '__main__':
try:
if len(sys.argv) == 3:
inp = sys.argv[1]
graph_uri = sys.argv[2]
import_ntriples(inp, graph_uri)
else: print __doc__
except Exception, e:
logging.error(e)
sys.exit(2)