class Orocos::RobyPlugin::DataFlowComputation

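This class implements the basic handling of computations that need to follow the dataflow.

It provides an interface for propagating information along the dataflow.

Requirements on the information type: as used by the methods documented below, information objects must respond to merge(info), whose return value #add_port_info uses as the "changed" flag, and to empty?, which #done_port_info and #propagate use to discard objects that hold no information.

Subclasses need to redefine #initial_information, #required_information, #propagate_task and #triggering_inputs, which raise NotImplementedError by default (go to the method documentation for more information).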
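The source of #add_port_info, #done_port_info and #propagate on this page calls merge and empty? on the information objects. A minimal sketch of a type that would satisfy those calls is given here, assuming that merge is expected to return a truthy value when the receiver changed. The PortValues class is hypothetical and not part of the library.

```ruby
require 'set'

# Hypothetical information type: a set of values attached to a port.
class PortValues
  attr_reader :values

  def initialize(*values)
    @values = Set.new(values)
  end

  # Merges +other+ into self and returns true if self changed,
  # which is how #add_port_info uses the return value of merge.
  def merge(other)
    size_before = @values.size
    @values.merge(other.values)
    @values.size != size_before
  end

  # #done_port_info discards information objects that report empty?
  def empty?
    @values.empty?
  end
end

info = PortValues.new(1, 2)
puts info.merge(PortValues.new(2, 3)) # true: the value 3 is new
puts info.merge(PortValues.new(1))    # false: nothing was added
```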

Attributes

done_ports[R]
missing_ports[R]
result[R]
triggering_connections[R]
triggering_dependencies[R]

Public Instance Methods

add_port_info(task, port_name, info)

Register information about the given task’s port.

If some information is already available, the new info object is merged into the existing one. Use #set_port_info to replace the current information with the new object instead. Raises ArgumentError if #done_port_info has already been called for this port.

# File lib/orocos/roby/dataflow_computation.rb, line 293
def add_port_info(task, port_name, info)
    if done_ports[task].include?(port_name)
        debug do
            debug "done_port_info(#{task}, #{port_name}) called at"
            @done_at[[task, port_name]].each do |line|
                debug "  #{line}"
            end
            break
        end
        raise ArgumentError, "trying to change port information for #{task}.#{port_name} after done_port_info has been called"
    end

    if !has_information_for_port?(task, port_name)
        @changed = true
        @result[task][port_name] = info
    else
        begin
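            # Only raise the flag: a merge that changes nothing must not
            # clear a change recorded earlier in the same iteration
            @changed = true if @result[task][port_name].merge(info)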
        rescue Exception => e
            raise e, "while adding information to port #{port_name} on #{task}, #{e.message}", e.backtrace
        end
    end
end
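The merge-or-store behaviour of #add_port_info, and the way it is locked by #done_port_info, can be tried out with a reduced stand-in. This is only a sketch: MiniComputation is a hypothetical class that keeps the bookkeeping and drops logging and the "changed" flag, and plain Set objects are used as information objects.

```ruby
require 'set'

# Hypothetical stand-in for the real class, reduced to the bookkeeping.
class MiniComputation
  attr_reader :result, :done_ports

  def initialize
    @result = Hash.new { |h, k| h[k] = {} }
    @done_ports = Hash.new { |h, k| h[k] = Set.new }
  end

  def add_port_info(task, port_name, info)
    if done_ports[task].include?(port_name)
      raise ArgumentError, "#{task}.#{port_name} is already finalized"
    end

    if result[task].key?(port_name)
      result[task][port_name].merge(info) # Set#merge mutates in place
    else
      result[task][port_name] = info
    end
  end

  def done_port_info(task, port_name)
    done_ports[task] << port_name
  end
end

c = MiniComputation.new
c.add_port_info(:camera, "frame", Set[10])
c.add_port_info(:camera, "frame", Set[20]) # merged into the first object
puts c.result[:camera]["frame"].size       # 2
c.done_port_info(:camera, "frame")
begin
  c.add_port_info(:camera, "frame", Set[30])
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```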
apply_merges(merge_solver)

Replaces each task used as a key in result, missing_ports and done_ports by the task that merge_solver.replacement_for returns for it.

# File lib/orocos/roby/dataflow_computation.rb, line 434
def apply_merges(merge_solver)
    @result = result.map_key do |task, _|
        merge_solver.replacement_for(task)
    end
    @missing_ports = missing_ports.map_key do |task, _|
        merge_solver.replacement_for(task)
    end
    @done_ports = done_ports.map_key do |task, _|
        merge_solver.replacement_for(task)
    end
end
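map_key is not a core Ruby Hash method. Assuming it builds a new hash whose keys are the values returned by the block, the remapping done by #apply_merges could be sketched in plain Ruby as follows. REPLACEMENTS, replacement_for and remap_tasks are hypothetical names that stand in for the merge solver.

```ruby
# Hypothetical replacement table, standing in for merge_solver.replacement_for
REPLACEMENTS = { task_a_copy: :task_a }.freeze

def replacement_for(task)
  REPLACEMENTS.fetch(task, task)
end

# Rebuilds +hash+ with every key replaced by its replacement task.
def remap_tasks(hash)
  hash.each_with_object({}) do |(task, value), remapped|
    remapped[replacement_for(task)] = value
  end
end

result = { task_a_copy: { "out" => 1 }, task_b: { "in" => 2 } }
p remap_tasks(result).keys
```

In this sketch, if two tasks map to the same replacement, the entry processed last overwrites the earlier one.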
done_port_info(task, port_name)

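Called when all information on task.port_name has been added. The port is recorded in done_ports and removed from missing_ports, and its information is deleted if it is empty. After this call, #add_port_info raises ArgumentError for that port.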

# File lib/orocos/roby/dataflow_computation.rb, line 330
def done_port_info(task, port_name)
    debug do
        debug "done computing information for #{task}.#{port_name}"
        log_nest(4) do
            if has_information_for_port?(task, port_name)
                log_pp(:debug, port_info(task, port_name))
            else
                debug "no stored information"
            end
        end
        @done_at ||= Hash.new
        @done_at[[task, port_name]] = caller
        break
    end

    if !done_ports[task].include?(port_name)
        @changed = true

        if has_information_for_port?(task, port_name)
            if port_info(task, port_name).empty?
                remove_port_info(task, port_name)
            end
        end

        done_ports[task] << port_name
        if missing_ports.has_key?(task)
            missing_ports[task].delete(port_name)
            if missing_ports[task].empty?
                missing_ports.delete(task)
            end
        end
    end
end
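The bookkeeping at the end of #done_port_info is what eventually empties missing_ports and lets the loop in #propagate terminate. A small sketch of that part alone, with a hypothetical mark_done helper and symbols in place of task objects:

```ruby
require 'set'

# Hypothetical helper that mirrors the bookkeeping at the end of #done_port_info.
def mark_done(missing_ports, done_ports, task, port_name)
  done_ports[task] << port_name
  return unless missing_ports.key?(task)

  missing_ports[task].delete(port_name)
  # Drop the task entirely once none of its ports are missing
  missing_ports.delete(task) if missing_ports[task].empty?
end

missing_ports = { camera: Set["frame", "status"] }
done_ports = Hash.new { |h, k| h[k] = Set.new }

mark_done(missing_ports, done_ports, :camera, "frame")
puts missing_ports.key?(:camera) # true: "status" is still missing
mark_done(missing_ports, done_ports, :camera, "status")
puts missing_ports.empty?        # true: nothing is missing any more
```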
has_final_information_for_port?(task, port_name)
# File lib/orocos/roby/dataflow_computation.rb, line 40
def has_final_information_for_port?(task, port_name)
    done_ports.has_key?(task) &&
        done_ports[task].include?(port_name) &&
        has_information_for_port?(task, port_name)
end
has_information_for_port?(task, port_name)
# File lib/orocos/roby/dataflow_computation.rb, line 35
def has_information_for_port?(task, port_name)
    result.has_key?(task) &&
        result[task].has_key?(port_name)
end
initial_information(task)

Registers information about task that is independent of the connection graph, to seed the algorithm

The information must be added using #add_port_info

# File lib/orocos/roby/dataflow_computation.rb, line 368
def initial_information(task)
    raise NotImplementedError
end
port_info(task, port_name)
# File lib/orocos/roby/dataflow_computation.rb, line 46
def port_info(task, port_name)
    if result.has_key?(task)
        if result[task].has_key?(port_name)
            return result[task][port_name]
        end
    end
    if port_name
        raise ArgumentError, "no information currently available for #{task.orocos_name}.#{port_name}"
    else
        raise ArgumentError, "no information currently available for #{task.orocos_name}"
    end
end
propagate(tasks)
# File lib/orocos/roby/dataflow_computation.rb, line 140
def propagate(tasks)
    # Reset the computation state. The connections used during the
    # propagation are precomputed further down, as they won't change
    # during the computation
    @result = Hash.new { |h, k| h[k] = Hash.new }
    # Internal variable that is used to detect whether an iteration
    # added information
    @changed = false
    @done_ports = Hash.new { |h, k| h[k] = Set.new }
    @triggering_connections  = Hash.new { |h, k| h[k] = Hash.new }
    @triggering_dependencies = Hash.new { |h, k| h[k] = ValueSet.new }

    debug do
        debug "#{self.class}: computing on #{tasks.size} tasks"
        tasks.each do |t|
            debug "  #{t}"
        end
        break
    end

    # Compute the set of ports for which information is required.
    # This is called before #initial_information, so that
    # #initial_information can add the required information if it is
    # available
    @missing_ports = required_information(tasks)
    if !@missing_ports.kind_of?(Hash)
        raise ArgumentError, "#required_information is supposed to return a Hash, but returned #{@missing_ports}"
    end

    debug ""
    debug "== Gathering Initial Information"
    tasks.each do |task|
        debug { "computing initial information for #{task}" }

        log_nest(4) do
            initial_information(task)
            if connections = triggering_port_connections(task)
                triggering_connections[task] = connections
                triggering_dependencies[task] = connections.map do |port_name, triggers|
                    triggers.ports.map(&:first)
                end

                debug do
                    debug "#{connections.size} triggering connections for #{task}"
                    connections.each do |port_name, info|
                        debug "    for #{port_name}"
                        log_nest(8) do
                            log_pp :debug, info
                        end
                    end
                    break
                end
            end
        end
    end

    debug ""
    debug "== Propagation"
    remaining_tasks = tasks.dup
    while !missing_ports.empty?
        remaining_tasks = remaining_tasks.
            sort_by { |t| triggering_dependencies[t].size }

        @changed = false
        remaining_tasks.delete_if do |task|
            triggering_connections[task].delete_if do |port_name, triggers|
                next if has_final_information_for_port?(task, port_name)

                to_propagate, complete = triggers.ports_to_propagate(self)
                debug do
                    if to_propagate.empty?
                        debug { "nothing to propagate to #{task}.#{port_name}" }
                        debug { "    complete: #{complete}" }
                    else
                        debug { "propagating information to #{task}.#{port_name}" }
                        debug { "    complete: #{complete}" }
                        to_propagate.each do |info|
                            debug "    #{info.compact.join(".")}"
                        end
                    end
                    break
                end

                to_propagate.each do |info|
                    begin
                        add_port_info(task, port_name, port_info(*info))
                    rescue Exception => e
                        raise e, "while propagating information from port #{info} to #{port_name} on #{task}, #{e.message}", e.backtrace
                    end
                end
                if complete
                    done_port_info(task, port_name)
                    true
                else
                    false
                end
            end

            propagate_task(task)
        end

        if !@changed
            break
        end
    end

    if !missing_ports.empty?
        debug do
            debug "found fixed point, breaking out of propagation loop with #{missing_ports.size} missing ports"
            debug "removing partial port information"
            break
        end
        result.delete_if do |task, port_info|
            port_info.delete_if do |port, info|
                if info.empty?
                    debug do
                        debug "  #{task}.#{port} (empty)"
                        break
                    end
                    true

                elsif !has_final_information_for_port?(task, port)
                    debug do
                        debug "  #{task}.#{port} (not finalized)"
                        break
                    end
                    true
                end
            end
            port_info.empty?
        end
    else
        debug "done computing all required port information"
    end

    result
end
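The propagation loop above is a fixed-point iteration: it repeats until either nothing is missing or a whole pass changes nothing. The following is a simplified sketch of that idea, not the library's algorithm. It assumes a hypothetical SOURCES table that maps each port to the ports feeding it, uses Set objects as information, and only computes a port once all of its sources are final.

```ruby
require 'set'

# Hypothetical dataflow: each port maps to the ports that feed it.
SOURCES = {
  "filter.in"  => ["camera.out"],
  "filter.out" => ["filter.in"],
  "logger.in"  => ["filter.out", "gps.out"] # gps.out never gets information
}.freeze

# Propagates +initial+ (port => Set of values) along SOURCES until an
# iteration changes nothing. A port is computed only once all of its
# sources are final.
def propagate_sketch(initial)
  info = initial.transform_values(&:dup)
  done = Set.new(initial.keys)

  loop do
    changed = false
    SOURCES.each do |port, sources|
      next if done.include?(port)
      next unless sources.all? { |src| done.include?(src) }

      info[port] = sources.map { |src| info[src] }.reduce(Set.new, :|)
      done << port
      changed = true
    end
    break unless changed # fixed point reached
  end
  info
end

result = propagate_sketch({ "camera.out" => Set[:rgb] })
puts result.key?("filter.out") # true
puts result.key?("logger.in")  # false: one of its sources never became final
```

As in #propagate, the loop can stop while some ports are still without information. The caller then only gets the ports that could be finalized.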
propagate_task(task)

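Propagates information on task. Returns true if all information that can be computed has been computed, i.e. if calling #propagate_task on the same task again will never add new information. #propagate uses the return value to drop the task from the set of tasks it still iterates over.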

# File lib/orocos/roby/dataflow_computation.rb, line 428
def propagate_task(task)
    raise NotImplementedError
end
remove_port_info(task, port_name)

Deletes all available information about the specified port

# File lib/orocos/roby/dataflow_computation.rb, line 318
def remove_port_info(task, port_name)
    if !@result.has_key?(task)
        return
    end
    task_info = @result[task]
    task_info.delete(port_name)
    if task_info.empty?
        @result.delete(task)
    end
end
required_information(tasks)

Returns the set of objects for which information is required as an output of the algorithm

The returned value must be a Hash (#propagate raises ArgumentError otherwise) of the form

task => ports

where ports is the set of port names that are required on task. nil can be used to denote the task itself.

# File lib/orocos/roby/dataflow_computation.rb, line 421
def required_information(tasks)
    raise NotImplementedError
end
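As an illustration of the expected return shape, an override could look like the sketch below. FakeTask and its output_ports member are hypothetical stand-ins for real task objects, and requiring every output port is only one possible policy.

```ruby
require 'set'

# Hypothetical task stand-in; real tasks are component instances.
FakeTask = Struct.new(:name, :output_ports)

# Requires information on every output port of every task.
# Returns a Hash of the form task => Set of port names.
def required_information(tasks)
  tasks.each_with_object({}) do |task, required|
    required[task] = Set.new(task.output_ports)
  end
end

camera = FakeTask.new("camera", ["frame", "status"])
required = required_information([camera])
puts required[camera].include?("frame") # true
```

A Set works for the values because #done_port_info only calls delete and empty? on them.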
set_port_info(task, port_name, info)
# File lib/orocos/roby/dataflow_computation.rb, line 280
def set_port_info(task, port_name, info)
    if !has_information_for_port?(task, port_name)
        add_port_info(task, port_name, info)
    else
        @result[task][port_name] = info
    end
end
triggering_inputs(task)

Returns the list of input ports in task that should trigger a recomputation of the information for task

# File lib/orocos/roby/dataflow_computation.rb, line 408
def triggering_inputs(task)
    raise NotImplementedError
end
triggering_port_connections(task)

Returns the list of ports whose information can be propagated to a port in task

The returned value is a hash of the form

port_name => Trigger.new(Set([other_task, other_port_name]), mode)

where port_name is a port in task and the set contains the ports whose information can be propagated to add information on port_name.

The mode is a boolean-like flag. If it is false, the information will be propagated only if all the listed ports have information. Otherwise, it will be propagated as soon as one of them has some information.

The default implementation calls #triggering_inputs, which returns the list of ports in task whose connections are triggering. It builds one trigger, with Trigger::USE_ALL as mode, for each of these ports that has at least one incoming connection.

# File lib/orocos/roby/dataflow_computation.rb, line 390
def triggering_port_connections(task)
    result = Hash.new
    connections = Set.new

    triggering_inputs(task).each do |port|
        task.each_concrete_input_connection(port.name) do |from_task, from_port, to_port, _|
            connections << [from_task, from_port]
        end
        if !connections.empty?
            result[port.name] = Trigger.new(connections, Trigger::USE_ALL)
            connections = Set.new
        end
    end
    result
end
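The shape of the returned hash can be sketched without the library. Everything below is hypothetical: SketchTrigger stands in for the Trigger class, USE_ALL is a placeholder value rather than the library's constant, and CONNECTIONS replaces the calls to each_concrete_input_connection.

```ruby
require 'set'

# Hypothetical stand-in for the library's Trigger class: a set of
# [task, port_name] pairs plus a propagation mode.
SketchTrigger = Struct.new(:ports, :mode)
USE_ALL = :use_all # placeholder value, not the library's constant

# Hypothetical connection table: input port name => upstream [task, port] pairs
CONNECTIONS = {
  "in"    => [[:camera, "frame"], [:lidar, "scan"]],
  "debug" => []
}.freeze

def triggering_port_connections_sketch(input_port_names)
  input_port_names.each_with_object({}) do |port_name, result|
    upstream = Set.new(CONNECTIONS.fetch(port_name, []))
    # Ports without any incoming connection are left out, as in the source
    next if upstream.empty?

    result[port_name] = SketchTrigger.new(upstream, USE_ALL)
  end
end

triggers = triggering_port_connections_sketch(["in", "debug"])
puts triggers.keys.inspect     # ["in"]
puts triggers["in"].ports.size # 2
```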