Skip to content

Commit 27c19cc

Browse files
committed
v0.29
1 parent 96919e6 commit 27c19cc

File tree

4 files changed

+146
-101
lines changed

4 files changed

+146
-101
lines changed

LogReader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class CLogReader final
4848

4949
protected:
5050
#if 0
51-
#if 0
51+
#if 1
5252
CSyncLineReader _lineReader;
5353
#else
5454
CMappingLineReader _lineReader;

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,6 @@
8787
Я притянул часть STL на мой страх и риск. Ту часть, которая не требует исключений и работает без лишних накладных расходов.
8888
Не вижу смысла не пользоваться дешевыми абстракциями, позволяющими писать более чистый код
8989
и более защищенный от ошибок программиста. И без велосипедов.
90-
Я имею в виду всякие std::unique_ptr, std::string_view, std::optional, std::atomic.
90+
Я имею в виду всякие std::unique_ptr, std::string_view, std::optional.
9191

9292
---

ScanFile.cpp

Lines changed: 126 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,24 @@
33
#include <assert.h>
44

55
#include <algorithm>
6+
#include <atomic> // for std::atomic_thread_fence
67

78
#include <windows.h>
89

10+
namespace
11+
{
12+
// This is a dirty hack for speedup inter-thread communication on hyperthreading CPUs
13+
// We could choose any other paid of logical CPUs based on the same physical CPU and sharing the same cache
14+
// I'm sure it will give no effect in the current task because we are switching too rarely between threads.
15+
const int PreferedCpuForMainThread = 0;
16+
const int PreferedCpuForWorkerThread = PreferedCpuForMainThread + 1;
17+
static_assert(PreferedCpuForMainThread % 2 == 0);
18+
const DWORD ThreadPriority = THREAD_PRIORITY_TIME_CRITICAL;
19+
# define ENABLE_THREAD_PREFERRED_AFFINITY 0
20+
# define ENABLE_THREAD_FIXED_AFFINITY 0
21+
# define ENABLE_THREAD_PRIORITY 0
22+
}
23+
924

1025
CScanFile::~CScanFile()
1126
{
@@ -18,7 +33,7 @@ bool CScanFile::Open(const wchar_t* const filename, const bool asyncMode)
1833
{
1934
return false;
2035
}
21-
assert(this->_hEvent == nullptr);
36+
assert(this->_hAsyncEvent == nullptr);
2237

2338
const DWORD dwDesiredAccess = FILE_READ_DATA | FILE_READ_ATTRIBUTES; // minimal required rights
2439
// FILE_READ_ATTRIBUTES is needed to get file size for mapping file to memory
@@ -38,18 +53,18 @@ bool CScanFile::Open(const wchar_t* const filename, const bool asyncMode)
3853
}
3954

4055
// Init data for async IO:
41-
this->_hEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
56+
this->_hAsyncEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
4257

43-
if (this->_hEvent == nullptr)
58+
if (this->_hAsyncEvent == nullptr)
4459
{
4560
CloseHandle(this->_hFile);
4661
this->_hFile = nullptr;
4762
return false;
4863
}
4964

50-
this->_overlapped.hEvent = this->_hEvent;
51-
this->_fileOffset.QuadPart = 0;
52-
this->_operationInProgress = false;
65+
this->_asyncOverlapped.hEvent = this->_hAsyncEvent;
66+
this->_asyncFileOffset.QuadPart = 0;
67+
this->_asyncOperationInProgress = false;
5368

5469
return true;
5570
}
@@ -76,15 +91,19 @@ void CScanFile::Close()
7691
this->_hFile = nullptr;
7792
}
7893

79-
if (this->_hEvent != nullptr)
94+
if (this->_hAsyncEvent != nullptr)
8095
{
81-
CloseHandle(this->_hEvent);
82-
this->_hEvent = nullptr;
96+
CloseHandle(this->_hAsyncEvent);
97+
this->_hAsyncEvent = nullptr;
8398
}
8499

85-
this->_operationInProgress = false;
100+
this->_asyncOperationInProgress = false;
86101
}
87102

103+
//////////////////////////////////////////////////////////////////////////
104+
/// Implementation of mapping file to memory
105+
//////////////////////////////////////////////////////////////////////////
106+
88107
std::optional<std::string_view> CScanFile::MapToMemory()
89108
{
90109
if (this->_hFile == nullptr || this->_hFileMapping != nullptr)
@@ -131,6 +150,8 @@ std::optional<std::string_view> CScanFile::MapToMemory()
131150
return std::string_view(static_cast<const char*>(this->_pViewOfFile), fileSizeAsSizeT);
132151
}
133152

153+
//////////////////////////////////////////////////////////////////////////
154+
/// Implementation of synchronous file API
134155
//////////////////////////////////////////////////////////////////////////
135156

136157
__declspec(noinline) // noinline is added to help CPU profiling in release version
@@ -156,49 +177,51 @@ bool CScanFile::Read(char* const buffer, const size_t bufferLength, size_t& read
156177
return !!succeeded;
157178
}
158179

180+
//////////////////////////////////////////////////////////////////////////
181+
/// Implementation of Asynchronous file API
159182
//////////////////////////////////////////////////////////////////////////
160183

161184
__declspec(noinline) // noinline is added to help CPU profiling in release version
162185
bool CScanFile::AsyncReadStart(char* const buffer, const size_t bufferLength)
163186
{
164-
if (this->_hFile == nullptr || buffer == nullptr || this->_operationInProgress)
187+
if (this->_hFile == nullptr || buffer == nullptr || this->_asyncOperationInProgress)
165188
{
166189
return false;
167190
}
168-
assert(this->_hEvent != nullptr);
191+
assert(this->_hAsyncEvent != nullptr);
169192

170193
const DWORD usedBufferLength = static_cast<DWORD>(min(bufferLength, MAXDWORD));
171194

172-
this->_overlapped.Offset = this->_fileOffset.LowPart;
173-
this->_overlapped.OffsetHigh = this->_fileOffset.HighPart;
195+
this->_asyncOverlapped.Offset = this->_asyncFileOffset.LowPart;
196+
this->_asyncOverlapped.OffsetHigh = this->_asyncFileOffset.HighPart;
174197

175-
const bool readOk = !!ReadFile(this->_hFile, buffer, usedBufferLength, nullptr, &this->_overlapped);
198+
const bool readOk = !!ReadFile(this->_hFile, buffer, usedBufferLength, nullptr, &this->_asyncOverlapped);
176199
if (!readOk && GetLastError() != ERROR_IO_PENDING)
177200
{
178201
return false;
179202
}
180203

181-
this->_operationInProgress = true;
204+
this->_asyncOperationInProgress = true;
182205

183206
return true;
184207
}
185208

186209
__declspec(noinline) // noinline is added to help CPU profiling in release version
187210
bool CScanFile::AsyncReadWait(size_t& readBytes)
188211
{
189-
if (!this->_operationInProgress)
212+
if (!this->_asyncOperationInProgress)
190213
{
191214
return false;
192215
}
193216
assert(this->_hFile != nullptr);
194-
assert(this->_hEvent != nullptr);
217+
assert(this->_hAsyncEvent != nullptr);
195218

196219
DWORD numberOfBytesRead = 0;
197220

198-
const bool overlappedOk = !!GetOverlappedResult(this->_hFile, &this->_overlapped, &numberOfBytesRead, TRUE);
221+
const bool overlappedOk = !!GetOverlappedResult(this->_hFile, &this->_asyncOverlapped, &numberOfBytesRead, TRUE);
199222
if (!overlappedOk)
200223
{
201-
this->_operationInProgress = false; // I'm not sure this is correct
224+
this->_asyncOperationInProgress = false; // I'm not sure this is correct
202225
if (GetLastError() == ERROR_HANDLE_EOF)
203226
{
204227
assert(numberOfBytesRead == 0);
@@ -209,138 +232,164 @@ bool CScanFile::AsyncReadWait(size_t& readBytes)
209232
return false;
210233
}
211234

212-
this->_fileOffset.QuadPart += numberOfBytesRead;
213-
this->_operationInProgress = false;
235+
this->_asyncFileOffset.QuadPart += numberOfBytesRead;
236+
this->_asyncOperationInProgress = false;
214237

215238
readBytes = numberOfBytesRead;
216239

217240
return true;
218241
}
219242

243+
//////////////////////////////////////////////////////////////////////////
244+
/// Implementation of file API executed in a separate thread with the help of spinlocks
220245
//////////////////////////////////////////////////////////////////////////
221246

222247
bool CScanFile::LockFreecInit()
223248
{
224-
if (this->_pThread != nullptr)
249+
if (this->_hThread != nullptr)
225250
{
226251
return false;
227252
}
228253

229-
this->_threadFinishSignal = false;
230-
this->_threadOperationReadStartSignal = false;
231-
this->_threadOperationReadCompletedSignal = false;
254+
this->_threadFinishSpinlock = false;
255+
this->_threadOperationReadStartSpinlock = false;
256+
this->_threadOperationReadCompletedSpinlock = false;
257+
this->_pThreadReadBuffer = nullptr;
258+
this->_threadReadBufferSize = 0;
259+
this->_threadActuallyReadBytes = 0;
260+
this->_threadReadSucceeded = false;
261+
std::atomic_thread_fence(std::memory_order_release);
262+
263+
// This is a dirty hack for speedup inter-thread communication on hyperthreading CPUs
264+
#if ENABLE_THREAD_PREFERRED_AFFINITY
265+
SetThreadIdealProcessor(GetCurrentThread(), PreferedCpuForMainThread);
266+
#endif
267+
#if ENABLE_THREAD_FIXED_AFFINITY
268+
SetThreadAffinityMask(GetCurrentThread(), 1 << PreferedCpuForMainThread);
269+
#endif
270+
#if ENABLE_THREAD_PRIORITY
271+
SetThreadPriority(GetCurrentThread(), ThreadPriority);
272+
#endif
273+
274+
using ThreadProcType = unsigned __stdcall(void*);
275+
ThreadProcType* const threadProc = [](void* p) -> unsigned
276+
{
277+
// This is a dirty hack for speedup inter-thread communication on hyperthreading CPUs
278+
#if ENABLE_THREAD_PREFERRED_AFFINITY
279+
SetThreadIdealProcessor(GetCurrentThread(), PreferedCpuForWorkerThread);
280+
#endif
281+
#if ENABLE_THREAD_FIXED_AFFINITY
282+
SetThreadAffinityMask(GetCurrentThread(), 1 << PreferedCpuForWorkerThread);
283+
#endif
284+
#if ENABLE_THREAD_PRIORITY
285+
SetThreadPriority(GetCurrentThread(), ThreadPriority);
286+
#endif
287+
288+
CScanFile* const that = static_cast<CScanFile*>(p);
289+
that->LockFreeThreadProc();
290+
_endthreadex(0);
291+
return 0;
292+
};
293+
294+
unsigned threadID = 0;
295+
this->_hThread = reinterpret_cast<HANDLE>(_beginthreadex(NULL, 0, threadProc, this, 0, &threadID));
296+
if (this->_hThread == nullptr)
232297
{
233-
std::lock_guard<std::mutex> lock(this->_protectThreadData);
234-
this->_pThreadReadBuffer = nullptr;
235-
this->_threadReadBufferSize = 0;
236-
this->_threadActuallyReadBytes = 0;
237-
this->_threadReadSucceeded = false;
298+
return false;
238299
}
239300

240-
this->_pThread = std::make_unique<std::thread>(&CScanFile::LockFreeThread, this);
241-
242301
return true;
243302
}
244303

245304
void CScanFile::LockFreecClean()
246305
{
247-
if (this->_pThread != nullptr)
306+
if (this->_hThread != nullptr)
248307
{
249-
this->_threadFinishSignal = true;
250-
this->_pThread->join();
251-
this->_pThread.reset();
308+
this->_threadFinishSpinlock = true;
309+
WaitForSingleObject(this->_hThread, INFINITE); // ignore return value in this case
310+
this->_hThread = nullptr;
252311
}
253312
}
254313

255314
__declspec(noinline) // noinline is added to help CPU profiling in release version
256315
bool CScanFile::LockFreeReadStart(char* const buffer, const size_t bufferLength)
257316
{
258-
if (this->_pThread == nullptr || this->_threadOperationInProgress)
317+
if (this->_hThread == nullptr || this->_threadOperationInProgress)
259318
{
260319
return false;
261320
}
262321

263322
this->_threadOperationInProgress = true;
323+
this->_threadOperationReadCompletedSpinlock = false;
324+
this->_pThreadReadBuffer = buffer;
325+
this->_threadReadBufferSize = bufferLength;
326+
this->_threadActuallyReadBytes = 0;
327+
this->_threadReadSucceeded = false;
328+
std::atomic_thread_fence(std::memory_order_release);
264329

265-
this->_threadOperationReadCompletedSignal = false;
266-
267-
{
268-
std::lock_guard<std::mutex> lock(this->_protectThreadData);
269-
this->_pThreadReadBuffer = buffer;
270-
this->_threadReadBufferSize = bufferLength;
271-
this->_threadActuallyReadBytes = 0;
272-
this->_threadReadSucceeded = false;
273-
}
274-
275-
this->_threadOperationReadStartSignal = true;
330+
this->_threadOperationReadStartSpinlock = true;
331+
std::atomic_thread_fence(std::memory_order_release);
276332

277333
return true;
278334
}
279335

280336
__declspec(noinline) // noinline is added to help CPU profiling in release version
281337
bool CScanFile::LockFreeReadWait(size_t& readBytes)
282338
{
283-
if (this->_pThread == nullptr || !this->_threadOperationInProgress)
339+
if (this->_hThread == nullptr || !this->_threadOperationInProgress)
284340
{
285341
return false;
286342
}
287343

288344
this->_threadOperationInProgress = false;
345+
std::atomic_thread_fence(std::memory_order_release);
289346

290-
while (!this->_threadOperationReadCompletedSignal)
347+
std::atomic_thread_fence(std::memory_order_acquire);
348+
while (!this->_threadOperationReadCompletedSpinlock)
291349
{
292350
// nothing
351+
std::atomic_thread_fence(std::memory_order_acquire);
293352
}
294353

354+
std::atomic_thread_fence(std::memory_order_acquire);
355+
readBytes = this->_threadActuallyReadBytes;
356+
if (!this->_threadReadSucceeded)
295357
{
296-
std::lock_guard<std::mutex> lock(this->_protectThreadData);
297-
readBytes = this->_threadActuallyReadBytes;
298-
if (!this->_threadReadSucceeded)
299-
{
300-
return false;
301-
}
358+
return false;
302359
}
303360

304361
return true;
305362
}
306363

307-
void CScanFile::LockFreeThread()
364+
void CScanFile::LockFreeThreadProc()
308365
{
309366
while (true)
310367
{
311-
if (this->_threadFinishSignal)
368+
std::atomic_thread_fence(std::memory_order_acquire);
369+
if (this->_threadFinishSpinlock)
312370
{
313371
// thread exit signal is caught
314372
break;
315373
}
316374

317-
if (!this->_threadOperationReadStartSignal)
375+
if (!this->_threadOperationReadStartSpinlock)
318376
{
319377
// nothing to do
320378
continue;
321379
}
322380

323-
this->_threadOperationReadStartSignal = false;
324-
325-
char* buffer = nullptr;
326-
size_t bufferSize = 0;
327-
328-
{
329-
std::lock_guard<std::mutex> lock(this->_protectThreadData);
330-
buffer = this->_pThreadReadBuffer;
331-
bufferSize = this->_threadReadBufferSize;
332-
}
381+
std::atomic_thread_fence(std::memory_order_acquire);
333382

334383
size_t readBytes = 0;
335-
const bool readOk = this->Read(buffer, bufferSize, readBytes);
384+
const bool readOk = this->Read(this->_pThreadReadBuffer, this->_threadReadBufferSize, readBytes);
336385

337-
{
338-
std::lock_guard<std::mutex> lock(this->_protectThreadData);
339-
this->_threadActuallyReadBytes = readBytes;
340-
this->_threadReadSucceeded = readOk;
341-
}
386+
this->_threadOperationReadStartSpinlock = false;
387+
this->_threadActuallyReadBytes = readBytes;
388+
this->_threadReadSucceeded = readOk;
389+
std::atomic_thread_fence(std::memory_order_release);
342390

343-
this->_threadOperationReadCompletedSignal = true;
391+
this->_threadOperationReadCompletedSpinlock = true;
392+
std::atomic_thread_fence(std::memory_order_release);
344393
}
345394
}
346395

0 commit comments

Comments
 (0)