This class embeds the basic handling of computations that need to follow the dataflow
It provides an interface that allows to propagate information along the dataflow
Requirements on the information type:
empty?
merge(value)
Subclasses need to redefine the following methods (go to the method documentation for more information)
#initial_information(task)
#required_information(tasks)
#triggering_inputs(task)
#propagate_task(task)
Register information about the given task’s port.
If some information is already available, merge the new info
object with what exists. Use #set_port_info
to reset the current information with the new object.
# File lib/orocos/roby/dataflow_computation.rb, line 293 def add_port_info(task, port_name, info) if done_ports[task].include?(port_name) debug do debug "done_port_info(#{task}, #{port_name}) called at" @done_at[[task, port_name]].each do |line| debug " #{line}" end break end raise ArgumentError, "trying to change port information for #{task}.#{port_name} after done_port_info has been called" end if !has_information_for_port?(task, port_name) @changed = true @result[task][port_name] = info else begin @changed = @result[task][port_name].merge(info) rescue Exception => e raise e, "while adding information to port #{port_name} on #{task}, #{e.message}", e.backtrace end end end
Maps the tasks stored in the dataflow dynamics information to the ones that
merge_solver is pointing to
# File lib/orocos/roby/dataflow_computation.rb, line 434 def apply_merges(merge_solver) @result = result.map_key do |task, _| merge_solver.replacement_for(task) end @missing_ports = missing_ports.map_key do |task, _| merge_solver.replacement_for(task) end @done_ports = done_ports.map_key do |task, _| merge_solver.replacement_for(task) end end
Called when all information on task.port_name has
been added
# File lib/orocos/roby/dataflow_computation.rb, line 330 def done_port_info(task, port_name) debug do debug "done computing information for #{task}.#{port_name}" log_nest(4) do if has_information_for_port?(task, port_name) log_pp(:debug, port_info(task, port_name)) else debug "no stored information" end end @done_at ||= Hash.new @done_at[[task, port_name]] = caller break end if !done_ports[task].include?(port_name) @changed = true if has_information_for_port?(task, port_name) if port_info(task, port_name).empty? remove_port_info(task, port_name) end end done_ports[task] << port_name if missing_ports.has_key?(task) missing_ports[task].delete(port_name) if missing_ports[task].empty? missing_ports.delete(task) end end end end
# File lib/orocos/roby/dataflow_computation.rb, line 40 def has_final_information_for_port?(task, port_name) done_ports.has_key?(task) && done_ports[task].include?(port_name) && has_information_for_port?(task, port_name) end
# File lib/orocos/roby/dataflow_computation.rb, line 35 def has_information_for_port?(task, port_name) result.has_key?(task) && result[task].has_key?(port_name) end
Registers information about task that is independent of the
connection graph, to seed the algorithm
The information must be added using #add_port_info
# File lib/orocos/roby/dataflow_computation.rb, line 368 def initial_information(task) raise NotImplementedError end
# File lib/orocos/roby/dataflow_computation.rb, line 46 def port_info(task, port_name) if result.has_key?(task) if result[task].has_key?(port_name) return result[task][port_name] end end if port_name raise ArgumentError, "no information currently available for #{task.orocos_name}.#{port_name}" else raise ArgumentError, "no information currently available for #{task.orocos_name}" end end
# File lib/orocos/roby/dataflow_computation.rb, line 140 def propagate(tasks) # Get the periods from the activities themselves directly (i.e. # not taking into account the port-driven behaviour) # # We also precompute relevant connections, as they won't change # during the computation @result = Hash.new { |h, k| h[k] = Hash.new } # Internal variable that is used to detect whether an iteration # added information @changed = false @done_ports = Hash.new { |h, k| h[k] = Set.new } @triggering_connections = Hash.new { |h, k| h[k] = Hash.new } @triggering_dependencies = Hash.new { |h, k| h[k] = ValueSet.new } debug do debug "#{self.class}: computing on #{tasks.size} tasks" tasks.each do |t| debug " #{t}" end break end # Compute the set of ports for which information is required. # This is called before #initial_information, so that # #initial_information can add the required information if it is # available @missing_ports = required_information(tasks) if !@missing_ports.kind_of?(Hash) raise ArgumentError, "#required_information is supposed to return a Hash, but returned #{@missing_ports}" end debug "" debug "== Gathering Initial Information" tasks.each do |task| debug { "computing initial information for #{task}" } log_nest(4) do initial_information(task) if connections = triggering_port_connections(task) triggering_connections[task] = connections triggering_dependencies[task] = connections.map do |port_name, triggers| triggers.ports.map(&:first) end debug do debug "#{connections.size} triggering connections for #{task}" connections.each do |port_name, info| debug " for #{port_name}" log_nest(8) do log_pp :debug, info end end break end end end end debug "" debug "== Propagation" remaining_tasks = tasks.dup while !missing_ports.empty? remaining_tasks = remaining_tasks. sort_by { |t| triggering_dependencies[t].size } @changed = false remaining_tasks.delete_if do |task| triggering_connections[task].delete_if do |port_name, triggers| next if has_final_information_for_port?(task, port_name) to_propagate, complete = triggers.ports_to_propagate(self) debug do if to_propagate.empty? debug { "nothing to propagate to #{task}.#{port_name}" } debug { " complete: #{complete}" } else debug { "propagating information to #{task}.#{port_name}" } debug { " complete: #{complete}" } to_propagate.each do |info| debug " #{info.compact.join(".")}" end end break end to_propagate.each do |info| begin add_port_info(task, port_name, port_info(*info)) rescue Exception => e raise e, "while propagating information from port #{info} to #{port_name} on #{task}, #{e.message}", e.backtrace end end if complete done_port_info(task, port_name) true else false end end propagate_task(task) end if !@changed break end end if !missing_ports.empty? debug do debug "found fixed point, breaking out of propagation loop with #{missing_ports.size} missing ports" debug "removing partial port information" break end result.delete_if do |task, port_info| port_info.delete_if do |port, info| if info.empty? debug do debug " #{task}.#{port} (empty)" break end true elsif !has_final_information_for_port?(task, port) debug do debug " #{task}.#{port} (not finalized)" break end true end end port_info.empty? end else debug "done computing all required port information" end result end
Propagate information on task. Returns true if all information
that can be computed has been (i.e. if calling #propagate_task
on the same task again will never add new information)
# File lib/orocos/roby/dataflow_computation.rb, line 428 def propagate_task(task) raise NotImplementedError end
Deletes all available information about the specified port
# File lib/orocos/roby/dataflow_computation.rb, line 318 def remove_port_info(task, port_name) if !@result.has_key?(task) return end task_info = @result[task] task_info.delete(port_name) if task_info.empty? @result.delete(task) end end
Returns the set of objects for which information is required as an output of the algorithm
The returned value is a map:
task => ports
Where ports is the set of port names that are required on
task. nil can be used to denote the task itself.
# File lib/orocos/roby/dataflow_computation.rb, line 421 def required_information(tasks) raise NotImplementedError end
# File lib/orocos/roby/dataflow_computation.rb, line 280 def set_port_info(task, port_name, info) if !has_information_for_port?(task, port_name) add_port_info(task, port_name, info) else @result[task][port_name] = info end end
Returns the list of input ports in task that should trigger a
recomputation of the information for task
# File lib/orocos/roby/dataflow_computation.rb, line 408 def triggering_inputs(task) raise NotImplementedError end
Returns the list of ports whose information can be propagated to a port in
task
The returned value is a hash of the form
port_name => [Set([other_task, other_port_name]), boolean]
where port_name is a port in task and the set is
a set of ports whose information can be propagated to add information on
port_name.
If the boolean is false, the information will be propagated only if all the listed ports have information. Otherwise, it will be as soon as one has some information
The default implementation calls a method triggering_inputs
that simply returns a list of ports in task whose connections
are triggering.
# File lib/orocos/roby/dataflow_computation.rb, line 390 def triggering_port_connections(task) result = Hash.new connections = Set.new triggering_inputs(task).each do |port| task.each_concrete_input_connection(port.name) do |from_task, from_port, to_port, _| connections << [from_task, from_port] end if !connections.empty? result[port.name] = Trigger.new(connections, Trigger::USE_ALL) connections = Set.new end end result end