diff --git a/src/table2qb/core.clj b/src/table2qb/core.clj
index b362c82..bf9bff4 100644
--- a/src/table2qb/core.clj
+++ b/src/table2qb/core.clj
@@ -11,7 +11,9 @@
             [environ.core :as environ]
             [csv2rdf.csvw :as csvw]
             [csv2rdf.source :as source]
-            [grafter.rdf :as rdf]))
+            [grafter.rdf :as rdf]
+            [table2qb.source :as row-source])
+  (:import [java.net URI]))
 
 ;; Config
 
 (def domain (environ/env :base-uri "http://gss-data.org.uk/"))
 
@@ -250,6 +252,8 @@
   (sequence (map (comp transform-columns validate-columns)) data)))
 
+(def derive-observation (comp transform-columns validate-columns))
+
 (defn component->column [{:keys [name title property_template value_template datatype]}]
   (merge {"name" name
           "titles" name ;; could revert to title here (would need to do so in all output csv too)
@@ -349,27 +353,21 @@
      {"columns" (vec columns)
       "aboutUrl" codelist-uri}}))
 
-
-(defn components [reader]
-  (let [data (read-csv reader {"Label" :label
-                               "Description" :description
-                               "Component Type" :component_type
-                               "Codelist" :codelist})]
-    (sequence (map (fn [row]
-                     (-> row
-                         (assoc :notation (gecu/slugize (:label row)))
-                         (assoc :component_type_slug ({"Dimension" "dimension"
-                                                       "Measure" "measure"
-                                                       "Attribute" "attribute"}
-                                                      (row :component_type)))
-                         (assoc :property_slug (gecu/propertize (:label row)))
-                         (assoc :class_slug (gecu/classize (:label row)))
-                         (update :component_type {"Dimension" "qb:DimensionProperty"
-                                                  "Measure" "qb:MeasureProperty"
-                                                  "Attribute" "qb:AttributeProperty"})
-                         (assoc :parent_property (if (= "Measure" (:component_type row))
-                                                   "http://purl.org/linked-data/sdmx/2009/measure#obsValue")))))
-            data)))
+(defn derive-components [row]
+  (-> row
+      (assoc :notation (gecu/slugize (:label row)))
+      (assoc :component_type_slug ({"Dimension" "dimension"
+                                    "Measure" "measure"
+                                    "Attribute" "attribute"}
+                                   (row :component_type)))
+      (assoc :property_slug (gecu/propertize (:label row)))
+      (assoc :class_slug (gecu/classize (:label row)))
+      (update :component_type {"Dimension" "qb:DimensionProperty"
+                               "Measure" "qb:MeasureProperty"
+                               "Attribute" "qb:AttributeProperty"})
+      (assoc :parent_property (if (= "Measure" (:component_type row))
+                                "http://purl.org/linked-data/sdmx/2009/measure#obsValue"
+                                ""))))
 
 (defn components-metadata [csv-url]
   (let [ontology-uri (str domain-def "ontology/components")]
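For illustration, a sketch of what derive-components yields for a single input row (the row values are hypothetical; the derived slug/property/class forms follow the grafter-extra conventions assumed by the test fixtures below). Note the lookup `(:component_type row)` reads the original function argument, so it still sees "Dimension"/"Measure" even though the threaded map's :component_type has already been rewritten:

(derive-components {:label "Flow" :component_type "Dimension"})
;; => {:label "Flow"
;;     :component_type "qb:DimensionProperty"
;;     :notation "flow"
;;     :component_type_slug "dimension"
;;     :property_slug "flow"
;;     :class_slug "Flow"
;;     :parent_property ""}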
@@ -428,12 +426,6 @@
                          "valueUrl" "rdf:Property"}],
      "aboutUrl" (str domain-def "{component_type_slug}/{notation}")}})) ;; property-slug?
 
-(defn codes [reader]
-  (let [data (read-csv reader {"Label" :label
-                               "Notation" :notation
-                               "Parent Notation", :parent_notation})]
-    data))
-
 (defn codelist-metadata [csv-url codelist-name codelist-slug]
   (let [codelist-uri (str domain-def "concept-scheme/" codelist-slug)
         code-uri (str domain-def "concept/" codelist-slug "/{notation}")
@@ -482,92 +474,108 @@
 ;; pipelines
 
-(defn csv-file->metadata-uri [csv-file]
-  (.resolve (.toURI csv-file) "meta.json"))
-
-(defn create-metadata-source [csv-file-str metadata-json]
-  (let [meta-uri (csv-file->metadata-uri (io/file csv-file-str))]
+(defn create-metadata-source [^URI tabular-uri metadata-json]
+  (let [meta-uri (.resolve tabular-uri "metadata.json")]
     (source/->MapMetadataSource meta-uri metadata-json)))
 
-(defn tempfile [filename extension]
-  (java.io.File/createTempFile filename extension))
-
 (def csv2rdf-config {:mode :standard})
 
+(defn make-table-resolver [uri->table-source-map]
+  (fn [uri]
+    (if-let [table-source (get uri->table-source-map uri)]
+      table-source
+      (throw (IllegalArgumentException. (str "Unexpected table URI: " uri))))))
+
-(defn codelist->csvw [input-csv codelist-csv]
-  (with-open [reader (io/reader input-csv)
-              writer (io/writer codelist-csv)]
-    (write-csv writer (codes reader))))
+(defn codes-source [input-file]
+  (let [header-mapping {"Label" :label "Notation" :notation "Parent Notation" :parent_notation}
+        output-column-names [:label :notation :parent_notation]]
+    (row-source/header-replacing-source input-file header-mapping output-column-names)))
 
-(defn codelist->csvw->rdf [input-csv codelist-name codelist-slug codelist-csv]
-  (codelist->csvw input-csv codelist-csv)
-  (let [codelist-meta (codelist-metadata codelist-csv codelist-name codelist-slug)]
-    (csvw/csv->rdf codelist-csv (create-metadata-source input-csv codelist-meta) csv2rdf-config)))
-
 (defn codelist-pipeline [input-csv codelist-name codelist-slug]
-  (let [codelist-csv (tempfile codelist-slug ".csv")]
-    (codelist->csvw->rdf input-csv codelist-name codelist-slug codelist-csv)))
-
-(defn components->csvw [input-csv components-csv]
-  (with-open [reader (io/reader input-csv)
-              writer (io/writer components-csv)]
-    (write-csv writer (components reader))))
-
-(defn components->csvw->rdf [input-csv components-csv]
-  (components->csvw input-csv components-csv)
-  (let [components-meta (components-metadata components-csv)]
-    (csvw/csv->rdf components-csv (create-metadata-source input-csv components-meta) csv2rdf-config)))
+  (let [input-file (io/file input-csv)
+        input-uri (.toURI input-file)
+        table-source (codes-source input-file)
+        table-resolver (make-table-resolver {input-uri table-source})
+        options (assoc csv2rdf-config :table-resolver table-resolver)
+        codelist-meta (codelist-metadata input-uri codelist-name codelist-slug)]
+    (csvw/csv->rdf table-source (create-metadata-source input-uri codelist-meta) options)))
+
+(defn components-source [input-file]
+  (let [header-mapping {"Label" :label "Description" :description "Component Type" :component_type "Codelist" :codelist}
+        output-column-names [:label :description :component_type :codelist :notation :component_type_slug :property_slug :class_slug :parent_property]]
+    (row-source/->TransformingRowSource input-file
                                        header-mapping
+                                        output-column-names
+                                        derive-components)))
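A minimal sketch of the make-table-resolver contract (the URIs and the keyword standing in for a row source are hypothetical): the returned function maps a logical table URI to its row source and fails loudly on any other URI, so csv2rdf can never silently fall back to re-reading the raw file:

(def resolver
  (make-table-resolver {(java.net.URI. "file:/tmp/codes.csv") :codes-row-source}))

(resolver (java.net.URI. "file:/tmp/codes.csv"))  ;; => :codes-row-source
(resolver (java.net.URI. "file:/tmp/other.csv"))  ;; throws IllegalArgumentException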
 (defn components-pipeline [input-csv]
-  (let [components-csv (tempfile "components" ".csv")]
-    (components->csvw->rdf input-csv components-csv)))
-
-(defn cube->csvw [input-csv component-specifications-csv observations-csv]
-  (with-open [reader (io/reader input-csv)
-              writer (io/writer component-specifications-csv)]
-    (write-csv writer (component-specifications reader)))
+  (let [input-file (io/file input-csv)
+        input-uri (.toURI input-file)
+        table-source (components-source input-file)
+        table-resolver (make-table-resolver {input-uri table-source})
+        options (assoc csv2rdf-config :table-resolver table-resolver)
+        components-meta (components-metadata input-uri)]
+    (csvw/csv->rdf table-source (create-metadata-source input-uri components-meta) options)))
+
+(defn metadata-output-column-keys [meta]
+  (let [cols (get-in meta ["tableSchema" "columns"])]
+    (->> cols
+         (remove (fn [col] (get col "virtual")))
+         (mapv (fn [col] (keyword (get col "name")))))))
+
+(defn create-observations-source [tabular-file]
+  (let [dummy-meta (with-open [reader (io/reader tabular-file)]
+                     (observations-metadata reader (URI. "http://dummy") "dummy"))
+        col-keys (metadata-output-column-keys dummy-meta)]
+    (row-source/->TransformingRowSource tabular-file title->name col-keys derive-observation)))
+
+(defn get-component-specification-rows
+  "Returns a collection of component specification maps from the given observations file."
+  [observations-file]
+  (with-open [reader (io/reader observations-file)]
+    (into [] (component-specifications reader))))
+
+(defn component-specification-source [observations-file tabular-uri]
+  (let [component-specification-rows (get-component-specification-rows observations-file)
+        output-column-names [:component_slug :component_attachment :component_property]]
+    (row-source/->MemoryRowSource tabular-uri output-column-names component-specification-rows)))
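To illustrate metadata-output-column-keys, a sketch with a hypothetical cut-down metadata map - virtual columns are dropped and the remaining column names become keywords, in order:

(metadata-output-column-keys
 {"tableSchema" {"columns" [{"name" "geography"}
                            {"name" "value"}
                            {"name" "rdf_type" "virtual" true}]}})
;; => [:geography :value]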
 
-  (with-open [reader (io/reader input-csv)
-              writer (io/writer observations-csv)]
-    (write-csv writer (observations reader))))
+(defn cube-pipeline [input-csv dataset-name dataset-slug]
+  (let [input-file (io/file input-csv)
+        observations-source (create-observations-source input-file)
+        input-uri (.toURI input-file)
+
-(defn cube->csvw->rdf [input-csv dataset-name dataset-slug component-specifications-csv observations-csv]
-  (cube->csvw input-csv component-specifications-csv observations-csv)
+        component-specifications-source (component-specification-source input-file input-uri)
+        component-spec-resolver (make-table-resolver {input-uri component-specifications-source})
+
-  (let [component-specification-metadata-meta (component-specification-metadata component-specifications-csv dataset-name dataset-slug)
-        dataset-metadata-meta (dataset-metadata component-specifications-csv dataset-name dataset-slug)
-        dsd-metadata-meta (data-structure-definition-metadata component-specifications-csv dataset-name dataset-slug)
+        component-specification-metadata-meta (component-specification-metadata input-uri dataset-name dataset-slug)
+        dataset-metadata-meta (dataset-metadata input-uri dataset-name dataset-slug)
+        dsd-metadata-meta (data-structure-definition-metadata input-uri dataset-name dataset-slug)
         observations-metadata-meta (with-open [reader (io/reader input-csv)]
-                                     (observations-metadata reader observations-csv dataset-slug))
-        used-codes-codelists-metadata-meta (used-codes-codelists-metadata component-specifications-csv dataset-slug)
+                                     (observations-metadata reader input-uri dataset-slug))
+        used-codes-codelists-metadata-meta (used-codes-codelists-metadata input-uri dataset-slug)
         used-codes-codes-metadata-meta (with-open [reader (io/reader input-csv)]
-                                         (used-codes-codes-metadata reader observations-csv dataset-slug))]
+                                         (used-codes-codes-metadata reader input-uri dataset-slug))]
     ;;TODO: don't use concat
     (concat
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv component-specification-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv dataset-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv dsd-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf observations-csv (create-metadata-source input-csv observations-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf component-specifications-csv (create-metadata-source input-csv used-codes-codelists-metadata-meta) {:mode :standard})
-     (csvw/csv->rdf observations-csv (create-metadata-source input-csv used-codes-codes-metadata-meta) {:mode :standard}))))
-
-(defn cube-pipeline [input-csv dataset-name dataset-slug]
-  (let [component-specifications-csv (tempfile "component-specifications" ".csv")
-        observations-csv (tempfile "observations" ".csv")]
-    (cube->csvw->rdf input-csv dataset-name dataset-slug
-                     component-specifications-csv observations-csv)))
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri component-specification-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri dataset-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri dsd-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf input-file (create-metadata-source input-uri observations-metadata-meta) {:mode :standard :table-resolver (make-table-resolver {input-uri observations-source})})
+     (csvw/csv->rdf component-specifications-source (create-metadata-source input-uri used-codes-codelists-metadata-meta) {:mode :standard :table-resolver component-spec-resolver})
+     (csvw/csv->rdf input-file (create-metadata-source input-uri used-codes-codes-metadata-meta) {:mode :standard :table-resolver (make-table-resolver {input-uri observations-source})}))))
 
 (defn serialise-demo [out-dir]
   (with-open [output-stream (io/output-stream (str out-dir "/components.ttl"))]
-    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
-     (rdf/add writer
-              (components-pipeline "./examples/regional-trade/csv/components.csv"))))
+    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
+      (rdf/add writer
+               (components-pipeline "./examples/regional-trade/csv/components.csv"))))
 
   (with-open [output-stream (io/output-stream (str out-dir "/flow-directions.ttl"))]
-    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
-     (rdf/add writer
-              (codelist-pipeline "./examples/regional-trade/csv/flow-directions.csv"
-                                 "Flow Directions" "flow-directions"))))
+    (let [writer (gio/rdf-serializer output-stream :format :ttl)]
+      (rdf/add writer
+               (codelist-pipeline "./examples/regional-trade/csv/flow-directions.csv"
+                                  "Flow Directions" "flow-directions"))))
 
   (with-open [output-stream (io/output-stream (str out-dir "/sitc-sections.ttl"))]
     (let [writer (gio/rdf-serializer output-stream :format :ttl)]
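A usage sketch for cube-pipeline, mirroring the serialise-demo pattern above (the observations path, dataset name and slug are assumptions, not taken from the repo):

(with-open [output-stream (io/output-stream "./tmp/cube.ttl")]
  (let [writer (gio/rdf-serializer output-stream :format :ttl)]
    (rdf/add writer
             (cube-pipeline "./examples/regional-trade/csv/input.csv"
                            "Regional Trade" "regional-trade"))))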
diff --git a/src/table2qb/source.clj b/src/table2qb/source.clj
new file mode 100644
index 0000000..63053fe
--- /dev/null
+++ b/src/table2qb/source.clj
@@ -0,0 +1,100 @@
+(ns table2qb.source
+  "The csv2rdf RowSource protocol represents a source of a read-once sequence of logical CSV records. The source
+   must be re-opened if multiple iterations are required. The pipelines defined in table2qb.core sometimes need to
+   modify or derive additional data from the raw input files - the sources defined in this namespace allow the CSV
+   files input into the pipelines to be transformed before being presented to the CSVW process."
+  (:require [csv2rdf.metadata.dialect :as dialect]
+            [csv2rdf.source :as source]
+            [clojure.string :as string]
+            [csv2rdf.tabular.csv.reader :as reader]))
+
+(defn make-row
+  "Creates a CSV record in the format expected by csv2rdf. cells should be a vector of strings in the same order
+   as the declared column headings. Row numbers are indexed from 1."
+  [row-number cells]
+  {:source-row-number row-number
+   :content (string/join ", " cells)
+   :comment nil
+   :type :data
+   :cells cells})
+
+(defn column-keys->header-row
+  "Creates the header record for a given collection of column keywords."
+  [column-keys]
+  (make-row 1 (mapv name column-keys)))
+
+(defn make-data-row-transformer
+  "Creates a mapping function for CSV records. input-column-keys contains the key names for the corresponding
+   cell values in the row - this should match the number of columns in the input file. output-column-keys
+   contains the names of the columns in the output - these should match the columns defined in the corresponding
+   metadata file. transform-fn is a function from source row map to result row map - the keys in the source map
+   match those defined in input-column-keys. The result map should contain all of the keys specified in
+   output-column-keys."
+  [input-column-keys output-column-keys transform-fn]
+  (fn [{:keys [cells] :as row}]
+    (let [source-map (zipmap input-column-keys cells)
+          result-map (transform-fn source-map)
+          result-cells (mapv result-map output-column-keys)]
+      (assoc row :cells result-cells))))
+
+(defn open-transformed-rows
+  "Opens the given tabular source and transforms each data row according to the given transform function.
+   The source file should contain a single header row. column-header-mapping should be a map from source
+   column names in the input data to keys to map the corresponding cell value to in the map provided
+   to transform-fn. output-column-keys should contain the keys for the non-virtual columns defined in
+   the corresponding metadata file."
+  ([tabular-file dialect column-header-mapping output-column-keys]
+   (open-transformed-rows tabular-file dialect column-header-mapping output-column-keys identity))
+  ([tabular-file dialect column-header-mapping output-column-keys transform-fn]
+   (let [{:keys [options rows]} (reader/read-tabular-source tabular-file dialect)
+         rows (reader/row-contents->rows rows options)
+         headers (:cells (first rows)) ;;TODO: handle empty input file
+         input-column-keys (mapv column-header-mapping headers)
+         header-row (column-keys->header-row output-column-keys)
+         row-transform-fn (make-data-row-transformer input-column-keys output-column-keys transform-fn)]
+     {:options options
+      :rows (cons header-row (map row-transform-fn (drop 1 rows)))})))
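A sketch of the record shapes these helpers produce (the column keys and upper-casing transform are hypothetical). Note that :content keeps the original cell text; only :cells is rewritten:

(def transform
  (make-data-row-transformer [:label :notation]   ;; keys for the input cells
                             [:notation :label]   ;; output column order
                             #(update % :notation string/upper-case)))

(transform (make-row 2 ["Exports" "exports"]))
;; => {:source-row-number 2
;;     :content "Exports, exports"
;;     :comment nil
;;     :type :data
;;     :cells ["EXPORTS" "Exports"]}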
+
+;; represents a source of CSV records for an in-memory sequence of data maps. uri is the logical URI of
+;; the tabular source. input-column-keys contains the keys for the non-virtual columns defined in the
+;; corresponding metadata file. Each of the keys in input-column-keys should exist on each map within rows.
+(defrecord MemoryRowSource [uri input-column-keys rows]
+  source/URIable
+  (->uri [_this] uri)
+
+  reader/RowSource
+  (open-rows [_this dialect]
+    (let [options (dialect/dialect->options dialect)
+          header (column-keys->header-row input-column-keys)
+          data-rows (map-indexed (fn [row-index row]
+                                   (let [cells (mapv (fn [k] (get row k "")) input-column-keys)
+                                         ;;row numbers start at 1, plus header row
+                                         row-number (+ 2 row-index)]
+                                     (make-row row-number cells)))
+                                 rows)]
+      {:options options
+       :rows (cons header data-rows)})))
+
+;; represents a source of CSV records which are loaded and then transformed from the given tabular data
+;; source. The column headings are mapped to keys using input-column-mapping - this mapping is used to
+;; construct a map from key to the corresponding cell value for every row in the input. The row is
+;; transformed using transform-fn and then projected into a sequence of cell values according to
+;; output-column-keys. The keys in output-column-keys should correspond to the names of the columns
+;; in the associated metadata file.
+(defrecord TransformingRowSource [tabular-file input-column-mapping output-column-keys transform-fn]
+  source/URIable
+  (->uri [_this] (.toURI tabular-file))
+
+  reader/RowSource
+  (open-rows [_this dialect]
+    (open-transformed-rows tabular-file dialect input-column-mapping output-column-keys transform-fn)))
+
+(defn header-replacing-source
+  "Returns a RowSource for a tabular file which renames the source column headers according to
+   input-header-mapping. output-column-keys defines the order of the mapped columns within the
+   output sequence - these should match the order defined in the corresponding metadata file."
+  [tabular-file input-header-mapping output-column-keys]
+  (->TransformingRowSource tabular-file input-header-mapping output-column-keys identity))
+
+(defn get-rows [source]
+  (:rows (reader/open-rows source dialect/default-dialect)))
\ No newline at end of file
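A sketch of reading a MemoryRowSource back through get-rows, as if evaluated in the table2qb.source namespace (the URI and row data are hypothetical): the header row is synthesised from the column keys and data rows are numbered from 2:

(def specs
  (->MemoryRowSource (java.net.URI. "file:/tmp/component-specifications.csv")
                     [:component_slug :component_attachment]
                     [{:component_slug "flow" :component_attachment "qb:dimension"}]))

(map :cells (get-rows specs))
;; => (["component_slug" "component_attachment"]
;;     ["flow" "qb:dimension"])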
diff --git a/test/table2qb/core_test.clj b/test/table2qb/core_test.clj
index facd5fb..963707f 100644
--- a/test/table2qb/core_test.clj
+++ b/test/table2qb/core_test.clj
@@ -1,6 +1,7 @@
 (ns table2qb.core-test
   (:require [clojure.test :refer :all]
             [table2qb.core :refer :all]
+            [table2qb.source :as source]
             [clojure.java.io :as io]
             [clojure.data :refer [diff]]
             [grafter.rdf :as rdf]
@@ -55,46 +56,51 @@
 (deftest components-test
   (testing "csv table"
-    (with-open [input-reader (io/reader (example-csv "regional-trade" "components.csv"))]
-      (let [components (doall (components input-reader))]
-        (testing "one row per component"
-          (is (= 4 (count components))))
-        (testing "one column per attribute"
-          (testing "flow"
-            (let [flow (first-by :label "Flow" components)]
-              (are [attribute value] (= value (attribute flow))
-                :notation "flow"
-                :description "Direction in which trade is measured"
-                :component_type "qb:DimensionProperty"
-                :component_type_slug "dimension"
-                :codelist (str domain-def "concept-scheme/flow-directions")
-                :property_slug "flow"
-                :class_slug "Flow"
-                :parent_property nil)))
-          (testing "gbp total"
-            (let [gbp-total (first-by :label "GBP Total" components)]
-              (are [attribute value] (= value (attribute gbp-total))
-                :notation "gbp-total"
-                :component_type "qb:MeasureProperty"
-                :component_type_slug "measure"
-                :property_slug "gbpTotal"
-                :class_slug "GbpTotal"
-                :parent_property "http://purl.org/linked-data/sdmx/2009/measure#obsValue")))))))
+    (let [components-file (io/file (example-csv "regional-trade" "components.csv"))
+          rows (doall (source/get-rows
+                       (components-source components-file)))
+          [[header] data-rows] (split-at 1 rows)
+          column-keys (map keyword (:cells header))
+          components (map (fn [row] (zipmap column-keys (:cells row))) data-rows)]
+      (testing "one row per component"
+        (is (= 4 (count data-rows))))
+      (testing "one column per attribute"
+        (testing "flow"
+          (let [flow (first-by :label "Flow" components)]
+            (are [attribute value] (= value (attribute flow))
+              :notation "flow"
+              :description "Direction in which trade is measured"
+              :component_type "qb:DimensionProperty"
+              :component_type_slug "dimension"
+              :codelist (str domain-def "concept-scheme/flow-directions")
+              :property_slug "flow"
+              :class_slug "Flow"
+              :parent_property "")))
+        (testing "gbp total"
+          (let [gbp-total (first-by :label "GBP Total" components)]
+            (are [attribute value] (= value (attribute gbp-total))
+              :notation "gbp-total"
+              :component_type "qb:MeasureProperty"
+              :component_type_slug "measure"
+              :property_slug "gbpTotal"
+              :class_slug "GbpTotal"
+              :parent_property "http://purl.org/linked-data/sdmx/2009/measure#obsValue"))))))
   (testing "json metadata"
     (with-open [target-reader (io/reader (example-csvw "regional-trade" "components.json"))]
      (maps-match? (read-json target-reader)
                   (components-metadata "components.csv")))))
-
 (deftest codelists-test
   (testing "csv table"
-    (with-open [input-reader (io/reader (example-csv "regional-trade" "flow-directions.csv"))]
-      (let [codes (doall (codes input-reader))]
-        (testing "one row per code"
-          (is (= 2 (count codes))))
-        (testing "one column per attribute"
-          (is (= [:label :notation :parent_notation]
-                 (-> codes first keys)))))))
+    (let [codes-file (io/file (example-csv "regional-trade" "flow-directions.csv"))
+          codes-source (codes-source codes-file)
+          codes-rows (doall (source/get-rows codes-source))
+          [[header] data-rows] (split-at 1 codes-rows)
+          column-names (:cells header)]
+      (testing "one row per code"
+        (is (= 2 (count data-rows))))
+      (testing "one column per attribute"
+        (is (= [:label :notation :parent_notation]
+               (mapv keyword column-names))))))
   (testing "json metadata"
     (with-open [target-reader (io/reader (example-csvw "regional-trade" "flow-directions.json"))]
      (maps-match? (read-json target-reader)