Skip to content

Commit c225324

Browse files
authored
Merge pull request #17 from dentiny/hjiang/pread
Avoid update file offset in PRead interface
2 parents 6a5c0b0 + c98f92c commit c225324

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

extension/httpfs/httpfs.cpp

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,6 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id
526526
throw InternalException("Cached file not initialized properly");
527527
}
528528
memcpy(buffer, hfh.cached_file_handle->GetData() + location, nr_bytes);
529-
hfh.file_offset = location + nr_bytes;
530529
return;
531530
}
532531

@@ -537,22 +536,30 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id
537536
bool skip_buffer = hfh.flags.DirectIO() || hfh.flags.RequireParallelAccess();
538537
if (skip_buffer && to_read > 0) {
539538
GetRangeRequest(hfh, hfh.path, {}, location, (char *)buffer, to_read);
539+
540+
// Update handle status within critical section for parallel access.
541+
if (hfh.flags.RequireParallelAccess()) {
542+
std::lock_guard<std::mutex> lck(hfh.mu);
543+
hfh.buffer_available = 0;
544+
hfh.buffer_idx = 0;
545+
return;
546+
}
547+
540548
hfh.buffer_available = 0;
541549
hfh.buffer_idx = 0;
542-
hfh.file_offset = location + nr_bytes;
543550
return;
544551
}
545552

546553
if (location >= hfh.buffer_start && location < hfh.buffer_end) {
547-
hfh.file_offset = location;
548554
hfh.buffer_idx = location - hfh.buffer_start;
549555
hfh.buffer_available = (hfh.buffer_end - hfh.buffer_start) - hfh.buffer_idx;
550556
} else {
551557
// reset buffer
552558
hfh.buffer_available = 0;
553559
hfh.buffer_idx = 0;
554-
hfh.file_offset = location;
555560
}
561+
562+
idx_t start_offset = location; // Start file offset to read from.
556563
while (to_read > 0) {
557564
auto buffer_read_len = MinValue<idx_t>(hfh.buffer_available, to_read);
558565
if (buffer_read_len > 0) {
@@ -564,36 +571,37 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id
564571

565572
hfh.buffer_idx += buffer_read_len;
566573
hfh.buffer_available -= buffer_read_len;
567-
hfh.file_offset += buffer_read_len;
574+
start_offset += buffer_read_len;
568575
}
569576

570577
if (to_read > 0 && hfh.buffer_available == 0) {
571-
auto new_buffer_available = MinValue<idx_t>(hfh.READ_BUFFER_LEN, hfh.length - hfh.file_offset);
578+
auto new_buffer_available = MinValue<idx_t>(hfh.READ_BUFFER_LEN, hfh.length - start_offset);
572579

573580
// Bypass buffer if we read more than buffer size
574581
if (to_read > new_buffer_available) {
575582
GetRangeRequest(hfh, hfh.path, {}, location + buffer_offset, (char *)buffer + buffer_offset, to_read);
576583
hfh.buffer_available = 0;
577584
hfh.buffer_idx = 0;
578-
hfh.file_offset += to_read;
585+
start_offset += to_read;
579586
break;
580587
} else {
581-
GetRangeRequest(hfh, hfh.path, {}, hfh.file_offset, (char *)hfh.read_buffer.get(),
588+
GetRangeRequest(hfh, hfh.path, {}, start_offset, (char *)hfh.read_buffer.get(),
582589
new_buffer_available);
583590
hfh.buffer_available = new_buffer_available;
584591
hfh.buffer_idx = 0;
585-
hfh.buffer_start = hfh.file_offset;
592+
hfh.buffer_start = start_offset;
586593
hfh.buffer_end = hfh.buffer_start + new_buffer_available;
587594
}
588595
}
589596
}
590597
}
591598

592599
int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
593-
auto &hfh = (HTTPFileHandle &)handle;
600+
auto &hfh = handle.Cast<HTTPFileHandle>();
594601
idx_t max_read = hfh.length - hfh.file_offset;
595602
nr_bytes = MinValue<idx_t>(max_read, nr_bytes);
596603
Read(handle, buffer, nr_bytes, hfh.file_offset);
604+
hfh.file_offset += nr_bytes;
597605
return nr_bytes;
598606
}
599607

@@ -602,7 +610,7 @@ void HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, i
602610
}
603611

604612
int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
605-
auto &hfh = (HTTPFileHandle &)handle;
613+
auto &hfh = handle.Cast<HTTPFileHandle>();
606614
Write(handle, buffer, nr_bytes, hfh.file_offset);
607615
return nr_bytes;
608616
}

extension/httpfs/include/httpfs.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include "duckdb/main/client_data.hpp"
99
#include "http_metadata_cache.hpp"
1010

11+
#include <mutex>
12+
1113
namespace duckdb_httplib_openssl {
1214
struct Response;
1315
class Result;
@@ -112,6 +114,9 @@ class HTTPFileHandle : public FileHandle {
112114
idx_t buffer_start;
113115
idx_t buffer_end;
114116

117+
// Used when file handle created with parallel access flag specified.
118+
std::mutex mu;
119+
115120
// Read buffer
116121
duckdb::unique_ptr<data_t[]> read_buffer;
117122
constexpr static idx_t READ_BUFFER_LEN = 1000000;

0 commit comments

Comments
 (0)