当前位置: 首页 > 知识库问答 >
问题:

使用Cowboy Websocket客户端进行Elixir测试

况浩邈
2023-03-14

首先,Cowboy和Websockets的文档确实很缺乏,但一般来说,一旦破译,使用它是非常好的。然后将这些信息从Erlang传输到Elixir是另一个步骤。多亏了7Stud的这篇文章,我得以让一个功能正常的websocket用于测试目的,但我无法让它同时监听和可选地发送消息。我认为这是因为receive阻塞了发送所需的线程,而这与websocket连接有内在的联系,所以它在等待接收时不能发送。也许这种认识是有缺陷的。我很想被纠正。我尝试过产卵,但没有成功,这就是为什么我认为receive阻塞了websocket线程。

def ws do
    localhost = 'localhost'
    path = '/ws/app/1'
    port = 5000

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
    IO.inspect(conn_pid, label: "conn_pid")
    {:ok, protocol} = :gun.await_up(conn_pid)
    IO.inspect(protocol, label: "protocol")
    # Set custom header with cookie for device id
    stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
    IO.inspect(stream_ref, label: "stream_ref")
    receive do
      {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
              upgrade_success(conn_pid, headers, stream_ref)
      {:gun_response, ^conn_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _conn_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    :ok
  end

  def upgrade_success(conn_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

    IO.inspect(self(), label: "upgrade self")
    # This one runs and message is received
    run_test(conn_pid)
    # This should spawn and therefore not block
    listen(conn_pid, stream_ref)
    # This never runs
    run_test(conn_pid)
  end

  def listen(conn_pid, stream_ref) do
    spawn receive_messages(conn_pid, stream_ref)
  end
  def receive_messages(conn_pid, stream_ref) do
    IO.inspect conn_pid, label: "conn_pid!"
    IO.inspect stream_ref, label: "stream_ref!"
    IO.inspect(self(), label: "self pid")
    receive do
      {:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
          IO.inspect(msg, label: "Message from websocket server:")
      other_messages ->
        IO.inspect(other_messages, label: "Other messages")
    after 5000 ->
      IO.puts "Receive timed out"
    end
    receive_messages(conn_pid, stream_ref)
  end

  def send_message(message, conn_pid) do
    :gun.ws_send(conn_pid, {:text, message})
  end

  def run_test(conn_pid) do
    IO.puts "Running test"
    message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
    send_message(message, conn_pid)
  end

  def stop(conn_pid) do
    :gun.shutdown(conn_pid)
  end

共有1个答案

宋伟泽
2023-03-14

枪械文件

接收数据

Gun为它接收到的每个Websocket消息向所有者进程发送一个Erlang消息。

Gun连接是一个管理到远程endpoint的套接字的Erlang进程。此Gun连接由称为连接所有者的用户进程拥有,并由Gun应用程序的监管树管理。

所有者进程通过从模块Gun调用函数与Gun连接通信。所有函数都异步地执行各自的操作。Gun连接将在任何需要的时候向所有者进程发送Erlang消息。

虽然文档中没有特别提到它,但我非常肯定所有者进程是调用gun:open()的进程。我的尝试还显示,所有者进程必须调用gun:ws_send()。换句话说,所有者进程既要向服务器发送消息,又要从服务器接收消息。

下面的代码使用gen_server操作gun,使gen_server既向服务器发送消息,又从服务器接收消息。

当gun收到来自牛仔http服务器的消息时,gun发送消息,即pid!msg发送到所有者进程。在下面的代码中,gen_serverinit/1回调中创建连接,这意味着gun将砰(!)它在gen_server上从牛仔接收的消息。gen_server使用handle_info()处理直接发送到其邮箱的消息。

handle_cast()中,gen_server使用gun向cowboy发送请求。因为handle_cast()是异步的,这意味着您能够向Cowboy发送异步消息。并且,当gun收到来自牛仔的消息时,gun发送(!)发送到gen_server的消息,gen_server的handle_info()函数处理该消息。在handle_info()内部,调用gen_server:reply/2将消息中继到gen_server客户端。因此,每当gen_server客户端希望检查从Gun发送的服务器消息时,它就可以跳入receive子句。

-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).  %%% client functions
-export([sender/1]).

%%% client functions
%%%

start_server() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

send_sync(Requ) ->
    gen_server:call(?MODULE, Requ).

send_async(Requ) -> 
    gen_server:cast(?MODULE, {websocket_request, Requ}).

get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end.

receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Client received Gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end,
    receive_loop(WebSocketPid, ClientRef).

go() ->
    {ok, GenServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),

    [{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),

    ok = send_async("ABCD"),
    get_message(ConnPid, ClientRef),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(ConnPid, ClientRef),

    receive_loop(ConnPid, ClientRef).

sender(Count) -> %Send messages to handle_info() every 3 secs
    send_async(lists:concat(["Hello", Count])),
    timer:sleep(3000),
    sender(Count+1).

%%%%%% gen_server callbacks
%%%

init(_Arg) ->
    {ok, {no_client, ws()}}.

handle_call(get_conn_pid, From={_ClientPid, ClientRef}, _State={_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    {reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState={From, WebSocketPid} };
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
    {ok, State}.

handle_cast({websocket_request, Msg}, State={_From, WebSocketPid}) ->
    gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
    {noreply, State}.

handle_info(Msg, State={From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.

terminate(_Reason, _State={_From, WebSocketPid}) -> 
    gun:shutdown(WebSocketPid).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%% private functions
%%%

ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, ConnPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(ConnPid),

    gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),

    receive
        {gun_ws_upgrade, ConnPid, ok, Headers} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n", 
                      [ConnPid]),
            upgrade_success_handler(ConnPid, Headers);
        {gun_response, ConnPid, _, _, Status, Headers} ->
            exit({ws_upgrade_failed, Status, Headers});
        {gun_error, _ConnPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.


upgrade_success_handler(ConnPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),  
    ConnPid.

========

哎呀,下面的答案显示了如何让服务器将数据推送到客户机。

好吧,我知道了--在Erlang。这个例子有点折磨人。您需要做几件事:

init(Req, State) ->
    {cowboy_websocket, Req, State}.  %Perform websocket setup

websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    spawn(?MODULE, push, [self(), "Hi, there"]),
    {ok, State}.

push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    WebSocketHandleProcess ! {text, Greeting}.

websocket_handle({text, Msg}, State) ->
    timer:sleep(10000), %Don't respond to client request just yet.
    {
     reply, 
     {text, io_lib:format("Server received: ~s", [Msg]) },
     State
    };
websocket_handle(_Other, State) ->  %Ignore
    {ok, State}.

这将在客户机等待对客户机先前发送给服务器的请求的答复时向客户机推送一条消息。

2)如果向运行websocket_*函数的进程发送消息:

Pid ! {text, Msg}

则该消息将由websocket_info()函数处理,而不是websocket_handle()函数:

websocket_info({text, Text}, State) ->
    {reply, {text, Text}, State};
websocket_info(_Other, State) ->
    {ok, State}.
upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", 
              [ConnPid, Headers]),

    gun:ws_send(ConnPid, {text, "It's raining!"}),

    get_messages(ConnPid).  %Move the receive clause into a recursive function

get_messages(ConnPid) ->
    receive
        {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
            io:format("~s~n", [Greeting]),
            get_messages(ConnPid);

        {gun_ws, ConnPid, {text, Msg} } ->
            io:format("~s~n", [Msg]),
            get_messages(ConnPid)
    end.
 类似资料:
  • 我已经创建了一个自定义弹性搜索客户端。我需要在各种功能上部署单元测试。我该怎么做呢? 下面是我的客户提供的一个方法。我应该如何在这里部署单元测试? 我该如何着手做这件事呢?

  • 使用通用gRPC客户端 使用Python客户端 使用Java客户端 使用Scala客户端 使用Golang客户端 使用C++客户端

  • 客户端测试更多关心客户端方面的代码执行情况,通常是web浏览器或者浏览器插件。区别于服务器端,代码在客户端执行并直接返回随后的结果。 下列文章描述了如何进行客户端的web应用测试: 基于DOM跨站脚本测试 (OTG-CLIENT-001) JavaScript脚本执行测试 (OTG-CLIENT-002) HTML注入测试 (OTG-CLIENT-003) 客户端URL重定向测试 (OTG-CLI

  • 我们实际上使用了JUnit和FakeSftpServerRule来测试我们定制的SFTP客户端。效果很好。 最后,我们希望摆脱junit,转而使用spock框架,因为我们试图迁移到groovy。 你们知道FakeSftpServerRule的等价物吗?或者,你们知道把junit规则“转换”成spock规则等价物的方法吗? 非常感谢。

  • 1. 创建 Maven 工程 服务端部署完毕后,我们可以新建一个 Maven 工程使用 SOFARegistry 提供的服务。首先新建一个 Maven 工程,然后引入如下依赖: <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>registry-client-all</artifactId> <versi

  • 发起请求 让我们从导入aiohttp模块开始: import aiohttp 好啦,我们来尝试获取一个web页面。比如我们来获取下GitHub的时间轴。 async with aiohttp.ClientSession() as session: async with session.get('https://api.github.com/events') as resp: