Support BQ Json arrays and literals #5544
base: main

Conversation
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main    #5544      +/-   ##
==========================================
+ Coverage   61.45%   61.46%   +0.01%
==========================================
  Files         314      314
  Lines       11222    11228       +6
  Branches      771      772       +1
==========================================
+ Hits         6896     6901       +5
- Misses       4326     4327       +1
```
Ideally, beam should read …
Thanks for this! I ran some loads to BQ based on 574db49, and it seems all entries with non-null JSON columns are simply filtered out, which is odd (jsarray or jsobject alike). I tried to find the origin, but everything seems correct...
tl;dr: I think I found it (see at the end). The problem only occurs when writing with the Storage API.

Test code:

```scala
import com.spotify.scio.ContextAndArgs
import com.spotify.scio.bigquery._
import com.spotify.scio.bigquery.Table
import com.spotify.scio.bigquery.types.{BigQueryType, Json, description}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition

import scala.concurrent.duration.Duration

object TestJob {
  @BigQueryType.toTable
  @description("Test")
  case class Test(id: String, json: Option[Json])

  def main(args: Array[String]): Unit = {
    implicit val (sc, _) = ContextAndArgs(args)

    val tests = sc.parallelize(Seq(
      Test("stored", None),
      Test("ignored", Some(Json("{\"key\":\"value\"}"))),
    ))

    tests.saveAsTypedBigQueryTable(
      Table.Spec(s"Test.test"),
      method = BigQueryIO.Write.Method.STORAGE_WRITE_API,
      writeDisposition = WriteDisposition.WRITE_APPEND,
    )

    sc.run().waitUntilFinish(Duration.Inf, cancelJob = false)
  }
}
```

Here the first … After some digging, the error can be found in …

It looks like at some point …

Suggested solution

It seems it comes from:

```scala
def parse(json: Json): AnyRef = mapper.readValue(json.wkt, classOf[Object])
```

I think, since it is there only to encapsulate a JSON string, it should be:

```scala
def parse(json: Json): AnyRef = json.wkt
```

It solves the problem for my use case, but I wonder if there are side effects I don't know about. (Another thing: Beam silently dropping bad inserts...)
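To illustrate the difference the suggestion makes, here is a minimal Python stand-in (names hypothetical; `json.loads` playing the role of Jackson's `ObjectMapper.readValue`): the current `parse` turns the JSON text into a structured object, while the suggested fix keeps the literal string, which is the shape the Storage Write API path appears to expect for a JSON column.

```python
import json

WKT = '{"key":"value"}'

def parse_current(wkt: str):
    # Like mapper.readValue(json.wkt, classOf[Object]): yields a Map/List-like object
    return json.loads(wkt)

def parse_suggested(wkt: str) -> str:
    # Like json.wkt: the raw JSON text, unchanged
    return wkt

print(type(parse_current(WKT)).__name__)    # dict — a structured object
print(type(parse_suggested(WKT)).__name__)  # str — the literal JSON string
```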
```scala
case _ =>
  new Json(mapper.writeValueAsString(value))
}

def parse(json: Json): AnyRef = mapper.readValue(json.wkt, classOf[Object])
```
```diff
- def parse(json: Json): AnyRef = mapper.readValue(json.wkt, classOf[Object])
+ def parse(json: Json): AnyRef = json.wkt
```
Json should simply be mapped to java `List` or `Map`.
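As a sketch of that mapping (Python's `json` module standing in for Jackson's generic `readValue(..., Object.class)`): JSON object literals become `Map`/`dict`, array literals become `List`/`list`, and scalar literals become the corresponding scalar type.

```python
import json

# A generic JSON parse maps each kind of literal to a container or scalar type:
samples = ['{"key":"value"}', '[1, 2, 3]', '"scalar"', '42']
kinds = [type(json.loads(s)).__name__ for s in samples]
print(kinds)  # ['dict', 'list', 'str', 'int']
```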
Fix #5542