Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 39 additions & 25 deletions Source/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ private class Deflater {
/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
private class InnerWebSocket: Hashable {
var id : Int
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
let request : URLRequest!
let subProtocols : [String]!
var frames : [Frame] = []
Expand Down Expand Up @@ -611,9 +611,10 @@ private class InnerWebSocket: Hashable {
func hash(into hasher: inout Hasher) {
hasher.combine(id)
}

init(request: URLRequest, subProtocols : [String] = [], stub : Bool = false){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
self.id = manager.nextId()
self.request = request
self.subProtocols = subProtocols
Expand All @@ -639,13 +640,14 @@ private class InnerWebSocket: Hashable {
if inputBytes != nil {
free(inputBytes)
}
pthread_mutex_init(&mutex, nil)
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
@inline(__always) fileprivate func lock(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
}
@inline(__always) fileprivate func unlock(){
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}

fileprivate var dirty : Bool {
Expand Down Expand Up @@ -1035,7 +1037,7 @@ private class InnerWebSocket: Hashable {
security = .none
}

var path = CFURLCopyPath(req.url! as CFURL?) as String
var path = CFURLCopyPath(req.url! as CFURL) as String
if path == "" {
path = "/"
}
Expand Down Expand Up @@ -1069,7 +1071,7 @@ private class InnerWebSocket: Hashable {
var (rdo, wro) : (InputStream?, OutputStream?)
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
CFStreamCreatePairWithSocketToHost(nil, addr[0] as CFString?, UInt32(Int(addr[1])!), &readStream, &writeStream);
CFStreamCreatePairWithSocketToHost(nil, addr[0] as CFString, UInt32(Int(addr[1])!), &readStream, &writeStream);
rdo = readStream!.takeRetainedValue()
wro = writeStream!.takeRetainedValue()
(rd, wr) = (rdo!, wro!)
Expand Down Expand Up @@ -1155,8 +1157,8 @@ private class InnerWebSocket: Hashable {
} else {
key = ""
if let r = line.range(of: ":") {
key = trim(String(line.prefix(upTo: r.lowerBound)))
value = trim(String(line.suffix(from: r.upperBound)))
key = trim(String(line[..<r.lowerBound]))
value = trim(String(line[r.upperBound...]))
}
}

Expand Down Expand Up @@ -1613,38 +1615,43 @@ private enum TCPConnSecurity {
private class Manager {
var queue = DispatchQueue(label: "SwiftWebSocketInstance", attributes: [])
var once = Int()
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
var cond = pthread_cond_t()
var websockets = Set<InnerWebSocket>()
var _nextId = 0
init(){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
pthread_cond_init(&cond, nil)
DispatchQueue(label: "SwiftWebSocket", attributes: []).async {
var wss : [InnerWebSocket] = []
while true {
var wait = true
wss.removeAll()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
for ws in self.websockets {
wss.append(ws)
}
for ws in wss {
self.checkForConnectionTimeout(ws)
if ws.dirty {
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
ws.step()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
wait = false
}
}
if wait {
_ = self.wait(250)
}
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
}
}
}
deinit{
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
func checkForConnectionTimeout(_ ws : InnerWebSocket) {
if ws.rd != nil && ws.wr != nil && (ws.rd.streamStatus == .opening || ws.wr.streamStatus == .opening) {
let age = CFAbsoluteTimeGetCurrent() - ws.createdAt
Expand All @@ -1663,28 +1670,28 @@ private class Manager {
ts.tv_nsec = v1 + v2;
ts.tv_sec += ts.tv_nsec / (1000 * 1000 * 1000);
ts.tv_nsec %= (1000 * 1000 * 1000);
return pthread_cond_timedwait(&self.cond, &self.mutex, &ts)
return pthread_cond_timedwait(&self.cond, self.mutex, &ts)
}
func signal(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func add(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.insert(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func remove(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.remove(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func nextId() -> Int {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
pthread_mutex_lock(mutex)
defer { pthread_mutex_unlock(mutex) }
_nextId += 1
return _nextId
}
Expand All @@ -1693,11 +1700,17 @@ private class Manager {
private let manager = Manager()

/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
@objcMembers
open class WebSocket: NSObject {
fileprivate var ws: InnerWebSocket
fileprivate var id = manager.nextId()
fileprivate var opened: Bool

open override var hash: Int { return id }
open override func isEqual(_ other: Any?) -> Bool {
guard let other = other as? WebSocket else { return false }
return self.id == other.id
}

/// Create a WebSocket connection to a URL; this should be the URL to which the WebSocket server will respond.
public convenience init(_ url: String){
Expand Down Expand Up @@ -1861,6 +1874,7 @@ public func ==(lhs: WebSocket, rhs: WebSocket) -> Bool {

extension WebSocket {
/// The events of the WebSocket using a delegate.
@objc
public var delegate : WebSocketDelegate? {
get { return ws.eventDelegate }
set { ws.eventDelegate = newValue }
Expand Down