'Elixir: Finding Prime Numbers with Workers, Only 1 CPU Used

I'm learning Elixir and decided to write a demo application using GenServer and workers to find prime numbers.

Application

Setup

I created my application as follows:

$ mix new primes
$ cd primes

Structure

$ tree lib
lib/
├── prime_numbers.ex
├── prime_server.ex
└── prime_worker.ex

lib/prime_numbers.ex

This is the module that performs the actual logic to figure out whether or not a given number is a prime number (is_prime/1):

defmodule PrimeNumbers do
  def is_prime(x) when is_number(x) and x == 2, do: true

  def is_prime(x) when is_number(x) and x > 2 do
    from = 2
    to = trunc(:math.sqrt(x))
    n_total = to - from + 1

    n_tried =
      Enum.take_while(from..to, fn i -> rem(x, i) != 0 end)
      |> Enum.count()

    n_total == n_tried
  end

  def is_prime(x) when is_number(x), do: false
end

lib/prime_worker.ex

This is a worker process that uses the PrimeNumbers module (is_prime/1):

defmodule PrimeWorker do
  defstruct number: -1, handled: 0
  use GenServer

  def start(number) do
    GenServer.start(__MODULE__, number)
  end

  def is_prime(pid, x) do
    GenServer.call(pid, {:is_prime, x})
  end

  def stats(pid) do
    GenServer.call(pid, {:stats})
  end

  def init(number) do
    {:ok, %PrimeWorker{number: number, handled: 0}}
  end

  def handle_call({:is_prime, x}, _, state) do
    result = PrimeNumbers.is_prime(x)

    {:reply, result, %PrimeWorker{state | handled: state.handled + 1}}
  end

  def handle_call({:stats}, _, state) do
    {:reply, state.handled, state}
  end
end

It also provides statistics (stats/1) about the amount of requests it handled.

lib/prime_server.ex

This is the server that creates and manages the workers, and provides them for individual computations (is_prime/1):

defmodule PrimeServer do
  defstruct n_workers: 0, workers: %{}
  use GenServer

  def start(n_workers) when is_number(n_workers) and n_workers > 0 do
    GenServer.start(__MODULE__, n_workers)
  end

  def is_prime(pid, x) do
    GenServer.call(pid, {:is_prime, x})
  end

  def worker_stats(pid) do
    GenServer.call(pid, {:worker_stats})
  end

  def init(n_workers) do
    workers =
      for i <- 0..(n_workers - 1),
          into: %{},
          do: {i, (fn {:ok, worker} -> worker end).(PrimeWorker.start(i))}

    {:ok, %PrimeServer{n_workers: n_workers, workers: workers}}
  end

  def handle_call({:is_prime, x}, caller, state) do
    spawn(fn ->
      i_worker = rem(x, state.n_workers)
      worker = Map.get(state.workers, i_worker)
      GenServer.reply(caller, PrimeWorker.is_prime(worker, x))
    end)

    {:noreply, state}
  end

  def handle_call({:worker_stats}, _, state) do
    stats =
      state.workers
      |> Enum.map(fn {i, pid} ->
        stats = PrimeWorker.stats(pid)
        {i, stats}
      end)

    {:reply, stats, state}
  end
end

It provides overall worker stats (worker_stats/1) to demonstrate that the work performed was actually distributed over multiple processes.

primes.exs

This is a test script to run the computation:

args = System.argv()
[n, procs | _] = args
{n, ""} = Integer.parse(n, 10)
{procs, ""} = Integer.parse(procs, 10)

{:ok, pid} = PrimeServer.start(procs)

1..n
|> Stream.filter(fn i -> PrimeServer.is_prime(pid, i) end)
|> Enum.each(fn i -> IO.puts("#{i} is a prime number.") end)

PrimeServer.worker_stats(pid)
|> Enum.each(fn {k, v} -> IO.puts("Worker #{k} handled #{v} jobs.") end)

It can be run as follows, indicating both the upper limit of the numbers to be checked, and the number of workers to be spawned:

$ mix run primes.exs 20 3
2 is a prime number.                                                                                                
5 is a prime number.                                                                                                
7 is a prime number.                                                                                                
11 is a prime number.                                                                                               
13 is a prime number.                                                                                               
17 is a prime number.                                                                                               
19 is a prime number.                                                                                               
Worker 0 handled 6 jobs.                                                                                            
Worker 1 handled 7 jobs.                                                                                            
Worker 2 handled 7 jobs.

The whole project can be found on GitHub

Problem

This all works fine. However, I want the program to make use of my 8 CPU cores. But that does not happen:

$ mix run primes.exs 1000000 8 &
$ htop

htop

Why isn't the work parallelized?

Versions Used

Elixir

$ elixir --version
Erlang/OTP 24 [erts-12.2.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]

Elixir 1.13.2 (compiled with Erlang/OTP 24)

Mix

$ mix --version
Erlang/OTP 24 [erts-12.2.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]

Mix 1.13.2 (compiled with Erlang/OTP 24)


Solution 1:[1]

I figured out a way to really make my CPU cores work. Now I use concurrency primitives instead of GenServer (prime_workers.exs):

defmodule PrimeNumbers do
  def is_prime(x) when is_number(x) and x == 2, do: true

  def is_prime(x) when is_number(x) and x > 2 do
    from = 2
    to = trunc(:math.sqrt(x))
    n_total = to - from + 1

    n_tried =
      Enum.take_while(from..to, fn i -> rem(x, i) != 0 end)
      |> Enum.count()

    n_total == n_tried
  end

  def is_prime(x) when is_number(x), do: false
end

defmodule PrimeWorker do
  def start() do
    spawn(fn -> loop() end)
  end

  defp loop() do
    receive do
      {:is_prime, x, pid} ->
        send(pid, {:prime_result, x, PrimeNumbers.is_prime(x)})
        loop()

      {:terminate, pid} ->
        send(pid, {:done})
    end
  end
end

defmodule PrimeClient do
  def start() do
    spawn(fn -> loop(0) end)
  end

  def loop(found) do
    receive do
      {:prime_result, _, prime} ->
        if prime do
          loop(found + 1)
        else
          loop(found)
        end

      {:query_primes, pid} ->
        send(pid, {:primes_found, found})
    end

    loop(found)
  end
end

args = System.argv()
[n, n_workers | _] = args
{n, ""} = Integer.parse(n, 10)
{n_workers, ""} = Integer.parse(n_workers, 10)

client = PrimeClient.start()

workers =
  for i <- 0..(n_workers - 1),
      into: %{},
      do: {i, PrimeWorker.start()}

Enum.each(2..n, fn x ->
  i_worker = rem(x, n_workers)
  worker = Map.get(workers, i_worker)
  send(worker, {:is_prime, x, client})
end)

workers
|> Enum.each(fn {_, w} ->
  send(w, {:terminate, self()})

  receive do
    {:done} -> {:nothing}
  end
end)

send(client, {:query_primes, self()})

receive do
  {:primes_found, found} ->
    IO.puts("Found #{found} primes from 2 to #{n}.")
end

Now my CPU cores are properly used:

$ time elixir prime_workers.exs 1000000 1000
Found 78497 primes from 2 to 1000000.

real    0m1.761s
user    0m10.327s
sys     0m0.346s

CPU loads (htop)

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1