Skip to content

Commit

Permalink
Fix issue 137: added Transform.CurrentRawRecord() for caller of omn…
Browse files Browse the repository at this point in the history
…iparser to access the raw ingested record. (#138)

See details in #137.
  • Loading branch information
jf-tech authored Jan 9, 2021
1 parent 69749f5 commit 4aa56d6
Show file tree
Hide file tree
Showing 19 changed files with 3,606 additions and 3,314 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
[![PkgGoDev](https://pkg.go.dev/badge/github.com/jf-tech/omniparser)](https://pkg.go.dev/github.com/jf-tech/omniparser)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)

Omniparser is a native Golang ETL parser that ingests input data of various formats (**CSV, txt, fixed length/width, XML, EDI/X12/EDIFACT, JSON**, and
custom formats) in streaming fashion and transforms data into desired JSON output based on a schema written in JSON.
Omniparser is a native Golang ETL parser that ingests input data of various formats (**CSV, txt, fixed length/width,
XML, EDI/X12/EDIFACT, JSON**, and custom formats) in streaming fashion and transforms data into desired JSON output
based on a schema written in JSON.

Golang Version: 1.14

Expand Down Expand Up @@ -64,6 +65,7 @@ situations.
- Golang 1.14

## Recent Major Feature Additions/Changes
- Added `Transform.CurrentRawRecord()` for caller of omniparser to access the raw ingested record.
- Deprecated `custom_parse` in favor of `custom_func` (`custom_parse` is still usable for
back-compatibility, it is just removed from all public docs and samples).
- Added `NonValidatingReader` EDI segment reader.
Expand Down
7 changes: 6 additions & 1 deletion doc/programmability.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ for {
}
if err != nil { ... }
// output contains a []byte of the ingested and transformed record.
raw, err := transform.CurrentRawRecord()
if err != nil { ... }
rawRecord := raw.(*omniv21.RawRecord) // assuming the schema is of `omni.2.1` version.
fmt.Println(rawRecord.UUIDv3()) // rawRecord.UUIDv3() returns a stable hash of the current raw record.
}
```
Note this out-of-box omniparser setup contains only the `omni.2.1` schema handler, meaning only schemas
Expand Down Expand Up @@ -256,4 +261,4 @@ for {
See [IDR](#idr) notes about the JSON/XML readers above.
## XML Reader
See [IDR](#idr) notes about the JSON/XML readers above.
See [IDR](#idr) notes about the JSON/XML readers above.
37 changes: 30 additions & 7 deletions extensions/omniv21/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,55 @@ import (
"github.com/jf-tech/omniparser/errs"
"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
"github.com/jf-tech/omniparser/extensions/omniv21/transform"
"github.com/jf-tech/omniparser/idr"
"github.com/jf-tech/omniparser/transformctx"
)

// RawRecord contains the raw data ingested in from the input stream in the form of an IDR tree.
// Note callers outside this package should absolutely make **NO** modifications to the content of
// RawRecord. Treat it like read-only.
type RawRecord struct {
Node *idr.Node
}

// UUIDv3 returns a stable MD5(v3) hash of the RawRecord.
func (rr *RawRecord) UUIDv3() string {
hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.Node))
return hash
}

type ingester struct {
finalOutputDecl *transform.Decl
customFuncs customfuncs.CustomFuncs
customParseFuncs transform.CustomParseFuncs // Deprecated.
ctx *transformctx.Ctx
reader fileformat.FormatReader
rawRecord RawRecord
}

func (g *ingester) Read() ([]byte, error) {
// Read ingests a raw record from the input stream, transforms it according the given schema and return
// the raw record, transformed JSON bytes.
func (g *ingester) Read() (interface{}, []byte, error) {
if g.rawRecord.Node != nil {
g.reader.Release(g.rawRecord.Node)
g.rawRecord.Node = nil
}
n, err := g.reader.Read()
if n != nil {
g.rawRecord.Node = n
}
if err != nil {
// Read() supposed to have already done CtxAwareErr error wrapping. So directly return.
return nil, err
return nil, nil, err
}
defer g.reader.Release(n)
result, err := transform.NewParseCtx(
g.ctx, g.customFuncs, g.customParseFuncs).ParseNode(n, g.finalOutputDecl)
result, err := transform.NewParseCtx(g.ctx, g.customFuncs, g.customParseFuncs).ParseNode(n, g.finalOutputDecl)
if err != nil {
// ParseNode() error not CtxAwareErr wrapped, so wrap it.
// Note errs.ErrorTransformFailed is a continuable error.
return nil, errs.ErrTransformFailed(g.fmtErrStr("fail to transform. err: %s", err.Error()))
return nil, nil, errs.ErrTransformFailed(g.fmtErrStr("fail to transform. err: %s", err.Error()))
}
return json.Marshal(result)
transformed, err := json.Marshal(result)
return &g.rawRecord, transformed, err
}

func (g *ingester) IsContinuableError(err error) bool {
Expand Down
18 changes: 13 additions & 5 deletions extensions/omniv21/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (r *testReader) Read() (*idr.Node, error) {
return result, err
}

func (r *testReader) Release(n *idr.Node) { r.releaseCalled++ }
func (r *testReader) Release(_ *idr.Node) { r.releaseCalled++ }

func (r *testReader) IsContinuableError(err error) bool { return err == errContinuableInTest }

Expand All @@ -45,9 +45,10 @@ func TestIngester_Read_ReadFailure(t *testing.T) {
g := &ingester{
reader: &testReader{result: []*idr.Node{nil}, err: []error{errors.New("test failure")}},
}
b, err := g.Read()
raw, b, err := g.Read()
assert.Error(t, err)
assert.Equal(t, "test failure", err.Error())
assert.Nil(t, raw)
assert.Nil(t, b)
assert.Equal(t, 0, g.reader.(*testReader).releaseCalled)
}
Expand All @@ -64,15 +65,16 @@ func TestIngester_Read_ParseNodeFailure(t *testing.T) {
finalOutputDecl: finalOutputDecl,
reader: &testReader{result: []*idr.Node{ingesterTestNode}, err: []error{nil}},
}
b, err := g.Read()
raw, b, err := g.Read()
assert.Error(t, err)
assert.True(t, errs.IsErrTransformFailed(err))
assert.True(t, g.IsContinuableError(err))
assert.Equal(t,
`ctx: fail to transform. err: unable to convert value 'abc' to type 'int' on 'FINAL_OUTPUT', err: strconv.ParseInt: parsing "abc": invalid syntax`,
err.Error())
assert.Nil(t, raw)
assert.Nil(t, b)
assert.Equal(t, 1, g.reader.(*testReader).releaseCalled)
assert.Equal(t, 0, g.reader.(*testReader).releaseCalled)
}

func TestIngester_Read_Success(t *testing.T) {
Expand All @@ -87,9 +89,15 @@ func TestIngester_Read_Success(t *testing.T) {
finalOutputDecl: finalOutputDecl,
reader: &testReader{result: []*idr.Node{ingesterTestNode}, err: []error{nil}},
}
b, err := g.Read()
raw, b, err := g.Read()
assert.NoError(t, err)
assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.(*RawRecord).UUIDv3())
assert.Equal(t, "123", string(b))
assert.Equal(t, 0, g.reader.(*testReader).releaseCalled)
raw, b, err = g.Read()
assert.Equal(t, io.EOF, err)
assert.Nil(t, raw)
assert.Nil(t, b)
assert.Equal(t, 1, g.reader.(*testReader).releaseCalled)
}

Expand Down
84 changes: 48 additions & 36 deletions extensions/omniv21/samples/csv/.snapshots/Test1_Weather_Data_CSV
Original file line number Diff line number Diff line change
@@ -1,44 +1,56 @@
[
{
"date": "2019-01-31T12:34:56-08:00",
"high_temperature_fahrenheit": 50.9,
"latitude": 37.7749,
"longitude": 122.4194,
"low_temperature_fahrenheit": 30.2,
"note": "note 1",
"uv_index": [
"12",
"4",
"6"
],
"wind": "North 20.5 mph"
"RawRecord": "{\"DATE\":\"2019/01/31T12:34:56-0800\",\"HIGH_TEMP_C\":\"10.5\",\"LAT\":\"37.7749\",\"LONG\":\"122.4194\",\"LOW_TEMP_F\":\"30.2\",\"NOTE\":\"note 1\",\"UV_INDEX\":\"12/4/6\",\"WIND_DIR\":\"N\",\"WIND_SPEED_KMH\":\"33\"}",
"RawRecordHash": "24a341e6-bdac-3319-ac76-7354d42a7402",
"TransformedRecord": {
"date": "2019-01-31T12:34:56-08:00",
"high_temperature_fahrenheit": 50.9,
"latitude": 37.7749,
"longitude": 122.4194,
"low_temperature_fahrenheit": 30.2,
"note": "note 1",
"uv_index": [
"12",
"4",
"6"
],
"wind": "North 20.5 mph"
}
},
{
"date": "2020-07-31T01:23:45-05:00",
"high_temperature_fahrenheit": 102.2,
"latitude": 32.7767,
"longitude": 96.797,
"low_temperature_fahrenheit": 95,
"note": "' note with bad quotes",
"uv_index": [
"9",
"5",
"6"
],
"wind": "South East 4.97 mph"
"RawRecord": "{\"DATE\":\"2020/07/31T01:23:45-0500\",\"HIGH_TEMP_C\":\"39\",\"LAT\":\"32.7767\",\"LONG\":\"96.7970\",\"LOW_TEMP_F\":\"95\",\"NOTE\":\"' note with bad quotes\",\"UV_INDEX\":\"9/5/6\",\"WIND_DIR\":\"SE\",\"WIND_SPEED_KMH\":\"8\"}",
"RawRecordHash": "dba160be-3cfe-3efc-a891-f76461c37c08",
"TransformedRecord": {
"date": "2020-07-31T01:23:45-05:00",
"high_temperature_fahrenheit": 102.2,
"latitude": 32.7767,
"longitude": 96.797,
"low_temperature_fahrenheit": 95,
"note": "' note with bad quotes",
"uv_index": [
"9",
"5",
"6"
],
"wind": "South East 4.97 mph"
}
},
{
"date": "2030-11-22T20:18:00-05:00",
"high_temperature_fahrenheit": 59.9,
"latitude": 39.0997,
"longitude": 94.5786,
"low_temperature_fahrenheit": 17,
"note": "note 3",
"uv_index": [
"10",
"3",
"4"
],
"wind": "Tornado 111.84 mph"
"RawRecord": "{\"DATE\":\"2030/11/22T20:18:00-0500\",\"HIGH_TEMP_C\":\"15.5\",\"LAT\":\"39.0997\",\"LONG\":\"94.5786\",\"LOW_TEMP_F\":\"17\",\"NOTE\":\"note 3\",\"UV_INDEX\":\"10/3/4\",\"WIND_DIR\":\"X\",\"WIND_SPEED_KMH\":\"180\"}",
"RawRecordHash": "fcdd707d-1ed4-3641-aca3-b0df568b1084",
"TransformedRecord": {
"date": "2030-11-22T20:18:00-05:00",
"high_temperature_fahrenheit": 59.9,
"latitude": 39.0997,
"longitude": 94.5786,
"low_temperature_fahrenheit": 17,
"note": "note 3",
"uv_index": [
"10",
"3",
"4"
],
"wind": "Tornado 111.84 mph"
}
}
]
Loading

0 comments on commit 4aa56d6

Please sign in to comment.