Playing with Elixir streams in IEx
- elixir
- erlang
- stream
Today I wanted to play with Elixir Stream to mimic the ideas of Observable collections from the Rx (Reactive Extension) concepts. The best playground is the IEx, the interactive shell for Elixir. However, in order to achieve the results that I wanted there was a lot of details that you need to know before having fun with it.
I'm glad that I had some help from @rcillo a co-worker of mine that have been studying Elixir for quite some time. We paired to achieve the results that we wanted. It took just 10~15min to get the desired result.
So what are we going to see in this posts:
- running multiples shells
- naming process
- IO device
So, let's start!
Elixir version used 0.15.1
The Elixir stream code snippet
The concept that I wanted to try is use the GenEvent
stream support and the for
syntax. This is the snippet:
{:ok, manager} = GenEvent.start_link
stream = GenEvent.stream(manager)
for x <- stream, do: IO.puts(x)
manager.notify("Hello World")
Pretty simple. I'm not going to dig each line of the snippet, the basic idea is:
given a stream of events, for each event published I want to print it on the screen
Side note
(You can advance to the next section if you want)
I'm passionate for new programming languages learning new ones is a way to always keep thinking different. The for
notation in Elixir is available in other languages. (Each have different details in implementation, but all proposes the same feature)
Haskell have the do-notation and list-comprehension
main = do
x <- doSomeComputation
y <- doSomeComputation
return (x, y)
-- list comprehension
[x | x <- [1...5]]
In Scala we would have:
for {
x <- doSomeComputation
y <- doSomeComputation
} yield (x, y)
And C# LINQ also supports it
from x in doSomeComputation
from y in doSomeComputation
select new { x, y }
So, go check this stuff if you never saw them.
Running on IEx
So, to run the IEx
you just do
> iex
Then you can start writing Elixir code on it. So we the code that we want to test, but it didn't worked as expected.
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.172.0>}
iex> stream = GenEvent.stream(manager)
%GenEvent.Stream{duration: :infinity, id: nil, manager: :eventor, mode: :ack, timeout: :infinity}
iex> for x <- stream, do: IO.puts(stream)
BOOM! console freezes
Well, once you write the for
code, the console is blocked because the stream is infinity. The <-
notation can be considered as a generator. You can read it as "for each x generated by stream, do...". We could stop it by stopping the GenEvent
process. But, how to do that if the console is blocked?
Spawning local shells
I didn't know about it, but @rcillo showed me some tricks. You can type the C-g
(control-g) and you will get a prompt where you can type some commands.
User switch command
--> h
c [nn] - connect to job
i [nn] - interrupt job
k [nn] - kill job
j - list all jobs
s [shell] - start local shell
r [node [shell]] - start remote shell
q - quit erlang
? | h - this message
Type h
to get some helps. If you type j
you will get all shells running with its job number.
--> j
1 {erlang,apply,[#Fun<Elixir.IEx.CLI.0.105530432>,[]]}
2 {'Elixir.IEx',start,[]}
3* {'Elixir.IEx',start,[]}
With this you are able to switch between shells by using c <job-number>
.
In order to spawn a new shell with IEx, you do:
--> s 'Elixir.IEx'
Note that it is with single quotes (double quotes will fail). In case the shell is seems not to be responsive, just hit ENTER and you it will show the iex>
prompt.
So, now that you have two shells, you just go and try:
iex> GenEvent.notify(manager, "hello world")
** (RuntimeError) undefined function: manager/0
Well, it didn't work. How can we fix this? We need to register the process with a name.
Registering processes
Erlang provides a register/2
method that allows you bind a PID process to a name, this way we will have access to it.
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.185.0>}
iex> :erlang.register(:eventor, manager)
true
iex> :erlang.whereis(:eventor)
#PID<0.185.0>
Ok, now we are able to just GenEvent.notify(:eventor, "Hello World")
from both shells.
But, you just send notify
messages but nothing prints in the console. You might think that it will the test the shell that is listening for events, but it don't. What is the problem? In order to print the texts in console there is the concept of devices in Erlang (that I will not discuss it here, because I don't know anything about it). But, let's think that it is similar to the STDOUT. The problem is that the shell with for
snippet is blocked, and the IO.puts
there can't flush its content, and your second shell have a different "device".
We need to specify which device do we want the IO.puts
to print out.
Specifying the device where IO.puts prints out
If you check the IO.puts
doc you will see the following doc:
puts(device \\ :erlang.group_leader(), item)
So, we can specify the first argument the device that we want to output.
Each shell, has its own :erlang.group_leader()
, what we will do is register the group_leader that we want the text to output.
iex> :erlang.register(:globalio, :erlang.group_leader())
true
iex> IO.puts(:globalio, "Hello World")
Hello World
So what we have to do is just rewrite the for
code with the following code:
iex> for x <- stream, do: IO.puts(:globalio, x)
Now, every notify
event that you send will be printed out on the console! Yay =)
Summary
We saw a lot of things, so let's reproduce the steps in order. Considering the iex(n)>
to identify each shell.
iex(1)> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.185.0>}
iex(1)> :erlang.register(:eventor, manager)
true
Type C-g
s 'Elixir.IEx'
iex(2)> :erlang.register(:globalio, :erlang.group_leader())
true
Type C-g
--> c 1
iex(1)> stream = GenEvent.stream(manager)
%GenEvent.Stream{duration: :infinity, id: nil, manager: :eventor, mode: :ack, timeout: :infinity}
iex(1)> for x <- stream, do: IO.puts(:globalio, x)
Type C-g
--> c 2
iex(2)> GenEvent.notify(:eventor, "Hello World")
Hello World
:ok
Ok, now you can keep exploring the Stream API in Elixir. Have fun!