Ensure turbo_frame_tag requests src after turbo_stream_from is connected

Hey there,

Context
I’ve been trying to move some turbo_frame_tags from standard show pages to turbo_streams via background workers due to “expensive” calculations. Here’s a summary of what I’m doing:

# index page (slim)
= turbo_stream_from current_user, :campaign_list_items

- @campaigns.each do |campaign|
  = turbo_frame_tag dom_id(campaign), src: campaign_list_items_path(campaign), data: {turbo: true} do
    .grid.grid-cols-7.p-3.bt
      .left.w-6.word-wrap = link_to campaign.name, campaign_path(campaign), data: {"turbo-frame": "_top"}
      .right -
      .right -
      .right -
      .right -
      .right -
      .right -
# Controller
module Dashboard
  module Campaigns
    class ListItemsController < DashboardController
      def show
        ::Campaigns::ListItemStatsWorker.perform_async(params[:campaign_id])
        render status: :ok
      end
    end
  end
end
# worker
module Campaigns
  class ListItemStatsWorker
    include Sidekiq::Worker

    def perform(campaign_id)
      return unless (campaign = Campaign.find_by(id: campaign_id))

      campaign.user.broadcast_update_to(
        campaign.user,
        :campaign_list_items,
        target: "campaign_#{campaign.id}",
        partial: "dashboard/campaigns/list_items/list_item",
        locals: {
          campaign: campaign,
          stats: CampaignStatsPresenter.new(campaign, nil, nil)
        }
      )
    end
  end
end
# partial
= turbo_frame_tag dom_id(campaign) do
  .grid.grid-cols-7.p-3.bt
    .left.w-6.word-wrap
      = link_to stats.campaign.name, 
        campaign_path(id: stats.campaign.id), 
        class: "primary hover-dark", 
        data: {"turbo-frame": "_top"}

The problem
It looks like the background worker is finishing prior to the page being able to subscribe fully to the channel. Thus the broadcast has nothing that is “listening” and the UI does not get updated.
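
To make the race concrete, here is a minimal sketch using a toy in-memory pub/sub. `ToyPubSub` is a hypothetical stand-in for ActionCable plus its pub/sub adapter, not the real API. It only illustrates that a broadcast with no subscriber at that moment is dropped rather than queued.

```ruby
# Toy in-memory pub/sub (hypothetical; stands in for ActionCable + Redis).
class ToyPubSub
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a callback that receives every later message on the channel.
  def subscribe(channel, &callback)
    @subscribers[channel] << callback
  end

  # Deliver to current subscribers only; returns how many received it.
  def broadcast(channel, message)
    @subscribers[channel].each { |callback| callback.call(message) }
    @subscribers[channel].size
  end
end

pubsub = ToyPubSub.new
received = []

# The worker finishes first: nobody is listening, so the update is dropped.
early_deliveries = pubsub.broadcast("campaign_list_items", "stats for campaign 1")

# The page subscribes afterwards and never sees the earlier message.
pubsub.subscribe("campaign_list_items") { |message| received << message }

# A broadcast made after the subscription exists is delivered normally.
late_deliveries = pubsub.broadcast("campaign_list_items", "stats for campaign 2")
```

In this sketch only the second message ever reaches `received`, which matches the symptom described: the UI misses whatever was broadcast before the subscription existed.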

Moving forward
I tried to add a delay on the worker invocation - but that’s sort of a bandaid and doesn’t work all the time. At this point I’m not sure if I’m just not doing something “the right way”.

Is there a way to fire off the turbo_frame_tag src request after the turbo_stream_from finishes its connection?

Does anyone have any ideas?

I might consider the following sequence.

  1. Load HTML Page (including turbo-frame)
  2. After page loads, turbo-frame request controller which queues the expensive job
  3. Job updates original HTML page via turbo-stream

If this is a common pattern, I might consider making the controller more generic and allow passing params which specify the job to run.
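
A rough sketch of what a more generic version could look like, assuming the job is picked from a param. The worker classes, the `JOBS` table, and `enqueue_job` are all hypothetical names used for illustration. The point is to look the job up in an explicit allowlist instead of building a class name from user input.

```ruby
# Fake workers standing in for Sidekiq workers (hypothetical names).
class FakeListItemStatsWorker
  def self.perform_async(record_id)
    [:list_item_stats, record_id]
  end
end

class FakeSummaryWorker
  def self.perform_async(record_id)
    [:summary, record_id]
  end
end

# Explicit allowlist: only these keys may be requested through params.
JOBS = {
  "list_item_stats" => FakeListItemStatsWorker,
  "summary" => FakeSummaryWorker
}.freeze

# Returns :not_found for unknown keys instead of constantizing user input.
def enqueue_job(job_key, record_id)
  worker = JOBS.fetch(job_key) { return :not_found }
  worker.perform_async(record_id)
end

known = enqueue_job("list_item_stats", 42)
unknown = enqueue_job("Kernel", 42)
```

In a real controller the `:not_found` branch would presumably become `head :not_found`, and `job_key` would come from `params`.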

Thanks for responding!

I may be misunderstanding you, but believe that’s exactly the process that I’m doing above.

1. Load HTML Page (including turbo-frame)

  • index slim file above with turbo_frame_tag and turbo_stream_from

2. After page loads, turbo-frame request controller which queues the expensive job

  • iterate through each turbo_frame_tag and makes request to the list_item controller and enqueues the worker

3. Job updates original HTML page via turbo-stream

  • ListItemStatsWorker broadcasts the updated HTML via turbo_stream

Sorry, should’ve looked closer at your code examples. I’ve never run into this race condition. Are you running both Rails and ActionCable as the same or separate servers?

I’ve never run into this race condition.

I’ve run into it a few times now and I’ve always punted, but at this point I’m sort of determined to figure it out :rofl:

Are you running both Rails and ActionCable as the same or separate servers?
Yes, they are both running on the same Heroku server.

I’ve always run them as separate services. I might suggest posting this issue to the hotwire/turbo github issues. It seems you might benefit from a way to detect when a specific turbo-stream has successfully connected.

One workaround (hack) might be to load the turbo-frame with a blank src, and then use a stimulus controller and update the URL once “all” the streams are connected.

[...Turbo.session.streamObserver.sources].every((source) => source.isConnected)

One workaround (hack) might be to load the turbo-frame with a blank src, and then use a stimulus controller and update the URL once “all” the streams are connected.

That’s an interesting idea indeed. I’ll see if I can wire something like that up.

I might suggest posting this issue to the hotwire/turbo github issues. It seems you might benefit from a way to detect when a specific turbo-stream has successfully connected.

I’ll re-post there as well - I was reading that it might be better to post here first :smiley:


Also, I started working on some potential server-side code to detect whether a specific channel has a connection. I was thinking maybe I can delay the broadcast for a set period of time until the connection has been made, then give up if it still isn’t there after that period passes.

Here’s what I came up with on the server side that could go into the worker…

     streamables = [campaign.user, "campaign_list_items"]
     signed_stream_name = Turbo::StreamsChannel.signed_stream_name(streamables)
     verified_stream_name = Turbo::StreamsChannel.verified_stream_name(signed_stream_name)
     redis = ActionCable.server.pubsub.redis_connection_for_subscriptions

     while within_time_window # haven't spent too much time on the loop here (this is pseudo code)
       # redis.pubsub("channels", verified_stream_name).present? is a valid check that
       # returns true once the connection is made on the client.
       if redis.pubsub("channels", verified_stream_name).present?
         campaign.user.broadcast_update_to(
           campaign.user,
           :campaign_list_items,
           target: "campaign_#{campaign.id}",
           partial: "some_partial"
         )
         break # broadcast once, then stop polling
       else
         sleep 0.1 # wait briefly before checking again
       end
     end
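
For the time window itself, something like the following might work. It is a sketch in plain Ruby: `wait_until` is a hypothetical helper, and the lambda stands in for the `redis.pubsub("channels", ...)` check so the example runs without Redis.

```ruby
# Poll the given block until it returns a truthy value or the timeout elapses.
# Returns true if the condition was met in time, false otherwise.
def wait_until(timeout:, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end

# Stand-in for the Redis subscriber check: becomes true on the third call.
checks = 0
subscriber_present = -> { (checks += 1) >= 3 }

broadcasts = []
if wait_until(timeout: 1, interval: 0.01) { subscriber_present.call }
  broadcasts << "campaign_1" # this is where broadcast_update_to would go
end

# When no subscriber ever shows up, the helper gives up after the timeout.
timed_out = !wait_until(timeout: 0.05, interval: 0.01) { false }
```

Using the monotonic clock keeps the window from being affected by system clock adjustments. The timeout and interval values here are arbitrary.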

Update
I’ve created an issue: “Make turbo_frame_tag requests after all turbo_stream_froms have been connected” (Issue #777, hotwired/turbo on GitHub).

I’ve been messing with this a bit and it looks like the “hack” you suggested may not work.

What I’m getting is a dom element and the return value from isConnected doesn’t seem to be related to TurboStream at all.

A boolean value that is true if the node is connected to its relevant context object, and false if not.

let test = document.createElement('p');
console.log(test.isConnected); // Returns false
document.body.appendChild(test);
console.log(test.isConnected); // Returns true

Looks like it just tells me that the element has been added to the DOM, not that the turbo_stream WebSocket connection was made.

Back to the drawing board :pensive:

Another possible idea:

# index page (slim)
= turbo_stream_from current_user, :campaign_list_items, id: 'turbo_stream-current_user'
document.getElementById('turbo_stream-current_user').channel.state === 'connected'

Just an update here…

element.channel.state

This new suggestion seems to just return undefined.

element.channel 

This returns the following

Which makes me think that state is not an attribute.

At this point the only solution that doesn’t seem to be broken is the following

import { Controller } from "@hotwired/stimulus";

export default class extends Controller {
  static targets = ["delayedTurboFrameTag"];

  connect() {
    setTimeout(() => {
      this.delayedTurboFrameTagTargets.forEach((target) => {
        target.src = target.dataset.delayedSrc;
      });
    }, 250);
  }
}
<div data-controller="delayed-stream">
  <%= turbo_stream_from current_user, :campaign_list_items %>

  <% @campaigns.each do |campaign| %>
    <%= turbo_frame_tag dom_id(campaign),
      data: {
        delayed_stream_target: 'delayedTurboFrameTag',
        delayed_src: campaign_list_items_path(campaign.id)
      } do %>
        <div> - </div>
    <% end %>
  <% end %>
</div>

But I can’t imagine this actually being consistent.

I wish there was a way to create a consumer/subscription manually with Hotwire and use the connected event there.

I also saw this connectedCallback. But I’m unsure how to hook into this.

:thinking:

Sorry, I’m using a slightly different turbo stream library which includes state. For the native Turbo, you could try.

What about something like:

import ApplicationController from 'controllers/application_controller';

export default class extends ApplicationController {
  connect() {
    this.dispatchConnectedTimeout();
  }

  disconnect() {
    this.dispatch('disconnected');
    this.dispatch(`disconnected:${this.element.id}`);
  }

  dispatchConnectedTimeout(timeout = 250) {
    if( Turbo.session.streamObserver.streamSourceIsConnected(this.element) ) {
      this.dispatch('connected');
      this.dispatch(`connected:${this.element.id}`);
    } else {
      setTimeout(() => {
        this.dispatchConnectedTimeout(timeout + 250);
      }, timeout);
    }
  }
}

then

# index page (slim)
= turbo_stream_from current_user, :campaign_list_items, id: 'current_user_campaign_list_items_stream', data: { controller: 'turbo-stream-events' }

- @campaigns.each do |campaign|
  = turbo_frame_tag dom_id(campaign), data: {turbo: true, controller: 'turbo-frame', action: 'turbo-stream-events:connected:current_user_campaign_list_items_stream@window->turbo-frame#src', turbo_frame_src_value: campaign_list_items_path(campaign)} do
    .grid.grid-cols-7.p-3.bt
      .left.w-6.word-wrap = link_to campaign.name, campaign_path(campaign), data: {"turbo-frame": "_top"}
      .right -
      .right -
      .right -
      .right -
      .right -
      .right -

This is only for when the HTML component connects to the DOM, not when the stream connects to the websocket.

I believe turbo saves the state of the socket to the HTMLElement.streamSource. So this might also work.

document.getElementById('turbo_stream-current_user').streamSource.readyState === WebSocket.OPEN

Noice! I’ll have a look when I’ve got time. Fighting some other fires at the moment and will report back asap.

Thanks again @tleish for all the help here :smiley:

I tried to look into your most recent suggestion

But it looks like calling streamSource on my side just results in undefined

console.log(this.streamTarget.streamSource);

I also tried your other suggestion here but the unfortunate part is that I’m always getting a return of true.

Back to the drawing board.

The next thing I’m going to try is to see if I can manually open a connection like I used to with ActionCable directly, using createConsumer and its connected callback. :thinking:

I decided to give the server implementation one more chance and I think I found a possible solution that I don’t hate that should be quite easy to reuse for other methods like this.

<%= turbo_stream_from [current_user, :campaign_list_items] %>

<% @campaigns.each do |campaign| %>
  <%= turbo_frame_tag dom_id(campaign), src: campaign_list_items_path(campaign.id) do %>
    <div class="grid grid-cols-7 p-3 bt"> </div>
  <% end %>
<% end %>
class RetriableBroadcaster
  def initialize(broadcast_method, *streamables, **options)
    @broadcast_method = broadcast_method
    @streamables = streamables
    @options = options
  end

  def call(attempts = 0)
    return broadcast if client_connected?
    return unless attempts <= max_retries

    attempts += 1
    sleep wait_for
    call(attempts)
  end

  private

  def broadcast
    @streamables.first.public_send(
      @broadcast_method,
      *@streamables,
      **@options.except(:max_retries, :wait_for)
    )
  end

  def wait_for
    @options[:wait_for].presence || 0.1
  end

  def max_retries
    @options[:max_retries].presence || 5
  end

  def client_connected?
    redis.pubsub("channels", verified_stream_name).present?
  end

  def redis
    @redis ||= ActionCable.server.pubsub.redis_connection_for_subscriptions
  end

  def signed_stream_name
    Turbo::StreamsChannel.signed_stream_name(@streamables)
  end

  def verified_stream_name
    Turbo::StreamsChannel.verified_stream_name(signed_stream_name)
  end
end
module Campaigns
  class ListItemStatsWorker
    include Sidekiq::Worker
    sidekiq_options queue: :within_30_seconds, retry: 0

    def perform(campaign_id)
      return unless (campaign = Campaign.find_by(id: campaign_id))

      RetriableBroadcaster.new(
        :broadcast_update_to,
        campaign.user,
        :campaign_list_items,
        target: "campaign_#{campaign.id}",
        partial: "dashboard/campaigns/list_items/list_item",
        locals: {
          campaign: campaign, stats: CampaignStatsPresenter.new(campaign, nil, nil)
        },
        max_retries: 5
      ).call
    end
  end
end

Ideally we could build this into source and provide a signature more like

user.broadcast_update_to(
  campaign.user, 
  :campaign_list_items, 
  target: "target_name", 
  partial: "partial_path", 
  max_retries: 5
)
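
Here is a rough sketch of how that signature could be layered on top today. Everything in it is an assumption for illustration: the `RetriableBroadcasts` mixin, the `broadcast_update_to_with_retry` name, and the `subscriber_present?` hook (which in a real app would wrap the Redis check above). `FakeUser` replaces a broadcastable model so the example runs without Rails.

```ruby
# Hypothetical mixin: peels the retry options off before delegating to the
# regular broadcast method. `subscriber_present?` is an assumed hook.
module RetriableBroadcasts
  RETRY_DEFAULTS = { max_retries: 5, wait_for: 0.1 }.freeze

  def broadcast_update_to_with_retry(*streamables, **options)
    retry_options = RETRY_DEFAULTS.merge(options.slice(*RETRY_DEFAULTS.keys))
    broadcast_options = options.reject { |key, _| RETRY_DEFAULTS.key?(key) }

    (retry_options[:max_retries] + 1).times do |attempt|
      if subscriber_present?(streamables)
        return broadcast_update_to(*streamables, **broadcast_options)
      end

      sleep retry_options[:wait_for] if attempt < retry_options[:max_retries]
    end
    nil # gave up: no subscriber appeared in time
  end
end

# Fake model standing in for a Turbo-broadcastable record.
class FakeUser
  include RetriableBroadcasts
  attr_reader :sent, :checks

  def initialize(connected_after:)
    @connected_after = connected_after
    @checks = 0
    @sent = []
  end

  def broadcast_update_to(*streamables, **options)
    @sent << [streamables, options]
    :broadcasted
  end

  def subscriber_present?(_streamables)
    (@checks += 1) > @connected_after
  end
end

user = FakeUser.new(connected_after: 2)
result = user.broadcast_update_to_with_retry(
  user, :campaign_list_items, target: "campaign_1", max_retries: 5, wait_for: 0.01
)

never = FakeUser.new(connected_after: 100)
gave_up = never.broadcast_update_to_with_retry(
  never, :campaign_list_items, target: "campaign_1", max_retries: 2, wait_for: 0.01
)
```

The retry keys never reach the underlying broadcast, so the partial and target options pass through unchanged.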

I cloned a repo that uses the default turbo channel connection library and inspected the <turbo-cable-stream-source> element in Chrome. Doing so reveals all sorts of properties you can use to check state.

document.getElementById('ID').subscription.consumer.connection.isOpen()

document.getElementById('ID').subscription.consumer.connection.isActive()

document.getElementById('ID').subscription.consumer.connection.getState() == 'open'

document.getElementById('ID').subscription.consumer.connection.disconnected === false

document.getElementById('ID').subscription.consumer.connection.webSocket.readyState === WebSocket.OPEN

Super cool! I’ll take a look at those when I have a moment. Thanks for the continued digging!

My revised stimulus controller

import { Controller } from "@hotwired/stimulus"

export default class extends Controller {
  connect() {
    // Keep a single bound reference so disconnect() removes the same listener.
    this.boundDispatchState = this.dispatchState.bind(this);
    this.dispatchState();
    this.webSocket.addEventListener('open', this.boundDispatchState);
    this.webSocket.addEventListener('close', this.boundDispatchState);
  }

  disconnect() {
    this.webSocket.removeEventListener('open', this.boundDispatchState);
    this.webSocket.removeEventListener('close', this.boundDispatchState);
  }

  // events: connecting|open|closing|closed
  // see: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
  dispatchState() {
    const state = this.connection.getState();
    console.debug('dispatch', state);
    console.debug('dispatch', `${state}:${this.element.id}`);
    this.dispatch(state);
    this.dispatch(`${state}:${this.element.id}`);
  }

  get connection() {
    return this.element.subscription.consumer.connection;
  }

  get webSocket() {
    return this.connection.webSocket;
  }
}
# index page (slim)
= turbo_stream_from current_user, :campaign_list_items, id: 'current_user_campaign_list_items_stream', data: { controller: 'actioncable-events' }

- @campaigns.each do |campaign|
  = turbo_frame_tag dom_id(campaign), data: {turbo: true, controller: 'turbo-frame', action: 'actioncable-events:open:current_user_campaign_list_items_stream@window->turbo-frame#src', turbo_frame_src_value: campaign_list_items_path(campaign)} do
    .grid.grid-cols-7.p-3.bt
      .left.w-6.word-wrap = link_to campaign.name, campaign_path(campaign), data: {"turbo-frame": "_top"}
      .right -
      .right -
      .right -
      .right -
      .right -
      .right -