UP | HOME

7 Building a concurrent system

Table of Contents

对于中等复杂的系统来说,运行几千个进程并不罕见,而较大的系统可能由数十万甚至数百万个进程提供动力

这章通过创建一个todo应用,来演示怎么构建一个并发系统

1 7.1 Working with the mix project

通过mix创建项目

$ mix new todo

可以使用 mix compile 命令编译项目, 也可以使用 mix test 运行测试

使用 iex -S mix 启动项目

一些项目约定

  • 把所有代码放在一个顶层module里, 比如 Todo.List, Todo.Server
  • 一个文件应包含一个模块, 如果辅助模块很小并且仅在内部使用, 则它可以与使用它的模块放在同一文件中
  • 模块文件名是下划线命令, 模块名使用驼峰, TodoServer todoserver.ex
  • 文件结构要合理, Todo.Server 模块路径应该在 lib/todo/server.ex

todo 项目 code

https://github.com/sasa1977/elixir-in-action/tree/2nd-edition/code_samples/ch07/todo

2 7.2 Managing multiple to-do lists

要管理多个todo, 两个可能的实现

  • 抽象 TodoListCollection Todo.Server 实现新的抽象
  • 每个todo list运行一个genserver实例

第一种方法的问题是, 最终只能有一个 process 来为所有用户提供服务 如果系统被许多不同的用户使用, 它们将经常相互阻塞, 竞争相同的资源, 因为执行任务都是在单个服务器进程内

运行多个todo server进程, 需要管理状态, 需要一个key value的map保存todo server实现到 pid的映射

3 7.2.1 Implementing a cache

实现todo-cache 服务, 它将用于创建和返回与给定名称对应的待办事项服务器进程的pid

defmodule Todo.Cache do
  use GenServer

  def start do
    GenServer.start(__MODULE__, nil)
  end

  def server_process(cache_pid, todo_list_name) do
    GenServer.call(cache_pid, {:server_process, todo_list_name})
  end

  @impl GenServer
  def init(_) do
    {:ok, %{}}
  end

  @impl GenServer
  def handle_call({:server_process, todo_list_name}, _, todo_servers) do
    case Map.fetch(todo_servers, todo_list_name) do
      {:ok, todo_server} ->
        {:reply, todo_server, todo_servers}

      :error ->
        {:ok, new_server} = Todo.Server.start()

        {
          :reply,
          new_server,
          Map.put(todo_servers, todo_list_name, new_server)
        }
    end
  end
end

在iex中测试

iex(1)> {:ok, cache} = Todo.Cache.start()

iex(2)> Todo.Cache.server_process(cache, "Bob's list")
#PID<0.69.0>

iex(3)> Todo.Cache.server_process(cache, "Bob's list")
#PID<0.69.0>

iex(4)> Todo.Cache.server_process(cache, "Alice's list")
#PID<0.72.0>

返回的pid是todo server的进程pid

iex(5)> bobs_list = Todo.Cache.server_process(cache, "Bob's list")

iex(6)> Todo.Server.add_entry(bobs_list,
          %{date: ~D[2018-12-19], title: "Dentist"})

iex(7)> Todo.Server.entries(bobs_list, ~D[2018-12-19])
[%{date: ~D[2018-12-19], id: 1, title: "Dentist"}]

创建 1000000 个todo server进程

iex(1)> {:ok, cache} = Todo.Cache.start()

iex(2)> :erlang.system_info(:process_count)
54

iex(3)> Enum.each(
          1..100_000,
          fn index ->
            Todo.Cache.server_process(cache, "to-do list #{index}")
          end
        )

iex(4)> :erlang.system_info(:process_count)
100054

在这里使用 :erlang.system_info/1 函数来获取当前正在运行的进程数

4 7.2.2 Writing tests

自动化测试, 测试框架 ex_unit 运行测试命令 mix test

测试 Todo.Cache.server_process/2

Test file skeleton (todocache/test/todocachetest.exs)

defmodule TodoCacheTest do
  use ExUnit.Case
  ...
end

测试文件 文件名以 _test.exs 结尾

todo cache测试

Testing serverprocess (todocache/test/todocachetest.exs)

defmodule TodoCacheTest do
  use ExUnit.Case

  test "server_process" do
    {:ok, cache} = Todo.Cache.start()
    bob_pid = Todo.Cache.server_process(cache, "bob")

    assert bob_pid != Todo.Cache.server_process(cache, "alice")
    assert bob_pid == Todo.Cache.server_process(cache, "bob")
  end

  ...
end

todo 操作测试

defmodule TodoCacheTest do
  use ExUnit.Case

  ...

  test "to-do operations" do
    {:ok, cache} = Todo.Cache.start()
    alice = Todo.Cache.server_process(cache, "alice")
    Todo.Server.add_entry(alice, %{date: ~D[2018-12-19], title: "Dentist"})
    entries = Todo.Server.entries(alice, ~D[2018-12-19])

    assert [%{date: ~D[2018-12-19], title: "Dentist"}] = entries
  end
end

5 7.2.3 Analyzing process dependencies 分析进程依赖性

0069RVTdgy1fuu59jzvowj30f404t74k.jpg

这里,每个框表示一个进程, “客户端”框是任意客户端, 例如HTTP请求处理进程

6 7.3 Persisting data 持久化数据

6.1 7.3.1 Encoding and persisting 编码和持久化

函数 :erlang.term_to_binary/1:erlang.binary_to _term/1 序列化和反序列化

实现 Todo.Database, 支持 storeget 请求

defmodule Todo.Database do
  use GenServer

  @db_folder "./persist"

  def start do
    GenServer.start(__MODULE__, nil,
     name: __MODULE__
    )
  end

  def store(key, data) do
    GenServer.cast(__MODULE__, {:store, key, data})
  end

  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  def init(_) do
    File.mkdir_p!(@db_folder)
    {:ok, nil}
  end

  def handle_cast({:store, key, data}, state) do
    key
    |> file_name()
    |> File.write!(:erlang.term_to_binary(data))

    {:noreply, state}
  end

  def handle_call({:get, key}, _, state) do
    data = case File.read(file_name(key)) do
      {:ok, contents} -> :erlang.binary_to_term(contents)
      _ -> nil
    end

    {:reply, data, state}
  end

  defp file_name(key) do
    Path.join(@db_folder, to_string(key))
  end
end

cast 有个大的问题是caller不知道请求是否成功处理

6.2 7.3.2 Using the database

  • 确保已启动数据库process
  • 每次修改都要保存到数据库
  • 尝试在第一次检索期间从磁盘中获取列表

persistabletodocache/lib/todo/cache.ex

defmodule Todo.Cache do
  ...

  def init(_) do
    Todo.Database.start()
    {:ok, %{}}
  end

  ...
end

todo server 保存数据

defmodule Todo.Server do
  ...
  def handle_cast({:add_entry, new_entry}, {name, todo_list}) do
    new_list = Todo.List.add_entry(todo_list, new_entry)
    Todo.Database.store(name, new_list)
    {:noreply, {name, new_list}}
  end
  ...
end

iex -S mix

iex(1)> {:ok, cache} = Todo.Cache.start()

iex(2)> bobs_list = Todo.Cache.server_process(cache, "bobs_list")

iex(3)> Todo.Server.add_entry(bobs_list,
          %{date: ~D[2018-12-19], title: "Dentist"})

读取数据 persistabletodocache/lib/todo/server.ex

defmodule Todo.Server do
  ...

  def init(name) do
    {:ok, {name, Todo.Database.get(name) || Todo.List.new()}}
  end

  ...
end

读取逻辑在 init/1 回调函数中, 加载磁盘数据库文件可能会耗时比较多, GenServer.start 调用在进程 初始化完返回, 这个操作会阻塞cache 进程, 而cache进程要应对大量的客户端

为了避免这个问题, 有一个简单的伎俩, 您可以使用 init/1 向自己发送内部消息, 然后在相应的handleinfo回调中初始化进程状态

def init(params) do
  send(self(), :real_init)
  {:ok, nil}
 end

...

def handle_info(:real_init, state) do
  ...
end

对于不注册进程名字是没啥问题, 对于进程没有注册名字, 只有知道pid才能发消息, pid 只有init初始化完成了才会返回 因此, 可以确定您发送给自己的消息是第一个被处理的消息

但是如果进程已注册,则有可能其他人会首先通过注册名称引用该进程将消息放入队列中 这个问题有几种解决方法,最简单的方法是不使用 :name 选项, 而是在发送自身消息后, 在 init/1 回调中手动注册进程

def init(params) do
  ...

  send(self(), :real_init)
  register(self(), :some_name)
end

6.3 7.3.3 Analyzing the system

006tNbRwgy1fuvsgvz95pj30eo0a50t8.jpg

  • 同步调用不会无限期地阻塞, 默认是5s
  • 当请求超时时,它不会从接收者的mailbox中删除
  • 超时意味着放弃等待响应, 但消息仍保留在接收方的邮箱中, 并将在某个时刻处理

如果计算可以安全地并行运行, 则应考虑在单独的进程中运行它们 相反, 如果操作必须同步, 则需要在单个进程中运行它

Author: lidashuang

Created: 2018-09-03 Mon 06:06

Emacs 25.3.3 (Org mode 8.2.10)