From 13582532bf5d0edff4df4f78803eea3fc9077ede Mon Sep 17 00:00:00 2001
From: lkitching
Date: Wed, 30 May 2018 12:03:18 +0100
Subject: [PATCH] Avoid writing tabular files within pipelines.

Add implementations of the csv2rdf RowSource protocol which allow
transformed versions of pipeline input files to be passed directly. The
RowSource protocol represents tabular resources as a logical sequence
of records, each containing the source row number and parsed data
cells.

Pipelines previously wrote transformed versions of the input files to
disk so they could be passed to the CSVW process. Add implementations
of the RowSource protocol which allow the transformation to be done in
memory and present the transformed row records directly to the CSVW
process.

The number of component specifications derived within cube-pipeline is
expected to be quite small. Load these into memory and add a RowSource
implementation which returns the corresponding tabular rows to csv2rdf.

Update the tests which check the format of the intermediate transformed
data to use the transformed row sources.
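For illustration, a logical row record in the format csv2rdf consumes
(as built by make-row in the new table2qb.source namespace) looks
roughly like the following; the cell values here are hypothetical:

    ;; the generated header record gets :source-row-number 1,
    ;; so data rows are numbered from 2
    {:source-row-number 2
     :content "flow, dimension"   ;; cells re-joined with ", " (see make-row)
     :comment nil
     :type :data
     :cells ["flow" "dimension"]}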
"http://purl.org/linked-data/sdmx/2009/measure#obsValue" + "")))) (defn components-metadata [csv-url] (let [ontology-uri (str domain-def "ontology/components")] @@ -428,12 +426,6 @@ "valueUrl" "rdf:Property"}], "aboutUrl" (str domain-def "{component_type_slug}/{notation}")}})) ;; property-slug? -(defn codes [reader] - (let [data (read-csv reader {"Label" :label - "Notation" :notation - "Parent Notation", :parent_notation})] - data)) - (defn codelist-metadata [csv-url codelist-name codelist-slug] (let [codelist-uri (str domain-def "concept-scheme/" codelist-slug) code-uri (str domain-def "concept/" codelist-slug "/{notation}") @@ -482,92 +474,108 @@ ;; pipelines -(defn csv-file->metadata-uri [csv-file] - (.resolve (.toURI csv-file) "meta.json")) - -(defn create-metadata-source [csv-file-str metadata-json] - (let [meta-uri (csv-file->metadata-uri (io/file csv-file-str))] +(defn create-metadata-source [^URI tabular-uri metadata-json] + (let [meta-uri (.resolve tabular-uri "metadata.json")] (source/->MapMetadataSource meta-uri metadata-json))) -(defn tempfile [filename extension] - (java.io.File/createTempFile filename extension)) - (def csv2rdf-config {:mode :standard}) -(defn codelist->csvw [input-csv codelist-csv] - (with-open [reader (io/reader input-csv) - writer (io/writer codelist-csv)] - (write-csv writer (codes reader)))) +(defn make-table-resolver [uri->table-source-map] + (fn [uri] + (if-let [table-source (get uri->table-source-map uri)] + table-source + (throw (IllegalArgumentException. (str "Unexepcted table URI: " uri)))))) -(defn codelist->csvw->rdf [input-csv codelist-name codelist-slug codelist-csv] - (codelist->csvw input-csv codelist-csv) - (let [codelist-meta (codelist-metadata codelist-csv codelist-name codelist-slug)] - (csvw/csv->rdf codelist-csv (create-metadata-source input-csv codelist-meta) csv2rdf-config))) +(defn codes-source [input-file] + (let [header-mapping {"Label" :label "Notation" :notation "Parent Notation" :parent_notation} + output-column-names [:label :notation :parent_notation]] + (row-source/header-replacing-source input-file header-mapping output-column-names))) (defn codelist-pipeline [input-csv codelist-name codelist-slug] - (let [codelist-csv (tempfile codelist-slug ".csv")] - (codelist->csvw->rdf input-csv codelist-name codelist-slug codelist-csv))) - -(defn components->csvw [input-csv components-csv] - (with-open [reader (io/reader input-csv) - writer (io/writer components-csv)] - (write-csv writer (components reader)))) - -(defn components->csvw->rdf [input-csv components-csv] - (components->csvw input-csv components-csv) - (let [components-meta (components-metadata components-csv)] - (csvw/csv->rdf components-csv (create-metadata-source input-csv components-meta) csv2rdf-config))) + (let [input-file (io/file input-csv) + input-uri (.toURI input-file) + table-source (codes-source input-file) + table-resolver (make-table-resolver {input-uri table-source}) + options (assoc csv2rdf-config :table-resolver table-resolver) + codelist-meta (codelist-metadata (.toURI input-file) codelist-name codelist-slug)] + (csvw/csv->rdf table-source (create-metadata-source input-uri codelist-meta) options))) + +(defn components-source [input-file] + (let [header-mapping {"Label" :label "Description" :description "Component Type" :component_type "Codelist" :codelist} + output-column-names [:label :description :component_type :codelist :notation :component_type_slug :property_slug :class_slug :parent_property]] + (row-source/->TransformingRowSource input-file + 
 
 (defn components-pipeline [input-csv]
-  (let [components-csv (tempfile "components" ".csv")]
-    (components->csvw->rdf input-csv components-csv)))
-
-(defn cube->csvw [input-csv component-specifications-csv observations-csv]
-  (with-open [reader (io/reader input-csv)
-              writer (io/writer component-specifications-csv)]
-    (write-csv writer (component-specifications reader)))
+  (let [input-file (io/file input-csv)
+        input-uri (.toURI input-file)
+        table-source (components-source input-file)
+        table-resolver (make-table-resolver {input-uri table-source})
+        options (assoc csv2rdf-config :table-resolver table-resolver)
+        components-meta (components-metadata input-uri)]
+    (csvw/csv->rdf table-source (create-metadata-source input-uri components-meta) options)))
+
+(defn metadata-output-column-keys [meta]
+  (let [cols (get-in meta ["tableSchema" "columns"])]
+    (->> cols
+         (remove (fn [col] (get col "virtual")))
+         (mapv (fn [col] (keyword (get col "name")))))))
+
+(defn create-observations-source [tabular-file]
+  (let [dummy-meta (with-open [reader (io/reader tabular-file)]
+                     (observations-metadata reader (URI. "http://dummy") "dummy"))
+        col-keys (metadata-output-column-keys dummy-meta)]
+    (row-source/->TransformingRowSource tabular-file title->name col-keys derive-observation)))
+
+(defn get-component-specification-rows
+  "Returns a collection of component specification maps from the given observations file."
+  [observations-file]
+  (with-open [reader (io/reader observations-file)]
+    (into [] (component-specifications reader))))
+
+(defn component-specification-source [observations-file tabular-uri]
+  (let [component-specification-rows (get-component-specification-rows observations-file)
+        output-column-names [:component_slug :component_attachment :component_property]]
+    (row-source/->MemoryRowSource tabular-uri output-column-names component-specification-rows)))
 
-  (with-open [reader (io/reader input-csv)
-              writer (io/writer observations-csv)]
-    (write-csv writer (observations reader))))
+(defn cube-pipeline [input-csv dataset-name dataset-slug]
+  (let [input-file (io/file input-csv)
+        observations-source (create-observations-source input-file)
+        input-uri (.toURI input-file)
 
-(defn cube->csvw->rdf [input-csv dataset-name dataset-slug component-specifications-csv observations-csv]
-  (cube->csvw input-csv component-specifications-csv observations-csv)
+        component-specifications-source (component-specification-source input-file input-uri)
+        component-spec-resolver (make-table-resolver {input-uri component-specifications-source})
 
-  (let [component-specification-metadata-meta (component-specification-metadata component-specifications-csv dataset-name dataset-slug)
-        dataset-metadata-meta (dataset-metadata component-specifications-csv dataset-name dataset-slug)
-        dsd-metadata-meta (data-structure-definition-metadata component-specifications-csv dataset-name dataset-slug)
+        component-specification-metadata-meta (component-specification-metadata input-uri dataset-name dataset-slug)
+        dataset-metadata-meta (dataset-metadata input-uri dataset-name dataset-slug)
+        dsd-metadata-meta (data-structure-definition-metadata input-uri dataset-name dataset-slug)
         observations-metadata-meta (with-open [reader (io/reader input-csv)]
-                                     (observations-metadata reader observations-csv dataset-slug))
-        used-codes-codelists-metadata-meta (used-codes-codelists-metadata component-specifications-csv dataset-slug)
+                                     (observations-metadata reader input-uri dataset-slug))
+        used-codes-codelists-metadata-meta (used-codes-codelists-metadata input-uri dataset-slug)
         used-codes-codes-metadata-meta (with-open [reader (io/reader input-csv)]
-                                         (used-codes-codes-metadata reader observations-csv dataset-slug))]
+                                         (used-codes-codes-metadata reader input-uri dataset-slug))]
     ;;TODO: don't use concat
     (concat
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv component-specification-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv dataset-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv dsd-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf observations-csv (create-metadata-source input-csv observations-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv used-codes-codelists-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf observations-csv (create-metadata-source input-csv used-codes-codes-metadata-meta) {:mode :standard}))))
-
-(defn cube-pipeline [input-csv dataset-name dataset-slug]
-  (let [component-specifications-csv (tempfile "component-specifications" ".csv")
-        observations-csv (tempfile "observations" ".csv")]
-    (cube->csvw->rdf input-csv dataset-name dataset-slug
-                     component-specifications-csv observations-csv)))
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri component-specification-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri dataset-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri dsd-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf input-file (create-metadata-source input-uri observations-metadata-meta) {:mode :standard :table-resolver (make-table-resolver {input-uri observations-source})})
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri used-codes-codelists-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf input-file (create-metadata-source input-uri used-codes-codes-metadata-meta) {:mode :standard :table-resolver (make-table-resolver {input-uri observations-source})}))))
 
 (defn serialise-demo [out-dir]
   (with-open [output-stream (io/output-stream (str out-dir "/components.ttl"))]
-    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
-      (rdf/add writer
-               (components-pipeline "./examples/regional-trade/csv/components.csv"))))
+    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
+      (rdf/add writer
+               (components-pipeline "./examples/regional-trade/csv/components.csv"))))
 
   (with-open [output-stream (io/output-stream (str out-dir "/flow-directions.ttl"))]
-    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
-      (rdf/add writer
-               (codelist-pipeline "./examples/regional-trade/csv/flow-directions.csv"
-                                  "Flow Directions" "flow-directions"))))
+    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
+      (rdf/add writer
+               (codelist-pipeline "./examples/regional-trade/csv/flow-directions.csv"
+                                  "Flow Directions" "flow-directions"))))
 
   (with-open [output-stream (io/output-stream (str out-dir "/sitc-sections.ttl"))]
     (let [writer (gio/rdf-serializer output-stream :format :ttl)]
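Each pipeline above follows the same wiring: build a RowSource over the
input file, register it with a table resolver keyed by the input's URI,
and hand both to csv2rdf. A minimal sketch of the codelist case,
assuming a local flow-directions.csv exists (hypothetical path):

    (let [input-file (io/file "flow-directions.csv")
          input-uri  (.toURI input-file)
          source     (codes-source input-file)
          resolver   (make-table-resolver {input-uri source})
          meta       (codelist-metadata input-uri "Flow Directions" "flow-directions")]
      ;; the resolver ensures csv2rdf reads the transformed rows
      ;; rather than re-opening the raw file by URI
      (csvw/csv->rdf source
                     (create-metadata-source input-uri meta)
                     (assoc csv2rdf-config :table-resolver resolver)))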
diff --git a/src/table2qb/source.clj b/src/table2qb/source.clj
new file mode 100644
index 0000000..63053fe
--- /dev/null
+++ b/src/table2qb/source.clj
@@ -0,0 +1,100 @@
+(ns table2qb.source
+  "The csv2rdf RowSource protocol represents a source of a read-once sequence of logical CSV records. The source
+   must be re-opened if multiple iterations are required. The pipelines defined in table2qb.core sometimes need to
+   modify or derive additional data from the raw input files - the sources defined in this namespace allow the CSV
+   files input into the pipelines to be transformed before being presented to the CSVW process."
+  (:require [csv2rdf.metadata.dialect :as dialect]
+            [csv2rdf.source :as source]
+            [clojure.string :as string]
+            [csv2rdf.tabular.csv.reader :as reader]))
+
+(defn make-row
+  "Creates a CSV record in the format expected by csv2rdf. cells should be a vector of strings in the same order
+   as the declared column headings. Row numbers are indexed from 1."
+  [row-number cells]
+  {:source-row-number row-number
+   :content (string/join ", " cells)
+   :comment nil
+   :type :data
+   :cells cells})
+
+(defn column-keys->header-row
+  "Creates the header record for a given collection of column keywords."
+  [column-keys]
+  (make-row 1 (mapv name column-keys)))
+
+(defn make-data-row-transformer
+  "Creates a mapping function for CSV records. input-column-keys contains the key names for the corresponding
+   cell values in the row - this should match the number of columns in the input file. output-column-keys
+   contains the names of the columns in the output - these should match the columns defined in the corresponding
+   metadata file. transform-fn is a function from source row map to result row map - the keys in the source map
+   match those defined in input-column-keys. The result map should contain all of the keys specified in
+   output-column-keys."
+  [input-column-keys output-column-keys transform-fn]
+  (fn [{:keys [cells] :as row}]
+    (let [source-map (zipmap input-column-keys cells)
+          result-map (transform-fn source-map)
+          result-cells (mapv result-map output-column-keys)]
+      (assoc row :cells result-cells))))
+
+(defn open-transformed-rows
+  "Opens the given tabular source and transforms each data row according to the given transform function.
+   The source file should contain a single header row. column-header-mapping should be a map from source
+   column names in the input data to keys to map the corresponding cell value to in the map provided
+   to transform-fn. output-column-keys should contain the keys for the non-virtual columns defined in
+   the corresponding metadata file."
+  ([tabular-file dialect column-header-mapping output-column-keys]
+   (open-transformed-rows tabular-file dialect column-header-mapping output-column-keys identity))
+  ([tabular-file dialect column-header-mapping output-column-keys transform-fn]
+   (let [{:keys [options rows]} (reader/read-tabular-source tabular-file dialect)
+         rows (reader/row-contents->rows rows options)
+         headers (:cells (first rows)) ;;TODO: handle empty input file
+         input-column-keys (mapv column-header-mapping headers)
+         header-row (column-keys->header-row output-column-keys)
+         row-transform-fn (make-data-row-transformer input-column-keys output-column-keys transform-fn)]
+     {:options options
+      :rows (cons header-row (map row-transform-fn (drop 1 rows)))})))
+
+;;represents a source of CSV records for an in-memory sequence of data maps. uri is the logical URI of
+;;the tabular source. input-column-keys contains the keys for the non-virtual columns defined in the
+;;corresponding metadata file. Each of the keys in input-column-keys should exist on each map within rows.
+(defrecord MemoryRowSource [uri input-column-keys rows]
+  source/URIable
+  (->uri [_this] uri)
+
+  reader/RowSource
+  (open-rows [_this dialect]
+    (let [options (dialect/dialect->options dialect)
+          header (column-keys->header-row input-column-keys)
+          data-rows (map-indexed (fn [row-index row]
+                                   (let [cells (mapv (fn [k] (get row k "")) input-column-keys)
+                                         ;;row numbers start at 1, plus header row
+                                         row-number (+ 2 row-index)]
+                                     (make-row row-number cells)))
+                                 rows)]
+      {:options options
+       :rows (cons header data-rows)})))
+
+;;represents a source of CSV records which are loaded and then transformed from the given tabular data
+;;source. The column headings are mapped to keys using input-column-mapping - this mapping is used to
+;;construct a map from key to the corresponding cell value for every row in the input. The row is
+;;transformed using transform-fn and then projected into a sequence of cell values according to
+;;output-column-keys. The keys in output-column-keys should correspond to the names of the columns
+;;in the associated metadata file.
+(defrecord TransformingRowSource [tabular-file input-column-mapping output-column-keys transform-fn]
+  source/URIable
+  (->uri [_this] (.toURI tabular-file))
+
+  reader/RowSource
+  (open-rows [_this dialect]
+    (open-transformed-rows tabular-file dialect input-column-mapping output-column-keys transform-fn)))
+
+(defn header-replacing-source
+  "Returns a RowSource for a tabular file which renames the source column headers according to
+   input-header-mapping. output-column-keys defines the order of the mapped columns within the
+   output sequence - these should match the order defined in the corresponding metadata file."
+  [tabular-file input-header-mapping output-column-keys]
+  (->TransformingRowSource tabular-file input-header-mapping output-column-keys identity))
+
+(defn get-rows [source]
+  (:rows (reader/open-rows source dialect/default-dialect)))
\ No newline at end of file
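To see what these sources yield, get-rows opens a source with the
default dialect and returns the header record followed by one record
per data row. A sketch using an in-memory source; the URI and row
values are hypothetical:

    (require '[table2qb.source :as source])
    (import 'java.net.URI)

    (source/get-rows
     (source/->MemoryRowSource
      (URI. "http://example.com/components.csv")
      [:component_slug :component_attachment :component_property]
      [{:component_slug "flow"
        :component_attachment "qb:dimension"
        :component_property "http://example.com/def/flow"}]))
    ;; => ({:source-row-number 1 :cells ["component_slug" "component_attachment" "component_property"] ...}
    ;;     {:source-row-number 2 :cells ["flow" "qb:dimension" "http://example.com/def/flow"] ...})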
"regional-trade" "components.csv")) + rows (doall (source/get-rows (components-source components-file))) + [[header] data-rows] (split-at 1 rows) + column-keys (map keyword (:cells header)) + components (map (fn [row] (zipmap column-keys (:cells row))) data-rows)] + (testing "one row per component" + (is (= 4 (count data-rows)))) + (testing "one column per attribute" + (testing "flow" + (let [flow (first-by :label "Flow" components)] + (are [attribute value] (= value (attribute flow)) + :notation "flow" + :description "Direction in which trade is measured" + :component_type "qb:DimensionProperty" + :component_type_slug "dimension" + :codelist (str domain-def "concept-scheme/flow-directions") + :property_slug "flow" + :class_slug "Flow" + :parent_property ""))) + (testing "gbp total" + (let [gbp-total (first-by :label "GBP Total" components)] + (are [attribute value] (= value (attribute gbp-total)) + :notation "gbp-total" + :component_type "qb:MeasureProperty" + :component_type_slug "measure" + :property_slug "gbpTotal" + :class_slug "GbpTotal" + :parent_property "http://purl.org/linked-data/sdmx/2009/measure#obsValue")))))) (testing "json metadata" (with-open [target-reader (io/reader (example-csvw "regional-trade" "components.json"))] (maps-match? (read-json target-reader) (components-metadata "components.csv"))))) - (deftest codelists-test (testing "csv table" - (with-open [input-reader (io/reader (example-csv "regional-trade" "flow-directions.csv"))] - (let [codes (doall (codes input-reader))] - (testing "one row per code" - (is (= 2 (count codes)))) - (testing "one column per attribute" - (is (= [:label :notation :parent_notation] - (-> codes first keys))))))) + (let [codes-file (io/file (example-csv "regional-trade" "flow-directions.csv")) + codes-source (codes-source codes-file) + codes-rows (doall (source/get-rows codes-source)) + [[header] data-rows] (split-at 1 codes-rows) + column-names (:cells header)] + (testing "one row per code" + (is (= 2 (count data-rows)))) + (testing "one column per attribute" + (is (= [:label :notation :parent_notation] + (mapv keyword column-names)))))) (testing "json metadata" (with-open [target-reader (io/reader (example-csvw "regional-trade" "flow-directions.json"))] (maps-match? (read-json target-reader)