Migrating to Phoenix Liveview Streams

Phoenix released 1.7.0 with Liveview 0.18.16 and introduced Streams for liveview. At first it was not clear to me how to replace my current prepend logic from the published blog  So I reached out to its creator of phoenix Chris McCord on twitter and he not only responded, even took out time to review my code and share what would be the way to get it done with stream. This is why elixir community is awesome.


The thing that was not clear from the blog post introducing streams was how would one send additional data over streams that is part of the page but not the actual stream. Our in house liveview app Ultronex used the prepend and it clearly could benefit from the streams and was an obvious choice for upgrade with streams release.

Here is how the old liveviewprepend code

defmodule UltronexWeb.Stream.LiveView do
  use UltronexWeb, :live_view

  @queue "stream"
  @cache :ultronex_cachex

  def mount(_params, _session, socket) do
    if connected?(socket), do: UltronexWeb.Endpoint.subscribe(@queue)
    {:ok, base_stream(socket), temporary_assigns: [msg: []]}
  end

  def handle_info(%{event: "success", payload: payload}, socket) do
    {:noreply,
     assign(socket,
       term: get_filter_term(socket.id),
       msg: filter_msg(socket.id, payload),
       node: nodename()
     )}
  end

  def handle_info(%{event: "danger", payload: payload}, socket) do
    {:noreply,
     assign(socket,
       term: get_filter_term(socket.id),
       msg: filter_msg(socket.id, payload),
       node: nodename()
     )}
  end

  def handle_info(%{event: "warning", payload: payload}, socket) do
    {:noreply,
     assign(socket,
       term: get_filter_term(socket.id),
       msg: filter_msg(socket.id, payload),
       node: nodename()
     )}
  end

  def handle_event(
        _event,
        %{"filter" => %{"term" => incoming_message_filter}},
        socket
      ) do
    if String.trim(incoming_message_filter) == "",
      do: Cachex.del(@cache, socket.id),
      else: Cachex.put(@cache, socket.id, {:ok, incoming_message_filter})

    {:noreply, assign(socket, term: incoming_message_filter, msg: :empty, node: nodename())}
  end

  defp base_stream(socket) do
    assign(socket, msg: :empty, term: "", node: nodename())
  end

  defp filter_msg(socket_id, msg) do
    case Cachex.get(@cache, socket_id) do
      {:ok, {:ok, filter}} ->
        if String.match?(msg.payload, ~r/#{filter}/), do: msg, else: :empty

      {:ok, nil} ->
        msg
    end
  end

  defp get_filter_term(socket_id) do
    case Cachex.get(@cache, socket_id) do
      {:ok, {:ok, filter}} ->
        filter

      {:ok, nil} ->
        ""
    end
  end

  defp nodename() do
    Ultronex.Utility.Helper.nodename()
  end
end
<div class="row">
  <div id="success-content" class="col-sm-4 mt-3">
    <div id="success-content-prepend" phx-update="prepend">
      <%= if @msg != :empty && @msg.message_type == "success" do %>
        <UltronexWeb.Components.Message.card msg={@msg} />
      <% end %>
    </div>
  </div>
  <div id="danger-content" class="col-sm-4 mt-3">
    <div id="danger-content-prepend" phx-update="prepend">
      <%= if @msg != :empty && @msg.message_type == "danger" do %>
        <UltronexWeb.Components.Message.card msg={@msg} />
      <% end %>
    </div>
  </div>
  <div id="warning-content" class="col-sm-4 mt-3">
    <div id="warning-content-prepend" phx-update="prepend">
      <%= if @msg != :empty && @msg.message_type == "warning" do %>
        <UltronexWeb.Components.Message.card msg={@msg} />
      <% end %>
    </div>
  </div>
</div>

Here is the updated liveview code based on streams

defmodule UltronexWeb.Stream.LiveView do
  use UltronexWeb, :live_view

  @queue "stream"
  @cache :ultronex_cachex

  def mount(_params, _session, socket) do
    if connected?(socket), do: UltronexWeb.Endpoint.subscribe(@queue)
    {:ok, base_stream(socket)}
  end

  def handle_info(%{event: "success", payload: payload}, socket) do
    {:noreply,
     assign(socket,
       term: get_filter_term(socket.id),
       node: nodename()
     )
     |> stream_insert(:msgs, filter_msg(socket.id, payload), at: 0)}
  end

  def handle_info(%{event: "danger", payload: payload}, socket) do
    {:noreply,
     assign(socket,
       term: get_filter_term(socket.id),
       node: nodename()
     )
     |> stream_insert(:msgs, filter_msg(socket.id, payload), at: 0)}
  end

  def handle_info(%{event: "warning", payload: payload}, socket) do
    {:noreply,
     assign(socket,
       term: get_filter_term(socket.id),
       node: nodename()
     )
     |> stream_insert(:msgs, filter_msg(socket.id, payload), at: 0)}
  end

  def handle_event(
        _event,
        %{"filter" => %{"term" => incoming_message_filter}},
        socket
      ) do
    if String.trim(incoming_message_filter) == "",
      do: Cachex.del(@cache, socket.id),
      else: Cachex.put(@cache, socket.id, {:ok, incoming_message_filter})

    {:noreply,
     assign(socket, term: incoming_message_filter, node: nodename())
     |> stream_insert(:msgs, %{payload: :empty, ksuid: Ksuid.generate()}, at: 0)}
  end

  defp base_stream(socket) do
    assign(socket, term: "", node: nodename())
    |> stream(:msgs, [%{payload: :empty, ksuid: Ksuid.generate()}], dom_id: &"msgs-#{&1.ksuid}")
  end

  defp filter_msg(socket_id, msg) do
    case Cachex.get(@cache, socket_id) do
      {:ok, {:ok, filter}} ->
        if String.match?(msg.payload, ~r/#{filter}/),
          do: msg,
          else: %{payload: :empty, ksuid: Ksuid.generate()}

      {:ok, nil} ->
        msg
    end
  end

  defp get_filter_term(socket_id) do
    case Cachex.get(@cache, socket_id) do
      {:ok, {:ok, filter}} ->
        filter

      {:ok, nil} ->
        ""
    end
  end

  defp nodename() do
    Ultronex.Utility.Helper.nodename()
  end
end
<div class="row">
  <div id="success-content" class="col-sm-4 mt-3">
    <div id="success-content-prepend" phx-update="stream">
      <div :for={{dom_id, msg} <- @streams.msgs} id={dom_id}>
        <%= if msg.payload != :empty && msg.message_type == "success" do %>
          <UltronexWeb.Components.Stream.card msg={msg} />
        <% end %>
      </div>
    </div>
  </div>
  <div id="danger-content" class="col-sm-4 mt-3">
    <div id="danger-content-prepend" phx-update="stream">
      <div :for={{dom_id, msg} <- @streams.msgs} id={dom_id}>
        <%= if msg.payload != :empty && msg.message_type == "danger" do %>
          <UltronexWeb.Components.Stream.card msg={msg} />
        <% end %>
      </div>
    </div>
  </div>
  <div id="warning-content" class="col-sm-4 mt-3">
    <div id="warning-content-prepend" phx-update="stream">
      <div :for={{dom_id, msg} <- @streams.msgs} id={dom_id}>
        <%= if msg.payload != :empty && msg.message_type == "warning" do %>
          <UltronexWeb.Components.Stream.card msg={msg} />
        <% end %>
      </div>
    </div>
  </div>
</div>

Streams definitely have a better developer experience compared to the append and prepend design. At times the release docs or blog posts don't cover all the use cases that are out there implemented and stalls upgrades. The elixir community really makes it possible for the developers to adopt the latest changes seamlessly.

View Comments