Skip to content

Support for workers #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 34 additions & 62 deletions src/serialport_win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,7 @@ void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLA
}
}

DWORD __stdcall WriteThread(LPVOID param) {
uv_async_t* async = static_cast<uv_async_t*>(param);
WriteBaton* baton = static_cast<WriteBaton*>(async->data);
DWORD __stdcall WriteThread(WriteBaton* baton) {

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(OVERLAPPED));
Expand All @@ -365,27 +363,16 @@ DWORD __stdcall WriteThread(LPVOID param) {
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterWrite(uv_async_t* req) {
WriteBaton* baton = static_cast<WriteBaton*>(req->data);
Napi::Env env = baton->callback.Env();
Napi::HandleScope scope(env);
WaitForSingleObject(baton->hThread, INFINITE);
CloseHandle(baton->hThread);
uv_close(reinterpret_cast<uv_handle_t*>(req), AsyncCloseCallback);

v8::Local<v8::Value> argv[1];
if (baton->errorString[0]) {
baton->callback.Call({Napi::Error::New(env, baton->errorString).Value()});
} else {
baton->callback.Call({env.Null()});
}
baton->buffer.Reset();
baton->tsfn.BlockingCall([baton](Napi::Env env, Napi::Function jsCallback) {
if (baton->errorString[0]) {
jsCallback.Call({Napi::Error::New(env, baton->errorString).Value()});
} else {
jsCallback.Call({env.Null()});
}
});
baton->tsfn.Release();
delete baton;
return ERROR_SUCCESS;
}


Expand Down Expand Up @@ -415,20 +402,20 @@ Napi::Value Write(const Napi::CallbackInfo& info) {
}

WriteBaton* baton = new WriteBaton();
baton->callback = Napi::Persistent(info[2].As<Napi::Function>());

baton->fd = fd;
baton->buffer.Reset(buffer);
baton->bufferData = bufferData;
baton->bufferLength = bufferLength;
baton->offset = 0;
baton->complete = false;

uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterWrite);
async->data = baton;
// WriteFileEx requires a thread that can block. Create a new thread to
// run the write operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, WriteThread, async, 0, NULL);
Napi::Function callback = info[2].As<Napi::Function>();
baton->tsfn = Napi::ThreadSafeFunction::New(env, callback, "WriteThreadSafeFunction", 0, 1);

baton->nativeThread = std::thread(WriteThread,baton);
baton->nativeThread.detach();

return env.Null();
}

Expand Down Expand Up @@ -497,15 +484,11 @@ void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAP
baton->complete = true;
}

DWORD __stdcall ReadThread(LPVOID param) {
uv_async_t* async = static_cast<uv_async_t*>(param);
ReadBaton* baton = static_cast<ReadBaton*>(async->data);
DWORD __stdcall ReadThread(ReadBaton* baton) {
DWORD lastError;

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(OVERLAPPED));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
// Reset the read timeout to 0, so that it will block until more data arrives.
COMMTIMEOUTS commTimeouts = {};
Expand All @@ -528,29 +511,19 @@ DWORD __stdcall ReadThread(LPVOID param) {
ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString);
break;
}
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterRead(uv_async_t* req) {
ReadBaton* baton = static_cast<ReadBaton*>(req->data);
Napi::Env env = baton->callback.Env();
Napi::HandleScope scope(env);
WaitForSingleObject(baton->hThread, INFINITE);
CloseHandle(baton->hThread);
uv_close(reinterpret_cast<uv_handle_t*>(req), AsyncCloseCallback);

if (baton->errorString[0]) {
baton->callback.Call({Napi::Error::New(env, baton->errorString).Value(), env.Undefined()});
} else {
baton->callback.Call({env.Null(), Napi::Number::New(env, static_cast<int>(baton->bytesRead))});
}
baton->tsfn.BlockingCall([baton](Napi::Env env, Napi::Function jsCallback) {
if (baton->errorString[0]) {
jsCallback.Call({Napi::Error::New(env, baton->errorString).Value(),env.Undefined()});
} else {
jsCallback.Call({env.Null(), Napi::Number::New(env, static_cast<int>(baton->bytesRead))});
}
});
baton->tsfn.Release();
delete baton;
return ERROR_SUCCESS;
}

Napi::Value Read(const Napi::CallbackInfo& info) {
Expand Down Expand Up @@ -595,24 +568,23 @@ Napi::Value Read(const Napi::CallbackInfo& info) {
return env.Null();
}
ReadBaton* baton = new ReadBaton();
baton->callback = Napi::Persistent(info[4].As<Napi::Function>());
baton->fd = fd;
baton->offset = offset;
baton->bytesToRead = bytesToRead;
baton->bufferLength = bufferLength;

baton->bufferData = buffer.As<Napi::Buffer<char>>().Data();
baton->complete = false;

Napi::Function callback = info[4].As<Napi::Function>();
baton->tsfn = Napi::ThreadSafeFunction::New(env, callback, "ReadThreadSafeFunction", 0, 1);

baton->nativeThread = std::thread(ReadThread,baton);
baton->nativeThread.detach();

uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterRead);
async->data = baton;
baton->hThread = CreateThread(NULL, 0, ReadThread, async, 0, NULL);
// ReadFileEx requires a thread that can block. Create a new thread to
// run the read operation, saving the handle so it can be deallocated later.
return env.Null();
}


void CloseBaton::Execute() {
g_closingHandles.push_back(fd);

Expand Down
10 changes: 6 additions & 4 deletions src/serialport_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ struct WriteBaton {
size_t bufferLength = 0;
size_t offset = 0;
size_t bytesWritten = 0;
void* hThread = nullptr;
std::thread nativeThread;
bool complete = false;
Napi::ObjectReference buffer;
Napi::FunctionReference callback;
Napi::ThreadSafeFunction tsfn;
int result = 0;
char errorString[ERROR_STRING_SIZE];
};
Expand All @@ -37,10 +37,11 @@ struct ReadBaton {
size_t bytesRead = 0;
size_t bytesToRead = 0;
size_t offset = 0;
void* hThread = nullptr;
Napi::FunctionReference callback;

std::thread nativeThread;
bool complete = false;
char errorString[ERROR_STRING_SIZE];
Napi::ThreadSafeFunction tsfn;
};

Napi::Value Read(const Napi::CallbackInfo& info);
Expand Down Expand Up @@ -91,3 +92,4 @@ struct ListBaton : public Napi::AsyncWorker {
};

#endif // PACKAGES_SERIALPORT_SRC_SERIALPORT_WIN_H_