new cpc_current download and parsing code for the app_db (#80)
DiPietroch authored Jul 2, 2020
1 parent 6f6e4f5 commit dcdd51b
Showing 2 changed files with 314 additions and 0 deletions.
108 changes: 108 additions & 0 deletions updater/collect_supplemental_data/cpc_parser/pgpubs_cpc_parser.py
@@ -0,0 +1,108 @@
import os
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from lib.configuration import get_config


def parse_and_write_cpc(inputdir, config):
    """ Parse CPC classifications from each PGPub MCF file and load them into MySQL """

    df_list = []

    for filename in os.listdir(inputdir):
        if filename.startswith('US_PGPub_CPC_MCF_') and filename.endswith('.txt'):
            # Extend rather than assign, so rows from every matching file
            # accumulate instead of overwriting the previous file's rows
            df_list.extend(parse_pgpub_file(inputdir + '/' + filename))

    df = pd.DataFrame(df_list,
                      columns=['document_number', 'sequence', 'version', 'section_id',
                               'subsection_id', 'group_id', 'subgroup_id',
                               'symbol_position', 'value'])

    # 'I' marks an inventional classification, 'A' an additional one
    df['category'] = np.select([df['value'] == 'I', df['value'] == 'A'],
                               ['inventional', 'additional'], default=None)

    database = config['DATABASE']['TEMP_DATABASE']
    host = config['DATABASE']['HOST']
    user = config['DATABASE']['USERNAME']
    password = config['DATABASE']['PASSWORD']
    port = config['DATABASE']['PORT']

    engine = create_engine(
        'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8mb4'.format(user, password, host, port, database))

    df.to_sql('cpc_current', con=engine, if_exists='append', index=False)
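# A hedged spot-check sketch (assumes the engine above and that the load has
# completed; pd.read_sql is standard pandas, but the query itself is
# illustrative):
#   pd.read_sql('SELECT category, COUNT(*) AS n FROM cpc_current GROUP BY category',
#               con=engine)
# Only 'inventional', 'additional', and NULL (for values other than 'I'/'A')
# should appear, which confirms the np.select mapping above landed.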

def parse_pgpub_file(filepath):
    """ Extract CPC classifications from a single MCF file; the full set
    covers ~35 million applications """
    with open(filepath) as f:
        input_rows = f.readlines()
    print("Parsing app file: {}; rows: {}".format(filepath, len(input_rows)))

    # Applications are already sorted by app_number, so we can detect a new
    # application by comparing against the last number seen; the sequence
    # counter restarts at 0 for each new application and increments for every
    # further classification of the same application
    results = []
    last_application_seen = ''
    sequence = 0

    for row in input_rows:
        # Skip blank rows (readlines keeps trailing newlines, so test the
        # stripped row rather than comparing against '')
        if not row.strip():
            continue

        # There is a problematic line that is cut short; as a result, we don't
        # know whether it is inventional or additional, so skip it before
        # indexing past the end of the row
        if len(row) <= 45:
            continue

        # Fields sit at fixed offsets within each row
        app_number = row[10:21]
        cpc_section = row[21]
        cpc_subsection = cpc_section + row[22:24]
        cpc_group = cpc_subsection + row[24]
        cpc_subgroup = cpc_group + strip_whitespace(row[25:36])
        version = row[36:44]
        symbol_position = row[44]
        value = row[45]

        if app_number == last_application_seen:
            sequence += 1
        else:
            sequence = 0

        # Save the classification found to our results dataset
        results.append([app_number, sequence, version, cpc_section, cpc_subsection,
                        cpc_group, cpc_subgroup, symbol_position, value])

        last_application_seen = app_number

    return results
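# A minimal sketch of the fixed-width layout parse_pgpub_file assumes. The
# sample row is synthetic -- constructed from the slice offsets used above,
# not from a published MCF spec -- so treat the field boundaries as the
# parser's working assumptions.
def _demo_fixed_width_offsets():
    sample = ('x' * 10            # cols 0-9:   record header (ignored)
              + '20200123456'     # cols 10-20: application number
              + 'H' + '01' + 'L'  # cols 21-24: section, subsection, group
              + '21/02      '     # cols 25-35: subgroup, space-padded
              + '20130101'        # cols 36-43: CPC version date
              + 'F'               # col 44:     symbol position
              + 'I')              # col 45:     value (I = inventional)
    assert sample[10:21] == '20200123456'
    assert sample[21] + sample[22:24] + sample[24] == 'H01L'
    assert sample[21:25] + strip_whitespace(sample[25:36]) == 'H01L21/02'
    assert sample[36:44] == '20130101'
    assert (sample[44], sample[45]) == ('F', 'I')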

def strip_whitespace(s):
    r""" Strip whitespace really fast:
        re.sub(r'\s+', '', s)   2.33 usec per loop
        ''.join(s.split())      0.47 usec per loop
        s.replace(' ', '')      0.40 usec per loop  (spaces only, so not used)
    """
    return ''.join(s.split())
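# The docstring timings can be reproduced with timeit (absolute numbers vary
# by machine; the ranking is what matters). The sample string is illustrative:
#   python -m timeit -s "import re; s=' 21/02     '" "re.sub(r'\s+', '', s)"
#   python -m timeit -s "s=' 21/02     '" "''.join(s.split())"
#   python -m timeit -s "s=' 21/02     '" "s.replace(' ', '')"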


if __name__ == '__main__':
    config = get_config()

    location_of_cpc_files = '{}/{}'.format(config['FOLDERS']['WORKING_FOLDER'], 'cpc_input')
    parse_and_write_cpc(location_of_cpc_files, config)
206 changes: 206 additions & 0 deletions
@@ -0,0 +1,206 @@
import os
import urllib.request
import zipfile
from lxml import html
from lib.configuration import get_config
from helpers import general_helpers

def download_cpc_schema(destination_folder):
    """ Download and extract the most recent CPC Schema """

    # Find the correct CPC Schema url
    cpc_schema_url = find_cpc_schema_url()
    cpc_schema_zip_filepath = os.path.join(destination_folder,
                                           "CPC_Schema.zip")

    # Download the CPC Schema zip file
    print("Source: {}".format(cpc_schema_url))
    print("Destination: {}".format(cpc_schema_zip_filepath))
    general_helpers.download(url=cpc_schema_url, filepath=cpc_schema_zip_filepath)

    # Unzip the zip file
    print("Extracting contents to: {}".format(destination_folder))
    with zipfile.ZipFile(cpc_schema_zip_filepath) as z:
        z.extractall(destination_folder)

    # Remove the original zip file
    print("Removing: {}".format(cpc_schema_zip_filepath))
    os.remove(cpc_schema_zip_filepath)


def find_cpc_schema_url():
    """
    Search the CPC Scheme & Definition page for the most recent CPC Scheme.
    This method is necessary because the schema zip file may change names, and
    multiple versions of the schema may be listed on the webpage.
    If there are multiple schema urls, sorting alphabetically ensures that the
    most recent schema is returned.
    """
    base_url = 'http://www.cooperativepatentclassification.org'
    page = urllib.request.urlopen(base_url + '/cpcSchemeAndDefinitions/Bulk.html')
    tree = html.fromstring(page.read())
    potential_links = []
    for link in tree.xpath('//a/@href'):
        if (link.lstrip(".").startswith("/cpc/bulk/CPCSchemeXML")
                and link.endswith(".zip")):
            potential_links.append(link.replace('..', ''))

    # Since zip files are formatted CPCSchemeXMLYYYYMM.zip,
    # the last sorted url corresponds to the latest version
    return base_url + sorted(potential_links)[-1]
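# A quick sanity check of the sort-picks-latest rule above: the YYYYMM stamp
# makes lexicographic and chronological order coincide (filenames below are
# illustrative, not actual listings):
#   sorted(['/cpc/bulk/CPCSchemeXML201912.zip',
#           '/cpc/bulk/CPCSchemeXML202005.zip',
#           '/cpc/bulk/CPCSchemeXML202002.zip'])[-1]
#   -> '/cpc/bulk/CPCSchemeXML202005.zip'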


def download_cpc_pgpub_classifications(destination_folder):
    """
    Download and extract the most recent CPC Master Classification Files (MCF)
    """
    # Find the correct CPC PGPub MCF url
    cpc_pgpub_mcf_url = find_cpc_pgpub_urls()

    cpc_pgpub_zip_filepath = os.path.join(destination_folder,
                                          'CPC_pgpub_mcf.zip')

    # Download and extract the PGPub classifications
    for (filepath, url) in [(cpc_pgpub_zip_filepath, cpc_pgpub_mcf_url)]:

        # Download the files
        print("Destination: {}".format(filepath))
        general_helpers.download(url=url, filepath=filepath)

        # Rename and unzip zip files
        # Zip files contain a single folder with many subfiles. We just want
        # the contents, so rename the subfiles to ignore their container
        with zipfile.ZipFile(filepath) as z:
            text_files = [file for file in z.infolist()
                          if file.filename.endswith('.txt')]

            # For example, zip file contents ['foo/', 'foo/bar.txt', 'foo/baz.txt']
            # would be extracted as ['bar.txt', 'baz.txt'] (with 'foo/' ignored)
            for text_file in text_files:
                text_file.filename = text_file.filename.split('/')[-1]
                z.extract(text_file, path=destination_folder)

        # Remove the original zip file
        #print("Removing: {}".format(filepath))
        #os.remove(filepath)


def find_cpc_pgpub_urls():
    """
    Search the CPC Bulk Data Storage System for the most recent CPC MCF.
    This method is necessary because the MCF zip file may change names, and
    multiple versions of the MCF file may be listed on the webpage.
    If there are multiple urls, sorting alphabetically ensures that the
    most recent version is returned.
    """
    base_url = 'https://bulkdata.uspto.gov/data/patent/classification/cpc/'
    page = urllib.request.urlopen(base_url)
    tree = html.fromstring(page.read())

    potential_pgpub_links = []
    for link in tree.xpath('//a/@href'):
        if (link.startswith("US_PGPub_CPC_MCF_Text")
                and link.endswith(".zip")):
            potential_pgpub_links.append(link)

    # Since zip files are formatted Filename_YYYY-MM-DD.zip,
    # the last sorted url corresponds to the latest version
    latest_pgpub_link = base_url + sorted(potential_pgpub_links)[-1]

    return latest_pgpub_link
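# Same property as the schema sort: ISO YYYY-MM-DD stamps sort
# chronologically as plain strings (illustrative filenames):
#   sorted(['US_PGPub_CPC_MCF_Text_2020-05-01.zip',
#           'US_PGPub_CPC_MCF_Text_2020-06-01.zip'])[-1]
#   -> 'US_PGPub_CPC_MCF_Text_2020-06-01.zip'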


def download_ipc(destination_folder):
    """ Download the most recent CPC to IPC Concordance """
    # Find the correct CPC to IPC Concordance
    ipc_url = find_ipc_url()
    ipc_filepath = os.path.join(destination_folder, "ipc_concordance.txt")

    # Download the IPC text file
    print("Source: {}".format(ipc_url))
    print("Destination: {}".format(ipc_filepath))
    general_helpers.download(url=ipc_url, filepath=ipc_filepath)


def find_ipc_url():
    """ Find the url of the CPC to IPC concordance in text format """
    base_url = 'http://www.cooperativepatentclassification.org'
    page = urllib.request.urlopen(base_url + '/cpcConcordances')
    tree = html.fromstring(page.read())

    potential_links = []
    for link in tree.xpath('//a/@href'):
        if (link.lstrip('.').lstrip("/").startswith("cpc/concordances/cpc-ipc-concordance")
                and link.endswith(".txt")):
            potential_links.append(link)

    # There should be exactly one link to the CPC to IPC concordance.
    # The concordance filenames are not formatted consistently, so we can't
    # sort alphabetically to pick the correct file; if multiple links are
    # found, fail loudly rather than guess
    assert (len(set(potential_links)) == 1), "Unsure which URL to use of: " \
        "{}".format(potential_links)
    return base_url + '/' + potential_links[0]


############################################
# TESTS
############################################

def find_cpc_schema_url_test():
    expected_url = 'https://www.cooperativepatentclassification.org/cpc/bulk/CPCSchemeXML202005.zip'
    assert (find_cpc_schema_url() == expected_url)


def find_cpc_pgpub_urls_test():
    expected_pgpub_url = 'https://bulkdata.uspto.gov/data/patent/classification/cpc/US_PGPub_CPC_MCF_Text_2020-06-01.zip'
    assert (find_cpc_pgpub_urls() == expected_pgpub_url)


def find_ipc_url_test():
    expected_url = 'http://www.cooperativepatentclassification.org/cpcConcordances/CPCtoIPCtxtMay2020.txt'
    assert (find_ipc_url() == expected_url)


if __name__ == '__main__':
    """ Running this script will execute tests; importing it will not """

    import datetime

    config = get_config()

    # Find URLs correctly
    # TODO: update these to reflect most recent dates
    print(find_cpc_schema_url())
    #find_cpc_schema_url_test()

    print(find_cpc_pgpub_urls())
    #find_cpc_pgpub_urls_test()

    print(find_ipc_url())
    #find_ipc_url_test()

    destination_folder = '{}/{}'.format(config['FOLDERS']['WORKING_FOLDER'], 'cpc_input')

    if not os.path.exists(destination_folder):
        os.makedirs(destination_folder)

    # Download CPC data, and manually inspect output
    print(str(datetime.datetime.now()))
    download_cpc_schema(destination_folder)                 # <1 min
    print(str(datetime.datetime.now()))
    download_cpc_pgpub_classifications(destination_folder)  # few minutes
    print(str(datetime.datetime.now()))
    download_ipc(destination_folder)                        # <1 min
    print(str(datetime.datetime.now()))

