Skip to content

Commit eda1fa7

Browse files
authored
Various fixes to the DAOS tensorflow-io plugin. (#2)
General: Asserts were added and enabled after each DAOS event-related call in order to track down internal race conditions in the DAOS client code, see DAOS-10601. The DAOS_FILE structure documented behavior for the 'offset' field, but most of that behavior didn't need to be implemented. Field 'offset' was removed while fixing the Append() and Tell() functions, leaving only a single field 'file' in DAOS_FILE, so the DAOS_FILE struct was removed as well. Typos in error messages were corrected. File dfs_utils.cc: In DFS::Setup(): Code after the Connect() call replaced the detailed error status set in Connect() with a generic TF_NOT_FOUND error with no accompanying message. This cost me several days of debugging to realize that a problem was not some object not being found, but rather was a connection failure to an unhealthy container. The TF_NOT_FOUND has been removed, allowing the more detailed error messages in Connect() to be reported. In ReadBuffer::ReadBuffer() By setting buffer_offset to ULONG_MAX, an uninitialized buffer will never be matched by CacheHit(), removing the need for a separate 'initialized' variable. The valid variable is no longer needed as well, more on that below. In ReadBuffer::~ReadBuffer() daos_event_fini(() cannot be called on an event that is still in flight, it fails without doing anything, daos_event_test() must wait for any prior event to complete, otherwise the event delete that follows daos_event_fini() could then cause corruption of the event queue. Call the reworked WaitEvent() (see below) first to ensure that daos_event_fini() will clean up the event before it is deleted. In ReadBuffer::FinalizeEvent() The same problem exists here as in ~ReadBuffer(), daos_event_fini() can't be called on an event that is still in flight. However, FinalizeEvent() isn't actually needed, a call to dfs_file->buffers.clear() in Cleanup() accomplishes the same thing using the ~ReadBuffer code, so FinalizeEvent was removed. ReadBuffer::WaitEvent() There is a need for a WaitEvent() function in several places to wait for any outstanding event to complete, but this routine manipulates 'valid', so it can't be used anywhere else. Removed the 'valid' code so that this routine can become a void and be called in multiple places. ReadBuffer::AbortEvent() daos_event_abort() doesn't actually contain any logic to ask the server to abort an in-flight dfs_read() request. In addition it is buggy, internal DAOS asserts were hit due to daos_event_abort() calls during I/O testing. The code was changed to instead use WaitEvent() to simply wait for a prior read to complete before issuing a new one, and AbortEvent() was removed. ReadBuffer::ReadAsync() Both daos_event_fini() and daos_event_init() must be called on a daos_event_t structure before the event can be reused for another dfs_read() call. These have been added. The AbortEvent() call was replaced with a call to WaitEvent(). The code was changed to save the errno from a failed dfs_read() call in the event's ev_error field so that the error will be detected, and so a user cannot accidentally read trash data after a failed dfs_read() call. ReadBuffer::ReadSync() This function is no longer used, see below. ReadBuffer::CopyData() The WaitEvent() call ensures that the thread blocks until any in-flight read request is done. The event->ev_error field is used to detect I/O failure either at the time the dfs_read() is issued or in the reply, so the valid flag is no longer needed. ReadBuffer::CopyFromCache() The TF_RandomAccessFile read() function allows for int64_t-sized reads, so change the return value here to int64_t. If an I/O error occurred, then return -1 so that the caller function Read() can easily tell when there has been an I/O error. Provide a detailed error message so that the user can tell what caused the error. File dfs_filesystem.cc: In DFSRandomAccessFile constructor: Added an assert() on the event queue creation. In Cleanup(): Replaced FinalizeEvent() code with a dfs_file->buffers.clear() call. Add asserts on dfs function calls. In df_dfs_filesystem::Read(): The loop "for (auto& read_buf : dfs_file->buffers)" was missing a break statement, so CacheHit was called 256 times for each curr_offset value. A break was added. Support was added for detecting a read error and returning -1. Since Read() is now a while loop, there is no reason to specially use ReadSync() for the first buffer. Code changed to use ReadAsync() for all readahead, CopyFromCache() will block until the first buffer's I/O is complete. ReadSync is now unused, and is removed. I could not determine a reason for the WaitEvent loop: if (curr_offset >= dfs_file->file_size) because I/O requests will never be be started beyond EOF. The loop was removed. In DFSWritableFile: The Append() function had to make a dfs_get_size() call for each append to a file, adding a second round trip to the server for each append. This is very expensive. Member functions were added to cache the file size and update it locally as Append() operations are done.. Since the tensorflow API only allows one writer, local caching is allowable. Should there be an I/O error, the actual size of the file becomes unknown, the new member functions take that into account and call dfs_get_size() in those situations to reestablish the correct size of the file. In Append(): The dfs_file->daos_file.offset field was not updated after an Append() operation completed successfully, so a subsequent Tell() call would return the size of the file before the last Append(), not after, the reported size was incorrect. The code was changed to update the cached file size after successful Append() operations. In RenameFile(): Similar to the Setup() case, the detailed error statuses in Connect() were being hidden by a genereric TF_NOT_FOUND error. The generic error was removed. Signed-off-by: Kevan Rehm <[email protected]>
1 parent 1a4191f commit eda1fa7

File tree

3 files changed

+124
-153
lines changed

3 files changed

+124
-153
lines changed

tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc

Lines changed: 87 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,34 @@
11
#include "tensorflow_io/core/filesystems/dfs/dfs_utils.h"
22

3+
#include <stdio.h>
4+
#undef NDEBUG
5+
#include <cassert>
6+
37
namespace tensorflow {
48
namespace io {
59
namespace dfs {
610

11+
712
// SECTION 1. Implementation for `TF_RandomAccessFile`
813
// ----------------------------------------------------------------------------
914
namespace tf_random_access_file {
1015
typedef struct DFSRandomAccessFile {
1116
std::string dfs_path;
1217
dfs_t* daos_fs;
13-
DAOS_FILE daos_file;
18+
dfs_obj_t *daos_file;
1419
std::vector<ReadBuffer> buffers;
1520
daos_size_t file_size;
1621
daos_handle_t mEventQueueHandle{};
22+
1723
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
1824
: dfs_path(std::move(dfs_path)) {
1925
daos_fs = file_system;
20-
daos_file.file = obj;
26+
daos_file = obj;
2127
dfs_get_size(daos_fs, obj, &file_size);
2228
size_t num_of_buffers;
2329
size_t buff_size;
2430
int rc = daos_eq_create(&mEventQueueHandle);
31+
assert(rc == 0);
2532

2633
if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
2734
num_of_buffers = atoi(env_num_of_buffers);
@@ -42,12 +49,12 @@ typedef struct DFSRandomAccessFile {
4249

4350
void Cleanup(TF_RandomAccessFile* file) {
4451
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
45-
for (auto& buffer : dfs_file->buffers) {
46-
buffer.FinalizeEvent();
47-
}
52+
dfs_file->buffers.clear();
4853

49-
daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
50-
dfs_release(dfs_file->daos_file.file);
54+
int rc = daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
55+
assert(rc == 0);
56+
rc = dfs_release(dfs_file->daos_file);
57+
assert(rc == 0);
5158
dfs_file->daos_fs = nullptr;
5259
delete dfs_file;
5360
}
@@ -65,14 +72,19 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
6572
int64_t total_bytes = 0;
6673
size_t ret_size = offset + n;
6774
while (curr_offset < ret_size && curr_offset < dfs_file->file_size) {
68-
size_t read_bytes = 0;
75+
int64_t read_bytes = 0;
6976
for (auto& read_buf : dfs_file->buffers) {
7077
if (read_buf.CacheHit(curr_offset)) {
7178
read_bytes = read_buf.CopyFromCache(ret, ret_offset, curr_offset, n,
7279
dfs_file->file_size, status);
80+
break;
7381
}
7482
}
7583

84+
if (read_bytes < 0) {
85+
return -1;
86+
}
87+
7688
if (read_bytes > 0) {
7789
curr_offset += read_bytes;
7890
ret_offset += read_bytes;
@@ -81,30 +93,13 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
8193
continue;
8294
}
8395

84-
size_t async_offset = curr_offset + BUFF_SIZE;
85-
for (size_t i = 1; i < dfs_file->buffers.size(); i++) {
96+
size_t async_offset = curr_offset;
97+
for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
8698
if (async_offset > dfs_file->file_size) break;
8799
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs,
88-
dfs_file->daos_file.file, async_offset);
100+
dfs_file->daos_file, async_offset);
89101
async_offset += BUFF_SIZE;
90102
}
91-
92-
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file,
93-
curr_offset);
94-
95-
read_bytes = dfs_file->buffers[0].CopyFromCache(
96-
ret, ret_offset, curr_offset, n, dfs_file->file_size, status);
97-
98-
curr_offset += read_bytes;
99-
ret_offset += read_bytes;
100-
total_bytes += read_bytes;
101-
n -= read_bytes;
102-
103-
if (curr_offset >= dfs_file->file_size) {
104-
for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
105-
dfs_file->buffers[i].WaitEvent();
106-
}
107-
}
108103
}
109104

110105
return total_bytes;
@@ -118,17 +113,42 @@ namespace tf_writable_file {
118113
typedef struct DFSWritableFile {
119114
std::string dfs_path;
120115
dfs_t* daos_fs;
121-
DAOS_FILE daos_file;
116+
dfs_obj_t *daos_file;
117+
daos_size_t file_size;
118+
bool size_known;
119+
122120
DFSWritableFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
123121
: dfs_path(std::move(dfs_path)) {
124122
daos_fs = file_system;
125-
daos_file.file = obj;
123+
daos_file = obj;
124+
size_known=false;
125+
}
126+
127+
int get_file_size(daos_size_t &size) {
128+
if (!size_known) {
129+
int rc = dfs_get_size(daos_fs, daos_file, &file_size);
130+
if (rc != 0) {
131+
return rc;
132+
}
133+
size_known = true;
134+
}
135+
size = file_size;
136+
return 0;
137+
}
138+
139+
void set_file_size(daos_size_t size) {
140+
file_size = size;
141+
size_known = true;
142+
}
143+
144+
void unset_file_size(void) {
145+
size_known = false;
126146
}
127147
} DFSWritableFile;
128148

129149
void Cleanup(TF_WritableFile* file) {
130150
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
131-
dfs_release(dfs_file->daos_file.file);
151+
dfs_release(dfs_file->daos_file);
132152
dfs_file->daos_fs = nullptr;
133153
delete dfs_file;
134154
}
@@ -144,32 +164,44 @@ void Append(const TF_WritableFile* file, const char* buffer, size_t n,
144164
wsgl.sg_nr = 1;
145165
wsgl.sg_iovs = &iov;
146166

147-
daos_size_t size;
148-
dfs_get_size(dfs_file->daos_fs, dfs_file->daos_file.file, &size);
149-
dfs_file->daos_file.offset = size;
167+
daos_size_t cur_file_size;
168+
rc = dfs_file->get_file_size(cur_file_size);
169+
if (rc != 0) {
170+
TF_SetStatus(status, TF_INTERNAL, "Cannot determine file size");
171+
return;
172+
}
150173

151-
rc = dfs_write(dfs_file->daos_fs, dfs_file->daos_file.file, &wsgl,
152-
dfs_file->daos_file.offset, NULL);
174+
rc = dfs_write(dfs_file->daos_fs, dfs_file->daos_file, &wsgl,
175+
cur_file_size, NULL);
153176
if (rc) {
154177
TF_SetStatus(status, TF_RESOURCE_EXHAUSTED, "");
178+
dfs_file->unset_file_size();
179+
return;
155180
}
156181

182+
dfs_file->set_file_size(cur_file_size + n);
157183
TF_SetStatus(status, TF_OK, "");
158184
}
159185

160186
int64_t Tell(const TF_WritableFile* file, TF_Status* status) {
161187
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
162188

163-
TF_SetStatus(status, TF_OK, "");
189+
daos_size_t cur_file_size;
190+
int rc = dfs_file->get_file_size(cur_file_size);
191+
if (rc != 0) {
192+
TF_SetStatus(status, TF_INTERNAL, "Cannot determine file size");
193+
return -1;
194+
}
164195

165-
return dfs_file->daos_file.offset;
196+
TF_SetStatus(status, TF_OK, "");
197+
return cur_file_size;
166198
}
167199

168200
void Close(const TF_WritableFile* file, TF_Status* status) {
169201
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
170-
dfs_release(dfs_file->daos_file.file);
202+
dfs_release(dfs_file->daos_file);
171203
dfs_file->daos_fs = nullptr;
172-
dfs_file->daos_file.file = nullptr;
204+
dfs_file->daos_file = nullptr;
173205
TF_SetStatus(status, TF_OK, "");
174206
}
175207

@@ -206,7 +238,7 @@ void NewFile(const TF_Filesystem* filesystem, const char* path, File_Mode mode,
206238
int rc;
207239
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
208240
if (!daos) {
209-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
241+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
210242
return;
211243
}
212244
std::string pool, cont, file_path;
@@ -222,7 +254,7 @@ void NewWritableFile(const TF_Filesystem* filesystem, const char* path,
222254
if (TF_GetCode(status) != TF_OK) return;
223255
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
224256
if (!daos) {
225-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
257+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
226258
return;
227259
}
228260
file->plugin_file =
@@ -237,13 +269,13 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
237269
if (TF_GetCode(status) != TF_OK) return;
238270
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
239271
if (!daos) {
240-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
272+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
241273
return;
242274
}
243275
auto random_access_file =
244276
new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, obj);
245277
random_access_file->buffers[0].ReadAsync(
246-
daos->daos_fs, random_access_file->daos_file.file, 0);
278+
daos->daos_fs, random_access_file->daos_file, 0);
247279
file->plugin_file = random_access_file;
248280
TF_SetStatus(status, TF_OK, "");
249281
}
@@ -255,7 +287,7 @@ void NewAppendableFile(const TF_Filesystem* filesystem, const char* path,
255287
if (TF_GetCode(status) != TF_OK) return;
256288
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
257289
if (!daos) {
258-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
290+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
259291
return;
260292
}
261293
file->plugin_file =
@@ -268,7 +300,7 @@ void PathExists(const TF_Filesystem* filesystem, const char* path,
268300
int rc;
269301
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
270302
if (!daos) {
271-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
303+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
272304
return;
273305
}
274306
std::string pool, cont, file;
@@ -288,7 +320,7 @@ void CreateDir(const TF_Filesystem* filesystem, const char* path,
288320
int rc;
289321
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
290322
if (!daos) {
291-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
323+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
292324
return;
293325
}
294326
std::string pool, cont, dir_path;
@@ -304,7 +336,7 @@ static void RecursivelyCreateDir(const TF_Filesystem* filesystem,
304336
std::string pool, cont, dir_path;
305337
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
306338
if (!daos) {
307-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
339+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
308340
return;
309341
}
310342
rc = daos->Setup(path, pool, cont, dir_path, status);
@@ -333,13 +365,13 @@ void DeleteFileSystemEntry(const TF_Filesystem* filesystem, const char* path,
333365
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
334366

335367
if (!daos) {
336-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
368+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
337369
return;
338370
}
339371

340372
rc = daos->Setup(path, pool, cont, dir_path, status);
341373
if (rc) {
342-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
374+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
343375
return;
344376
}
345377

@@ -376,7 +408,7 @@ bool IsDir(const TF_Filesystem* filesystem, const char* path,
376408
std::string pool, cont, file;
377409
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
378410
if (!daos) {
379-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
411+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
380412
return is_dir;
381413
}
382414
rc = daos->Setup(path, pool, cont, file, status);
@@ -411,7 +443,7 @@ int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
411443
int rc;
412444
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
413445
if (!daos) {
414-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
446+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
415447
return -1;
416448
}
417449
std::string pool, cont, file;
@@ -448,7 +480,7 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
448480
int rc;
449481
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
450482
if (!daos) {
451-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
483+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
452484
return;
453485
}
454486
int allow_cont_creation = 1;
@@ -473,7 +505,6 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
473505

474506
daos->Connect(pool_src, cont_src, allow_cont_creation, status);
475507
if (TF_GetCode(status) != TF_OK) {
476-
TF_SetStatus(status, TF_NOT_FOUND, "");
477508
return;
478509
}
479510

@@ -558,7 +589,7 @@ void Stat(const TF_Filesystem* filesystem, const char* path,
558589
int rc;
559590
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
560591
if (!daos) {
561-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
592+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
562593
return;
563594
}
564595
std::string pool, cont, dir_path;
@@ -598,7 +629,7 @@ int GetChildren(const TF_Filesystem* filesystem, const char* path,
598629
int rc;
599630
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
600631
if (!daos) {
601-
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
632+
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
602633
return -1;
603634
}
604635
std::string pool, cont, dir_path;

0 commit comments

Comments
 (0)