UP | HOME

6 Generic server processes

Table of Contents

GenServer

https://livebook.manning.com/#!/book/elixir-in-action-second-edition/chapter-6/v-2/20

1 Building a generic server process

构建一个通用的服务器进程

  • Spawn a separate process.
  • Run an infinite loop in the process.
  • Maintain the process state.
  • React to messages.
  • Send a response back to the caller.

无论您运行什么样的服务器进程,都需要执行这些任务, GenServer 就是干了这些工作

1.1 Plugging in with modules

模块名是atom, 可以存到一个变量

iex(1)> some_module = IO
iex(2)> some_module.puts("Hello")
Hello

实现一个通用的模块

  • 接受模块(callback module)作为参数
  • 在process state 维护 module atom
  • 在需要时调用callback module的函数

1.2 Implementing the generic code

实现一个通过的 ServerProcess

首先是启动 process, 并且初始化状态

defmodule ServerProcess do
  def start(callback_module) do
    spawn(fn ->
      initial_state = callback_module.init()
      loop(callback_module, initial_state)
    end)
  end

  ...
end

ServerProcess.start/1 函数参数是 callbackmodule, 然后启动进程执行 init/0 函数创建初始state

callback module 必须实现 init/0 函数

最后 进入loop函数, start/1 返回的是pid, 向pid发送消息, 进程处理request

接下来,需要实现loop函数,等待消息并处理它们, 在这个例子中, 将实现一个同步的发送和响应通信模式, server process 接受消息, 处理, 返回结果, 更改进程 state

loop 实现

defmodule ServerProcess do
  ...
  defp loop(callback_module, current_state) do
    receive do
      {request, caller} ->
        # 处理请求
        {response, new_state} =
          callback_module.handle_call(
            request,
            current_state
          )

        # 返回结果
        send(caller, {:response, response})

        # 继续loop 
        loop(callback_module, new_state)
    end
  end
  ...
end

handle_call/2 处理请求 返回 {response, new_state} 元组

还有一件事需要做:你需要提供一个函数来向服务器进程发出请求。

defmodule ServerProcess do
  ...
  def call(server_pid, request) do
    send(server_pid, {request, self()})

    receive do
      {:response, response} ->
        response
    end
  end
end

1.3 Using the generic abstraction 如何使用我们自己实现的GenServer?

实现一个 kv store

实现 init/0handle_call/2 函数

defmodule KeyValueStore do
  def init do
    %{}
  end

  def handle_call({:put, key, value}, state) do
    {:ok, Map.put(state, key, value)}
  end

  def handle_call({:get, key}, state) do
    {Map.get(state, key), state}
  end
end

我们只需要关注具体的逻辑实现, 创建进程, loop 这些活, 上面的 ServerProcess 帮我们干了

测试

iex(1)> pid = ServerProcess.start(KeyValueStore)

iex(2)> ServerProcess.call(pid, {:put, :some_key, :some_value})
:ok

iex(3)> ServerProcess.call(pid, {:get, :some_key})
:some_value

封装下 get, put 隐藏 ServerProcess 抽象, KeyValueStore 可以不用关心具体的 ServerProcess

defmodule KeyValueStore do
  def start do
    ServerProcess.start(KeyValueStore)
  end

  def put(pid, key, value) do
    ServerProcess.call(pid, {:put, key, value})
  end

  def get(pid, key) do
    ServerProcess.call(pid, {:get, key})
  end

  ...
end

1.4 Supporting asynchronous requests

支持异步请求

使用call进行同步请求, cast 进行异步请求

加入新的请求消息类型

defmodule ServerProcess do
  ...
  def call(server_pid, request) do
    send(server_pid, {:call, request, self()}) # 请求消息类型是 call 
    ...
  end

  defp loop(callback_module, current_state) do
    receive do
      {:call, request, caller} ->  # 处理call请求
         ...
    end
  end

  ...
end

加入 cast 支持

没有响应被发回给调用者,所以回调函数只能返回新的状态

defmodule ServerProcess do
  ...
  # cast 异步请求
  def cast(server_pid, request) do
    send(server_pid, {:cast, request})
  end

  defp loop(callback_module, current_state) do
    receive do
      {:call, request, caller} ->
        ...

      # 处理 cast 异步请求
      {:cast, request} ->
        new_state =
          callback_module.handle_cast(
            request,
            current_state
          )

        loop(callback_module, new_state)
    end
  end

  ...
end

处理cast请求 要实现 handle_cast/2 回调函数, 该函数必须处理消息并返回新的状态。 serverloop 中调用回调函数, 执行新的loop

实现 key-value store 的 cast 异步请求

defmodule KeyValueStore do
  ...

  def put(pid, key, value) do
    ServerProcess.cast(pid, {:put, key, value})
  end

  ...

  def handle_cast({:put, key, value}, state) do
    Map.put(state, key, value)
  end

  ...
end

put 请求使用cast, 因为客户端并不需要等待服务端返回响应

测试代码

iex(1)> pid = KeyValueStore.start()

iex(2)> KeyValueStore.put(pid, :some_key, :some_value)

iex(3)> KeyValueStore.get(pid, :some_key)
:some_value

2 Using GenServer

生产环境就没有必要自己手动实现 ServerProcess 了, 使用 GenServer 就行了

GenServer 特性

  • Support for calls and casts
  • Customizable timeouts for call requests
  • Propagation of server-process crashes to client processes waiting for a response
  • Support for distributed systems

OTP behaviours

  • genserver — Generic implementation of a stateful server process
  • supervisor — Provides error handling and recovery in concurrent systems
  • application — Generic implementation of components and libraries
  • genevent — Provides event-handling support
  • genstatem — Runs a finite state machine in a stateful server process

2.1 Plugging into GenServer

GenServer 的使用和之前的 ServerProcess 思想是一样的

总的来说,GenServer的行为需要七个回调函数,但通常你只需要其中的一部分

iex(1)> defmodule KeyValueStore do
          use GenServer
        end

use 宏在编译时调用 GenServer 模块的宏, 宏会把一系列函数注入到 KeyValueStore

看下有哪些函数

iex(2)> KeyValueStore.__info__(:functions)
[child_spec: 1, code_change: 3, handle_call: 3, handle_cast: 2,
 handle_info: 2, init: 1, terminate: 2]

因为 use GenServer 默认实现代码被注入到 KeyValueStore 模块中

使用 GenServer.start/2 启动一个genserver进程

iex(3)> GenServer.start(KeyValueStore, nil)
{:ok, #PID<0.51.0>}

GenServer.start/2 传入自定义参数初始人进程,目前不需要为 nil 成功返回值是 {:ok, pid}

2.2 Handling requests

实现 init/1 handle_cast/2 handle_call/3

  • init/1 接受一个参数, 由 GenServer.start/2 第二个参数提供
  • init/1 返回结果 {:ok, initial_state}
  • handle_cast/2 接收request和state 返回 {:noreply, new_state}
  • handle_call/3 接收request和caller(请求者信息), state 返回 {:reply, response, new_tate}
defmodule KeyValueStore do
  use GenServer

  def init(_) do
    {:ok, %{}}
  end

  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  def handle_call({:get, key}, _, state) do
    {:reply, Map.get(state, key), state}
  end
end

GenServer.start/2 启动进程 GenServer.cast/2 GenServer.call/2 发出请求

defmodule KeyValueStore do
  use GenServer

  def start do
    GenServer.start(KeyValueStore, nil)
  end

  def put(pid, key, value) do
    GenServer.cast(pid, {:put, key, value})
  end

  def get(pid, key) do
    GenServer.call(pid, {:get, key})
  end
  ...
end

测试server代码

iex(1)> {:ok, pid} = KeyValueStore.start()

iex(2)> KeyValueStore.put(pid, :some_key, :some_value)

iex(3)> KeyValueStore.get(pid, :some_key)
:some_value

GenServer.start/2 是同步调用, start/2~返回要 ~init/1 初始化进程完成返回

因此,启动服务器的客户端进程将被阻止,直到服务器进程初始化。

最后请注意, GenServer.call/2 不会无限期地等待响应, 默认情况下, 如果响应消息未在5秒内响应, 则客户端进程中会抛出错误

GenServer.call(pid, request, timeout) 添加timeout自定义超时时间

2.3 Handling plain messages 处理普通消息

callcast 的请求

defmodule ServerProcess do
  ...

  def call(server_pid, request) do
    send(server_pid, {:call, request, self()})
    ...
  end

  def cast(server_pid, request) do
    send(server_pid, {:cast, request})
  end

  ...

  defp loop(callback_module, current_state) do
    receive do
      {:call, request, caller} ->
        ...

      {:cast, request} ->
        ...
    end
  end
  ...
end

ServerProcess 中消息类型有 :call:cast 对于 GenServer 则是 :"$gencast" 和 :"$gencall"

有时可能需要处理不是特定于Ge​​nServer的消息, 比如场景定时清理server state

可以使用 :timer.send_interval/2 定时发送消息给进程, 这些消息不是 callcast

相反,对于这样的普通消息,GenServer会调用 handle_info/2 调函数

iex(1)> defmodule KeyValueStore do
          use GenServer

          def init(_) do
            :timer.send_interval(5000, :cleanup)
            {:ok, %{}}
          end

          def handle_info(:cleanup, state) do
            IO.puts "performing cleanup..."
            {:noreply, state}
          end

          def handle_info(unknown_message, state) do
            super(unknown_message, state)
          end
        end

iex(2)> GenServer.start(KeyValueStore, nil)
performing cleanup...
performing cleanup...
performing cleanup...

在进程初始化期间,确保每五秒发送一次 :cleanup 清理消息到进程

handle_info/2 中处理, 返回 {:noreply, new_state}

其中 下面的代码匹配所有其它的消息, 进程可能会偶尔收到VM特定的消息,即使您没有要求。

def handle_info(unknown_message, state) do
  super(unknown_message, state)
end

super 调用默认实现, 这个默认实现会记录一个错误,它不会使进程崩溃

如下发送不确定的消息

iex(3)> {:ok, pid} = GenServer.start(KeyValueStore, nil)

iex(4)> send(pid, :some_message)

[error] KeyValueStore #PID<0.106.0> received unexpected message in
        handle_info/2: :some_message

2.4 Other GenServer features

2.4.1 编译期检查

定义moudle 属性 在callback函数 指定 @impl GenServer

iex(1)> defmodule EchoServer do
          use GenServer

          @impl GenServer
          def handle_call(some_request, server_state) do
            {:reply, some_request, server_state}
          end
        end

函数定义出错warning

warning: got "@impl GenServer" for function handle_call/2 but this
behaviour does not specify such callback.

2.4.2 Name registration

进程别名

GenServer.start(
  CallbackModule,
  init_param,
  name: :some_name
)

请求时并不需要 pid

GenServer.call(:some_name, ...)
GenServer.cast(:some_name, ...)

一般和module名字相同,模块名也是 atom

defmodule KeyValueStore do
  def start() do
    GenServer.start(KeyValueStore, nil, name: KeyValueStore)
  end

  def put(key, value) do
    GenServer.cast(KeyValueStore, {:put, key, value})
  end

  ...
end

使用 __MODULE__

defmodule KeyValueStore do
  def start() do
    GenServer.start(__MODULE__, nil, name: __MODULE__)
  end

  def put(key, value) do
    GenServer.cast(__MODULE__, {:put, key, value})
  end

  ...
end

2.4.3 Stopping the server

  • init/1 返回 {:stop, reason} 或者 :ignore 时不启动进程
  • init/1 返回 {:stop, reason} 时, start/2 返回 {:error, reason}, init/1 返回 :ignore, start/2 返回 :ignore
  • 对于 handle_* callback 返回 {:stop, reason, new_state} 时停止服务端进程, 如果是正常停止, reason 为 :normal
  • handle_call/3 中,想在进程终止前返回 response 给 client 则返回 {:stop, reason, response, new_state}
  • 在进程终止前, GenServer 会调用 terminate/2 callback
  • 最后,您还可以通过从客户端进程调用 GenServer.stop/3 来停止服务器进程, 该调用将向服务器发出同步请求, 该行为将通过停止服务器进程来停止请求本身

2.4.4 Process life cycle

进程生命周期

7121be43gy1fr8omau26ej20ln0cytah.jpg

Author: lidashuang

Created: 2018-05-12 Sat 17:30

Emacs 25.3.3 (Org mode 8.2.10)