Connection management at runtime
# File lib/orocos/roby/runtime_connection_management.rb, line 9 def initialize(plan) @plan = plan end
# File lib/orocos/roby/runtime_connection_management.rb, line 13 def self.update(plan, dry_run = false) manager = RuntimeConnectionManagement.new(plan) manager.dry_run = dry_run manager.update end
Apply all connection changes on the system. The principle is to use a transaction-based approach: i.e. either we apply everything or nothing.
See #compute_connection_changes
for the format of new and removed
Returns a false value if it could not apply the changes and a true value otherwise.
# File lib/orocos/roby/runtime_connection_management.rb, line 173 def apply_connection_changes(new, removed) restart_tasks = ValueSet.new # Don't do anything if some of the connection changes are # between static ports and the relevant tasks are running # # Moreover, we check that the tasks are ready to be connected. # We do it only for the new set, as the removed connections are # obviously between tasks that can be connected ;-) new.each do |(source, sink), mappings| if !dry_run? if !sink.setup? || !source.setup? Engine.debug do Engine.debug "cannot modify connections from #{source}, either one is not yet set up" Engine.debug " to #{sink}" Engine.debug " source.executable?: #{source.executable?}" Engine.debug " source.ready_for_setup?: #{source.ready_for_setup?}" Engine.debug " source.setup?: #{source.setup?}" Engine.debug " sink.executable?: #{sink.executable?}" Engine.debug " sink.ready_for_setup?: #{sink.ready_for_setup?}" Engine.debug " sink.setup?: #{sink.setup?}" break end throw :cancelled end end update_restart_set(restart_tasks, source, sink, mappings.keys) end restart_task_proxies = ValueSet.new removed.each do |(source, sink), mappings| update_restart_set(restart_task_proxies, source, sink, mappings) end restart_task_proxies.each do |corba_handle| klass = Roby.app.orocos_tasks[corba_handle.model.name] task = plan.find_tasks(klass).running. find { |t| t.orocos_name == corba_handle.name } if task restart_tasks << task end end if !restart_tasks.empty? new_tasks = Array.new all_stopped = Roby::AndGenerator.new restart_tasks.each do |task| Engine.debug { "restarting #{task}" } replacement = plan.recreate(task) Engine.debug { " replaced by #{replacement}" } new_tasks << replacement all_stopped << task.stop_event end new_tasks.each do |new_task| all_stopped.add_causal_link new_task.start_event end throw :cancelled, all_stopped end # Remove connections first removed.each do |(source_task, sink_task), mappings| mappings.each do |source_port, sink_port| Engine.debug do Engine.debug "disconnecting #{source_task}:#{source_port}" Engine.debug " => #{sink_task}:#{sink_port}" break end source = source_task.port(source_port, false) sink = sink_task.port(sink_port, false) begin if !source.disconnect_from(sink) Engine.warn "while disconnecting #{source_task}:#{source_port} => #{sink_task}:#{sink_port} returned false" Engine.warn "I assume that the ports are disconnected, but this should not have happened" end rescue Orocos::NotFound => e Engine.warn "error while disconnecting #{source_task}:#{source_port} => #{sink_task}:#{sink_port}: #{e.message}" Engine.warn "I am assuming that the disconnection is actually effective, since one port does not exist anymore" rescue CORBA::ComError => e Engine.warn "CORBA error while disconnecting #{source_task}:#{source_port} => #{sink_task}:#{sink_port}: #{e.message}" Engine.warn "I am assuming that the source component is dead and that therefore the connection is actually effective" end ActualDataFlow.remove_connections(source_task, sink_task, [[source_port, sink_port]]) # The following test is meant to make sure that we # cleanup input ports after crashes. CORBA connections # will properly cleanup the output port-to-corba part # automatically, but never the corba-to-input port # # It will break code that connects to input ports # externally. This is not a common case however. # begin # if !ActualDataFlow.has_in_connections?(sink_task, sink_port) # Engine.debug { "calling #disconnect_all on the input port #{sink_task.name}:#{sink_port} since it has no input connections anymore" } # sink.disconnect_all # end # rescue Orocos::NotFound # rescue CORBA::ComError # end end end # And create the new ones pending_tasks = ValueSet.new new.each do |(from_task, to_task), mappings| # The task might have been killed while the connections # were already added to the data flow graph. Roby's GC will # deal with that. Ignore. next if !from_task.orogen_task next if !to_task.orogen_task mappings.each do |(from_port, to_port), policy| Engine.debug do Engine.debug "connecting #{from_task}:#{from_port}" Engine.debug " => #{to_task}:#{to_port}" Engine.debug " with policy #{policy}" break end begin policy, _ = Kernel.filter_options(policy, Orocos::Port::CONNECTION_POLICY_OPTIONS) from_task.orogen_task.port(from_port).connect_to(to_task.orogen_task.port(to_port), policy) ActualDataFlow.add_connections(from_task.orogen_task, to_task.orogen_task, [from_port, to_port] => policy) rescue CORBA::ComError # The task will be aborted. Simply ignore rescue Orocos::InterfaceObjectNotFound => e if e.task == from_task.orogen_task && e.name == from_port plan.engine.add_error(PortNotFound.new(from_task, from_port, :output)) else plan.engine.add_error(PortNotFound.new(to_task, to_port, :input)) end end end if !to_task.executable? pending_tasks << to_task end end # Tasks' executable flag is forcefully set to false until (1) # they are configured and (2) all static inputs are connected # # Check tasks for which we created an input. If they are not # executable and all_inputs_connected? returns true, set their # executable flag to nil Engine.debug do Engine.debug "#{pending_tasks.size} pending tasks" pending_tasks.each do |t| Engine.debug " #{t}: all_inputs_connected=#{t.all_inputs_connected?} executable=#{t.executable?}" end break end pending_tasks.each do |t| if t.all_inputs_connected? t.executable = nil Engine.debug { "#{t} has all its inputs connected, set executable to nil and executable? = #{t.executable?}" } end end true end
Computes the connection changes that are required to make the required connections (declared in the DataFlow relation) match the actual ones (on the underlying modules)
It returns nil if the change can’t be computed because the Roby tasks are not tied to an underlying RTT task context.
Returns [new, removed] where
new = { [from_task, to_task] => { [from_port, to_port] => policy, ... }, ... }
in which from_task and to_task are instances of
Orocos::RobyPlugin::TaskContext (i.e. Roby tasks), from_port and
to_port are the port names (i.e. strings) and policy the
policy hash that Orocos::OutputPort#connect_to
expects.
removed = { [from_task, to_task] => { [from_port, to_port], ... }, ... }
in which from_task and to_task are instances of
Orocos::TaskContext (i.e. the underlying
RTT tasks). from_port and to_port are the names
of the ports that have to be disconnected (i.e. strings)
# File lib/orocos/roby/runtime_connection_management.rb, line 68 def compute_connection_changes(tasks) if dry_run? return [], [] end not_running = tasks.find_all { |t| !t.orogen_task } if !not_running.empty? Engine.debug do Engine.debug "not computing connections because the deployment of the following tasks is not yet ready" tasks.each do |t| Engine.debug " #{t}" end break end return end update_required_dataflow_graph(tasks) new_edges, removed_edges, updated_edges = RequiredDataFlow.difference(ActualDataFlow, tasks, &:orogen_task) new = Hash.new new_edges.each do |source_task, sink_task| new[[source_task, sink_task]] = source_task[sink_task, RequiredDataFlow] end removed = Hash.new removed_edges.each do |source_task, sink_task| removed[[source_task, sink_task]] = source_task[sink_task, ActualDataFlow].keys.to_set end # We have to work on +updated+. The graphs are between tasks, # not between ports because of how ports are handled on both the # orocos.rb and Roby sides. So we must convert the updated # mappings into add/remove pairs. Moreover, to update a # connection policy we need to disconnect and reconnect anyway. # # Note that it is fine from a performance point of view, as in # most cases one removes all connections from two components to # recreate other ones between other components updated_edges.each do |source_task, sink_task| new_mapping = source_task[sink_task, RequiredDataFlow] old_mapping = source_task.orogen_task[sink_task.orogen_task, ActualDataFlow] new_connections = Hash.new removed_connections = Set.new new_mapping.each do |ports, new_policy| if old_policy = old_mapping[ports] if old_policy != new_policy new_connections[ports] = new_policy removed_connections << ports end else new_connections[ports] = new_policy end end if !new_connections.empty? new[[source_task, sink_task]] = new_connections end if !removed_connections.empty? removed[[source_task.orogen_task, sink_task.orogen_task]] = removed_connections end end return new, removed end
# File lib/orocos/roby/runtime_connection_management.rb, line 343 def update tasks = Flows::DataFlow.modified_tasks tasks.delete_if do |t| t.finished? end # The modifications to +tasks+ might have removed all input # connection. Make sure that in this case, executable? has been # reset to nil # # The normal workflow does not work in this case, as it is only # looking for tasks whose input connections have been modified tasks.each do |t| if t.setup? && !t.executable? && t.plan == plan && t.all_inputs_connected? t.executable = nil end end if !tasks.empty? # If there are some tasks that have been GCed/killed, we still # need to update the connection graph to remove the old # connections. However, we should remove these tasks now as they # should not be passed to compute_connection_changes main_tasks, proxy_tasks = tasks.partition { |t| t.plan == plan } main_tasks = main_tasks.to_value_set if Flows::DataFlow.pending_changes main_tasks.merge(Flows::DataFlow.pending_changes.first) end main_tasks.delete_if { |t| !t.plan || !t.execution_agent || t.execution_agent.ready_to_die? || t.execution_agent.finished? } proxy_tasks.delete_if { |t| !t.plan } Engine.debug do Engine.debug "computing data flow update from modified tasks" for t in main_tasks Engine.debug " #{t}" end break end new, removed = compute_connection_changes(main_tasks) if new Engine.debug do Engine.debug " new connections:" new.each do |(from_task, to_task), mappings| Engine.debug " #{from_task} (#{from_task.running? ? 'running' : 'stopped'}) =>" Engine.debug " #{to_task} (#{to_task.running? ? 'running' : 'stopped'})" mappings.each do |(from_port, to_port), policy| Engine.debug " #{from_port}:#{to_port} #{policy}" end end Engine.debug " removed connections:" Engine.debug " disable debug display because it is unstable in case of process crashes" #removed.each do |(from_task, to_task), mappings| # Engine.info " #{from_task} (#{from_task.running? ? 'running' : 'stopped'}) =>" # Engine.info " #{to_task} (#{to_task.running? ? 'running' : 'stopped'})" # mappings.each do |from_port, to_port| # Engine.info " #{from_port}:#{to_port}" # end #end break end pending_replacement = if Flows::DataFlow.pending_changes Flows::DataFlow.pending_changes[3] end Flows::DataFlow.pending_changes = [main_tasks, new, removed, pending_replacement] Flows::DataFlow.modified_tasks.clear Flows::DataFlow.modified_tasks.merge(proxy_tasks.to_value_set) else Engine.debug "cannot compute changes, keeping the tasks queued" end end if Flows::DataFlow.pending_changes _, new, removed, pending_replacement = Flows::DataFlow.pending_changes if pending_replacement && !pending_replacement.happened? && !pending_replacement.unreachable? Engine.debug "waiting for replaced tasks to stop" else if pending_replacement Engine.debug "successfully started replaced tasks, now applying pending changes" pending_replacement.clear_vertex plan.unmark_permanent(pending_replacement) end pending_replacement = catch :cancelled do Engine.debug "applying pending changes from the data flow graph" apply_connection_changes(new, removed) Flows::DataFlow.pending_changes = nil end if !Flows::DataFlow.pending_changes Engine.debug "successfully applied pending changes" elsif pending_replacement Engine.debug "waiting for replaced tasks to stop" plan.add_permanent(pending_replacement) Flows::DataFlow.pending_changes[3] = pending_replacement else Engine.debug "failed to apply pending changes" end end end end
Updates an intermediate graph (RobyPlugin::RequiredDataFlow) where we store the concrete connections. We don’t try to be smart: remove all tasks that have to be updated and add their connections again
# File lib/orocos/roby/runtime_connection_management.rb, line 23 def update_required_dataflow_graph(tasks) seen = ValueSet.new # Remove first all tasks. Otherwise, removing some tasks will # also remove the new edges we just added for t in tasks RequiredDataFlow.remove(t) end # Create the new connections for t in tasks t.each_concrete_input_connection do |source_task, source_port, sink_port, policy| next if seen.include?(source_task) RequiredDataFlow.add_connections(source_task, t, [source_port, sink_port] => policy) end t.each_concrete_output_connection do |source_port, sink_port, sink_task, policy| next if seen.include?(sink_task) RequiredDataFlow.add_connections(t, sink_task, [source_port, sink_port] => policy) end seen << t end end
Adds source_task (resp. sink_task) to set if modifying
connection specified in mappings will require source_task
(resp. sink_task) to be restarted.
Restart is required by having the task’s input ports marked as ‘static’ in the oroGen specification
# File lib/orocos/roby/runtime_connection_management.rb, line 142 def update_restart_set(set, source_task, sink_task, mappings) if !set.include?(source_task) needs_restart = mappings.any? do |source_port, sink_port| source_task.running? && source_task.output_port_model(source_port).static? end if needs_restart set << source_task end end if !set.include?(sink_task) needs_restart = mappings.any? do |source_port, sink_port| sink_task.running? && sink_task.input_port_model(sink_port).static? end if needs_restart set << sink_task end end set end