Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This documents the use of receive_first/send_first and select #279

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions guides/concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,60 @@ After send
```

Note that the first 2 sends are executed without switching to another fiber. However, in the third send the channel's buffer is full, so execution goes to the main fiber. Here the two values are received and the channel is depleted. At the third `receive` the main fiber blocks and execution goes to the other fiber, which sends more values, finishes, etc.

### Multiple channels

It is possible to try multiple channels simultaneously (for example, a server application receiving data from multiple clients, or to write data to multiple I/O backends). There are two ways to do this, which you choose depends on whether you need to know on which channel the operation occurred.

#### Unnown channel

Use `receive_first` to receive a message from the first channel of a set that has data available or `send_first` to send a message to the first channel of a set that is ready to receive, without knowing on which channel the operation occurred. Both methods accept an array of channels, a tuple of channels, or channels as individual parameters.

For example, here we create 3 channels and pass them to 3 fibers that wait for a certain number of seconds, write a message to their channel, then close the channel. The main loop receives each message without caring which channel it was sent on and deletes any closed channel from the array of channels:

```crystal
def send_after(interval : Int, channel : Channel(String))
sleep(interval)
channel.send("Sent after #{interval}s")
channel.close
end

channels = [] of Channel(String)

(1..3).each do |seconds|
new_channel = Channel(String).new
spawn send_after(seconds, new_channel)
channels << new_channel
end

while channels.size > 0
puts Channel.receive_first(channels)
Fiber.yield
channels.reject! { |c| c.closed? }
end
```

Note the `yield` after the receive. When a fiber sends a message it blocks and execution continues at `receive_first` in the main loop, just like a regular `receive`. If the main loop returned to `receive_first` without yielding then the fiber would close the channel _after_ the `receive_first` so the main loop would never exit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps receive_first should return nil if all channels are closed, then you can do

while msg = Channel.receive_first(channels)
  puts msg
end

and avoid deleting any channels.


#### Known channel

If you need to know which channel data was received from (or which channel data was sent on) use `select`. This is conceptually similar to the POSIX system call `select()` (which monitors a set of file descriptors and returns those that are ready for I/O) but is easier to use.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Channel.select is designed to be used by the select statement, i'm not sure we should document it directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important to describe what the select statement expands to, because sometime you need a fine tuned Channel.select more than a usual select statement.
But the note about the select() syscall is not necessary here to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Channel.select is a very low-level primitive designed around the compiler. I don't think crystal has enough high-level primitives for selecting from an array of channels.

Copy link
Author

@joatca joatca Aug 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if Channel.select is a low-level primitive and there aren't high-level primitives to select from an array of channels, what's the approved method to do something like this? Specifically, what's the idiomatic way to implement, say, a TCP server that can accept multiple client connections and service all of them in a non-blocking way without forking processes?

Also, can you point me at any documentation for the select statement?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joatca you'd usually just process each client in a different fiber.

And unfortunately there isn't any docs on the select statement apart from crystal-lang/crystal#3130

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, if I get some time I'll try documenting select.

Sure, every client has to be processed in a different fiber to avoid blocking, but what if there is global state that the program maintains? You could use an external database and have it handle the locking, but that implies one database connection per client fiber which sounds undesirable. You shouldn't access global state (or a global database connection) from multiple fibers because of data races, so you'd need to maintain another "service" fiber that maintains the state or database connection (or other API connection, or whatever) - but then that fiber has to respond to state requests on multiple channels - at least one per client - so that state changes are isolated to the service fiber.

As you already pointed out, the select statement isn't flexible enough to handle an arbitrary number of clients and I think this is a common enough use-case that we should document Channel.select until a better high-level mechanism is available.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joatca recieve_first and send_first are the current high-level interfaces, and they work quite well. Perhaps some tweaks would be nice, but the only thing missing is the ability to send and receive at the same time. Perhaps there should be a higher level Channel.select which takes an array of recieve channels and an array of send channels

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think another thing that's missing is that Channel.select returns the index of the channel as well as the message, but receive_first only returns the message so if a response has to go back to a particular fiber then the code has to identify it in some other way, probably by passing it as part of the message. Ideally we'd have both receive_first and receive_first_with_index. If I submitted a patch with that functionality do you think it would be well-received (pun not intended)?

Anyway, as you say, this interface needs some tweaks so perhaps it shouldn't be documented yet.

Copy link
Contributor

@RX14 RX14 Sep 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joatca yes, I think it'd be merged. I also would like to see select handling closed channels better. If all the channels are closed, then any select over them is a deadlock and that should return nil.


First construct a series of `Channel::SelectAction` objects which associate each channel with either a receive operation or a send operation. For example, assume you are using two channels of type `Channel(String)`, reading a string from `a` and sending a string to `b`. This code will try both operations simultaneously, the `select` will block and return the index of the operation that eventually completes, as well as any message received (or `nil` for a send operation):

```crystal
index, msg = Channel.select( a.receive_select_action,
b.send_select_action("data to send") )
case index
when 0
unless msg.nil?
# process received data
end
when 1
# note that data was sent
end
```

Like `receive_first` and `send_first` above, `select` accepts `SelectAction` objects as an array, a tuple, or as individual parameters.

This technique is commonly used for a network server with multiple connected clients. A main loop would spawn a fiber to listen for new connections and send them back via a channel, where it would be received using `select`. It would spawn another fiber (with associated channel) for each new connection, adding an appropriate `SelectAction` for each channel to the `select`, maintain all processing state centrally and delegating network I/O to the fibers via the channel list.