from typing import List

from pyspark.sql.datasource import (
    DataSource,
    DataSourceReader,
    DataSourceStreamReader,
    InputPartition,
)
from pyspark.sql.types import StringType, StructType

def _validate_faker_schema(schema):
    # Verify the library is installed correctly.
    try:
        from faker import Faker
    except ImportError:
        raise Exception("You need to install `faker` to use the fake datasource.")

    fake = Faker()
    for field in schema.fields:
        try:
            getattr(fake, field.name)()
        except AttributeError:
            raise Exception(
                f"Unable to find a method called `{field.name}` in faker. "
                f"Please check Faker's documentation to see supported methods."
            )
        if field.dataType != StringType():
            raise Exception(
                f"Field `{field.name}` is not a StringType. "
                f"Only StringType is supported in the fake datasource."
            )

class FakeDataSource(DataSource):
    """
    A fake data source for PySpark to generate synthetic data using the `faker` library.

    This data source allows specifying a schema with field names that correspond to `faker`
    providers to generate random data for testing and development purposes.

    The default schema is `name string, date string, zipcode string, state string`, and the
    default number of rows is `3`. Both can be customized by users.

    Name: `fake`

    Notes
    -----
    - The fake data source relies on the `faker` library. Make sure it is installed and accessible.
    - Only string type fields are supported, and each field name must correspond to a method name
      in the `faker` library.
    - When using the stream reader, `numRows` is the number of rows per microbatch.

    Examples
    --------
    Register the data source.

    >>> from pyspark_datasources import FakeDataSource
    >>> spark.dataSource.register(FakeDataSource)

    Use the fake datasource with the default schema and default number of rows:

    >>> spark.read.format("fake").load().show()
    +-----------+----------+-------+-------+
    |       name|      date|zipcode|  state|
    +-----------+----------+-------+-------+
    |Carlos Cobb|2018-07-15|  73003|Indiana|
    | Eric Scott|1991-08-22|  10085|  Idaho|
    | Amy Martin|1988-10-28|  68076| Oregon|
    +-----------+----------+-------+-------+

    Use the fake datasource with a custom schema:

    >>> spark.read.format("fake").schema("name string, company string").load().show()
    +---------------------+--------------+
    |name                 |company       |
    +---------------------+--------------+
    |Tanner Brennan       |Adams Group   |
    |Leslie Maxwell       |Santiago Group|
    |Mrs. Jacqueline Brown|Maynard Inc   |
    +---------------------+--------------+

    Use the fake datasource with a different number of rows:

    >>> spark.read.format("fake").option("numRows", 5).load().show()
    +--------------+----------+-------+------------+
    |          name|      date|zipcode|       state|
    +--------------+----------+-------+------------+
    |  Pam Mitchell|1988-10-20|  23788|   Tennessee|
    |Melissa Turner|1996-06-14|  30851|      Nevada|
    |  Brian Ramsey|2021-08-21|  55277|  Washington|
    |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
    | Douglas James|2007-01-18|  46226|     Alabama|
    +--------------+----------+-------+------------+

    Streaming fake data:

    >>> stream = spark.readStream.format("fake").load().writeStream.format("console").start()
    Batch: 0
    +--------------+----------+-------+------------+
    |          name|      date|zipcode|       state|
    +--------------+----------+-------+------------+
    |    Tommy Diaz|1976-11-17|  27627|South Dakota|
    |Jonathan Perez|1986-02-23|  81307|Rhode Island|
    |  Julia Farmer|1990-10-10|  40482|    Virginia|
    +--------------+----------+-------+------------+
    Batch: 1
    ...
    >>> stream.stop()
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType) -> "FakeDataSourceReader":
        _validate_faker_schema(schema)
        return FakeDataSourceReader(schema, self.options)

    def streamReader(self, schema) -> "FakeDataSourceStreamReader":
        _validate_faker_schema(schema)
        return FakeDataSourceStreamReader(schema, self.options)

class FakeDataSourceReader(DataSourceReader):
    def __init__(self, schema, options) -> None:
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        from faker import Faker

        fake = Faker()
        # Note: every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                # Each field name doubles as the Faker provider method to call.
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

class FakeDataSourceStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options) -> None:
        self.schema: StructType = schema
        self.rows_per_microbatch = int(options.get("numRows", 3))
        self.options = options
        self.offset = 0

    def initialOffset(self) -> dict:
        # Offsets simply count the total number of rows emitted so far; start at zero.
        return {"offset": 0}

    def latestOffset(self) -> dict:
        # Each time Spark asks for the latest offset, advance by one microbatch of rows.
        self.offset += self.rows_per_microbatch
        return {"offset": self.offset}

    def partitions(self, start, end) -> List[InputPartition]:
        # A single partition whose value is the number of rows in this microbatch.
        return [InputPartition(end["offset"] - start["offset"])]

    def read(self, partition):
        from faker import Faker

        fake = Faker()
        for _ in range(partition.value):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)
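
if __name__ == "__main__":
    # Illustrative usage sketch, not part of the original module: register the data
    # source and read one small batch. Assumes a local PySpark build that ships the
    # Python data source API (4.0+) and that the `faker` package is installed; the
    # app name and chosen schema fields are arbitrary examples.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("fake-datasource-demo").getOrCreate()
    spark.dataSource.register(FakeDataSource)

    # Batch read with a custom schema and row count; every field name must be a
    # Faker provider method (e.g. `name`, `address`) and declared as a string.
    df = (
        spark.read.format("fake")
        .schema("name string, address string")
        .option("numRows", 5)
        .load()
    )
    df.show(truncate=False)

    spark.stop()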