Class: Syskit::NetworkGeneration::DataFlowDynamics
- Inherits: DataFlowComputation
  - Object
    - DataFlowComputation
      - Syskit::NetworkGeneration::DataFlowDynamics
- Defined in: lib/syskit/network_generation/dataflow_dynamics.rb
Overview
Algorithms that make use of the dataflow modelling.
The main task of this class is to compute the update rates and the default policies for each of the existing connections in plan. The resulting information is stored in #dynamics.
Instance Attribute Summary
- #plan ⇒ Object (readonly)
  Returns the value of attribute plan.
- #task_from_name ⇒ Object (readonly)
  Mapping from a deployed task name to the corresponding Roby task object.
- #triggers ⇒ Object (readonly)
  Returns the value of attribute triggers.
Attributes inherited from DataFlowComputation
#done_ports, #missing_ports, #result, #triggering_connections, #triggering_dependencies
Class Method Summary
- .compute_connection_policies(plan) ⇒ Object
Instance Method Summary
- #add_port_trigger(task, port_name, name, period, burst) ⇒ Object
- #add_task_info(task, info) ⇒ Object
- #add_task_trigger(task, name, period, burst) ⇒ Object
- #compute_connection_policies ⇒ Object
  Computes desired connection policies, based on the port dynamics and the oroGen's input port specifications.
- #compute_info_for(task, port_name) ⇒ Object
  Try to compute the information for the given task and port (or, if port_name is nil, for the task).
- #create_port_info(task, port_name) ⇒ Object
- #done_task_info(task) ⇒ Object
- #find_period_of(task) ⇒ Object
- #has_final_information_for_task?(task) ⇒ Boolean
- #has_information_for_task?(task) ⇒ Boolean
- #initial_combus_information(task) ⇒ Object
  Computes the initial port dynamics due to the devices that go through a communication bus.
- #initial_device_information(task) ⇒ Object
  Adds triggering information from the attached devices to task's ports.
- #initial_device_information_common(task, triggering_devices) ⇒ Object
  Common external loop for adding initial device information in #initial_device_information.
- #initial_device_information_internal_triggering(task, triggering_devices) ⇒ Object
  Computes the initial port dynamics due to devices when the task gets triggered by the devices it is attached to.
- #initial_device_information_periodic_triggering(task, triggering_devices, period) ⇒ Object
  Computes the initial port dynamics due to devices when the task is triggered periodically.
- #initial_information(task) ⇒ Object
  Computes the initial port dynamics, i.e. the dynamics that can be computed without knowing anything about the dataflow.
- #initial_slaves_information(task) ⇒ Object (private)
  Computes a task's slaves initial information.
- #initial_task_information(task) ⇒ Object (private)
  Computes a task's initial information.
- #initialize(plan) ⇒ DataFlowDynamics (constructor)
  A new instance of DataFlowDynamics.
- #policy_for(source_task, source_port_name, sink_port_name, sink_task, policy) ⇒ Object
  Given the current knowledge about the port dynamics, returns the policy for the provided connection.
- #propagate_task(task) ⇒ Object
- #required_information(tasks) ⇒ Object
  Returns the set of objects for which information is required as an output of the algorithm.
- #reset(tasks = Array.new) ⇒ Object
- #task_info(task) ⇒ Object
- #triggering_inputs(task) ⇒ Object
  Computes the set of input ports in task that are used during the information propagation.
Methods inherited from DataFlowComputation
#add_port_info, #apply_merges, #done_port_info, #has_final_information_for_port?, #has_information_for_port?, #port_info, #propagate, #remove_port_info, #set_port_info, #triggering_port_connections
Constructor Details
#initialize(plan) ⇒ DataFlowDynamics
Returns a new instance of DataFlowDynamics
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 152

def initialize(plan)
    @plan = plan
    super()
end
Instance Attribute Details
#plan ⇒ Object (readonly)
Returns the value of attribute plan
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 143

def plan
    @plan
end
#task_from_name ⇒ Object (readonly)
Mapping from a deployed task name to the corresponding Roby task object
This is necessary to speed up lookups in some places of the algorithm.

# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 150

def task_from_name
    @task_from_name
end
#triggers ⇒ Object (readonly)
Returns the value of attribute triggers
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 145

def triggers
    @triggers
end
Class Method Details
.compute_connection_policies(plan) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 157

def self.compute_connection_policies(plan)
    engine = DataFlowDynamics.new(plan)
    engine.compute_connection_policies
    engine.result
end
Instance Method Details
#add_port_trigger(task, port_name, name, period, burst) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 211

def add_port_trigger(task, port_name, name, period, burst)
    if has_information_for_port?(task, port_name)
        @result[task][port_name].add_trigger(name, period, burst)
    else
        info = create_port_info(task, port_name)
        info.add_trigger(name, period, burst)
        info
    end
end
#add_task_info(task, info) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 184

def add_task_info(task, info)
    add_port_info(task, nil, info)
end
#add_task_trigger(task, name, period, burst) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 180

def add_task_trigger(task, name, period, burst)
    add_port_trigger(task, nil, name, period, burst)
end
#compute_connection_policies ⇒ Object
Computes desired connection policies, based on the port dynamics and the oroGen's input port specifications. See the user's guide for more details
It updates DataFlow#concrete_connection_policies
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 497

def compute_connection_policies
    # We only act on deployed tasks, as we need to know how the
    # tasks are triggered (what activity / priority / ...)
    deployed_tasks = plan.find_local_tasks(TaskContext).
        find_all(&:execution_agent)

    propagate(deployed_tasks)

    DataFlowDynamics.debug do
        DataFlowDynamics.debug "computing connections"
        deployed_tasks.each do |t|
            DataFlowDynamics.debug " #{t}"
        end

        DataFlowDynamics.debug "available information for"
        result.each do |task, ports|
            DataFlowDynamics.debug " #{task}: #{ports.keys.join(", ")}"
        end
        break
    end

    dataflow_graph = plan.task_relation_graph_for(Flows::DataFlow)
    connection_graph = dataflow_graph.compute_concrete_connection_graph

    policy_graph = Flows::DataFlow::ConcreteConnectionGraph.new
    deployed_tasks.each do |source_task|
        connection_graph.each_out_neighbour(source_task) do |sink_task|
            mappings = connection_graph.edge_info(source_task, sink_task)
            computed_policies = mappings.map_value do |(source_port_name, sink_port_name), policy|
                policy_for(source_task, source_port_name, sink_port_name, sink_task, policy)
            end
            policy_graph.add_edge(source_task, sink_task, computed_policies)
        end
    end
    #dataflow_graph.reset_computed_policies(policy_graph)
    result
end
#compute_info_for(task, port_name) ⇒ Object
Try to compute the information for the given task and port (or, if port_name is nil, for the task). Returns true if the required information could be computed as requested, and false otherwise.
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 439

def compute_info_for(task, port_name)
    triggers = @triggers[[task, port_name]].map do |trigger_task, trigger_port|
        if has_final_information_for_port?(trigger_task, trigger_port)
            port_info(trigger_task, trigger_port)
        else
            DataFlowDynamics.debug do
                DataFlowDynamics.debug " missing info on "\
                    "#{trigger_task}.#{trigger_port} to compute "\
                    "#{task}.#{port_name}"
                break
            end
            return false
        end
    end

    if (period = find_period_of(task))
        triggers = triggers.map do |trigger_info|
            trigger_info.sampled_at(period)
        end
    end

    triggers.each do |trigger_info|
        add_port_info(task, port_name, trigger_info)
    end
    done_port_info(task, port_name)
    return true
end
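The all-or-nothing rule described above (information for a port is computed only once every one of its trigger sources has final information) can be illustrated with a minimal sketch that does not depend on Syskit. Everything in it is an assumption made for illustration: the `triggers` and `final` hashes, the `toy_` method names, the retry loop, and the use of a single minimal period as the "information" attached to a port. It is not the library's implementation.

```ruby
# Toy model, NOT Syskit code.
# triggers: { [task, port] => [[source_task, source_port], ...] }
# final:    { [task, port] => minimal period in seconds (hypothetical info) }

# Returns true and records the information if every trigger source is final,
# false (recording nothing) as soon as one source is still unknown.
def toy_compute_info_for(key, triggers, final)
  periods = triggers.fetch(key, []).map do |source|
    return false unless final.key?(source)

    final[source]
  end
  final[key] = periods.min
  true
end

# Retries the missing entries until a full pass makes no progress.
# Returns the entries that could not be computed.
def toy_propagate(missing, triggers, final)
  loop do
    # Array#reject! returns nil when nothing was removed
    progressed = missing.reject! { |key| toy_compute_info_for(key, triggers, final) }
    break unless progressed
  end
  missing
end
```

With `b.out` triggered by `a.out`, and `c.out` triggered by both `b.out` and an unknown `x.out`, the loop resolves `b.out` on the first pass and leaves `c.out` in the missing list.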
#create_port_info(task, port_name) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 202

def create_port_info(task, port_name)
    port_model = task.model.find_port(port_name)
    dynamics = PortDynamics.new("#{task.orocos_name}.#{port_model.name}",
                                port_model.sample_size)
    dynamics.add_trigger("burst", port_model.burst_period, port_model.burst_size)
    set_port_info(task, port_name, dynamics)
    dynamics
end
#done_task_info(task) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 192

def done_task_info(task)
    task.orogen_model.slaves.each do |slave_task|
        if slave_task = task_from_name[slave_task.name]
            add_task_info(slave_task, task_info(task))
            done_task_info(slave_task)
        end
    end
    done_port_info(task, nil)
end
#find_period_of(task) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 426

def find_period_of(task)
    orogen_model = task.orogen_model
    while (master = orogen_model.master)
        orogen_model = master
    end
    if orogen_model.activity_type.name == "Periodic"
        orogen_model.period
    end
end
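As read from the source above, the method follows the chain of masters up to the top-level model and reports a period only when that model's activity is "Periodic". The sketch below mimics this with plain structs so it can be run without oroGen; `ActivityType`, `OrogenModel`, and `toy_find_period_of` are hypothetical stand-ins, not the real oroGen classes.

```ruby
# Hypothetical stand-ins for the oroGen deployment model (illustration only).
ActivityType = Struct.new(:name)
OrogenModel  = Struct.new(:activity_type, :period, :master)

# Walks up to the topmost master, then returns its period if it is periodic,
# nil otherwise.
def toy_find_period_of(orogen_model)
  orogen_model = orogen_model.master while orogen_model.master
  orogen_model.period if orogen_model.activity_type.name == "Periodic"
end
```

A slave whose master runs periodically at 0.01 s therefore reports 0.01, while a task with any other activity type and no master reports nil.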
#has_final_information_for_task?(task) ⇒ Boolean
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 176

def has_final_information_for_task?(task)
    has_final_information_for_port?(task, nil)
end
#has_information_for_task?(task) ⇒ Boolean
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 172

def has_information_for_task?(task)
    has_information_for_port?(task, nil)
end
#initial_combus_information(task) ⇒ Object
Computes the initial port dynamics due to the devices that go through a communication bus.
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 296

def initial_combus_information(task)
    handled_ports = Set.new
    task.each_attached_device do |dev|
        srv = task.find_data_service(dev.name)
        srv.each_input_port do |port|
            handled_ports << port.name
            port = port.to_component_port
            dynamics = PortDynamics.new("#{task.orocos_name}.#{port.name}", dev.sample_size)
            if dev.period
                dynamics.add_trigger(dev.name, dev.period, 1)
                dynamics.add_trigger(dev.name, dev.period * dev.burst, dev.burst)
            end
            add_port_info(task, port.name, dynamics)
        end
    end
    handled_ports.each do |port_name|
        done_port_info(task, port_name)
    end
end
#initial_device_information(task) ⇒ Object
Adds triggering information from the attached devices to task's ports.

# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 222

def initial_device_information(task)
    triggering_devices = task.model.each_master_driver_service.map do |srv|
        [srv, task.find_device_attached_to(srv)]
    end

    DataFlowDynamics.debug do
        DataFlowDynamics.debug "initial port dynamics on #{task} (device)"
        DataFlowDynamics.debug " attached devices: #{triggering_devices.map { |srv, dev| "#{dev.name} on #{srv.name}" }.join(", ")}"
        break
    end

    activity_type = task.orogen_model.activity_type.name
    case activity_type
    when "Periodic"
        initial_device_information_periodic_triggering(
            task, triggering_devices.to_a, task.orogen_model.period)
    else
        initial_device_information_internal_triggering(
            task, triggering_devices.to_a)
    end
end
#initial_device_information_common(task, triggering_devices) ⇒ Object
Common external loop for adding initial device information in #initial_device_information. It is used by initial_device_information_periodic_triggering and initial_device_information_internal_triggering
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 247

def initial_device_information_common(task, triggering_devices)
    triggering_devices.each do |service, device|
        DataFlowDynamics.debug { " #{device.name}: #{device.period} #{device.burst}" }
        device_dynamics = PortDynamics.new(device.name, 1)
        if device.period
            device_dynamics.add_trigger(device.name, device.period, 1)
        end
        device_dynamics.add_trigger(device.name + "-burst", 0, device.burst)

        if !device_dynamics.empty?
            yield(service, device, device_dynamics)
        end
    end
end
#initial_device_information_internal_triggering(task, triggering_devices) ⇒ Object
Computes the initial port dynamics due to devices when the task gets triggered by the devices it is attached to
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 264

def initial_device_information_internal_triggering(task, triggering_devices)
    DataFlowDynamics.debug " is triggered internally"

    initial_device_information_common(task, triggering_devices) do |service, device, device_dynamics|
        add_task_info(task, device_dynamics)
        service.each_output_port do |out_port|
            out_port = out_port.to_component_port
            out_port.orogen_model.triggered_on_update = false
            add_port_info(task, out_port.name, device_dynamics)
            done_port_info(task, out_port.name)
        end
    end
end
#initial_device_information_periodic_triggering(task, triggering_devices, period) ⇒ Object
Computes the initial port dynamics due to devices when the task is triggered periodically
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 280

def initial_device_information_periodic_triggering(task, triggering_devices, period)
    DataFlowDynamics.debug { " is triggered with a period of #{period} seconds" }

    initial_device_information_common(task, triggering_devices) do |service, device, device_dynamics|
        service.each_output_port do |out_port|
            out_port = out_port.to_component_port
            out_port.orogen_model.triggered_on_update = false
            add_port_trigger(task, out_port.name,
                             device.name, period, device_dynamics.queue_size(period))
            done_port_info(task, out_port.name)
        end
    end
end
#initial_information(task) ⇒ Object
Computes the initial port dynamics, i.e. the dynamics that can be computed without knowing anything about the dataflow
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 318

def initial_information(task)
    return if task.orogen_model.master
    initial_task_information(task)
end
#initial_slaves_information(task) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Computes a task's slaves initial information
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 326

def initial_slaves_information(task)
    task.orogen_model.slaves.each do |orogen_slave_task|
        if slave_task = task_from_name[orogen_slave_task.name]
            if !has_information_for_task?(slave_task)
                initial_task_information(slave_task)
            end
        end
    end
end
#initial_task_information(task) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Computes a task's initial information
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 339

def initial_task_information(task)
    initial_slaves_information(task)

    set_port_info(task, nil, PortDynamics.new("#{task.orocos_name}.main"))
    task.model.each_output_port do |port|
        create_port_info(task, port.name)
    end

    add_task_info(task, task.requirements.dynamics.task)
    task.requirements.dynamics.ports.each do |port_name, dynamics|
        add_port_info(task, port_name, dynamics)
        done_port_info(task, port_name)
    end

    if task.kind_of?(Device)
        initial_device_information(task)
    end
    if task.kind_of?(ComBus)
        initial_combus_information(task)
    end

    activity_type = task.orogen_model.activity_type.name
    if activity_type == "Periodic"
        DataFlowDynamics.debug { " adding periodic trigger #{task.orogen_model.period} 1" }
        add_task_trigger(task, "#{task.orocos_name}.main-period", task.orogen_model.period, 1)
        done_task_info(task)
    elsif activity_type == "SlaveActivity"
        # The master's main trigger is propagated in #done_task_info
    elsif !task.model.each_event_port.find { true }
        done_task_info(task)
    end
end
#policy_for(source_task, source_port_name, sink_port_name, sink_task, policy) ⇒ Object
Given the current knowledge about the port dynamics, returns the policy for the provided connection.
policy is either the current connection policy, or a hash with only a :fallback_policy value that contains a possible policy if the actual one cannot be computed.

# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 540

def policy_for(source_task, source_port_name, sink_port_name, sink_task, policy)
    policy = policy.dup
    fallback_policy = policy.delete(:fallback_policy)

    # Don't do anything if the policy has already been set
    if !policy.empty?
        DataFlowDynamics.debug " #{source_task}:#{source_port_name} => #{sink_task}:#{sink_port_name} already connected with #{policy}"
        return policy
    end

    source_port = source_task.model.find_output_port(source_port_name)
    sink_port = sink_task.model.find_input_port(sink_port_name)
    if !source_port
        raise InternalError, "#{source_port_name} is not a port of #{source_task.model}"
    elsif !sink_port
        raise InternalError, "#{sink_port_name} is not a port of #{sink_task.model}"
    end
    DataFlowDynamics.debug { " #{source_task}:#{source_port.name} => #{sink_task}:#{sink_port.name}" }

    if !sink_port.needs_reliable_connection?
        if sink_port.required_connection_type == :data
            policy = Orocos::Port.prepare_policy(:type => :data)
            DataFlowDynamics.debug { " result: #{policy}" }
            return policy
        elsif sink_port.required_connection_type == :buffer
            policy = Orocos::Port.prepare_policy(:type => :buffer, :size => 1)
            DataFlowDynamics.debug { " result: #{policy}" }
            return policy
        end
    end

    # Compute the buffer size
    input_dynamics =
        if has_final_information_for_port?(source_task, source_port.name)
            port_info(source_task, source_port.name)
        end
    sink_task_dynamics =
        if has_final_information_for_task?(sink_task)
            task_info(sink_task)
        end

    reading_latency =
        if sink_port.trigger_port?
            sink_task.trigger_latency
        elsif sink_task_dynamics && sink_task_dynamics.minimal_period
            sink_task_dynamics.minimal_period + sink_task.trigger_latency
        end

    if !input_dynamics || !reading_latency
        if fallback_policy
            if !input_dynamics
                DataFlowDynamics.warn do
                    DataFlowDynamics.warn "Cannot compute the period information for the output port"
                    DataFlowDynamics.warn " #{source_task}:#{source_port.name}"
                    DataFlowDynamics.warn " This is needed to compute the policy to connect to"
                    DataFlowDynamics.warn " #{sink_task}:#{sink_port_name}"
                    DataFlowDynamics.warn " The fallback policy #{fallback_policy} will be used"
                    break
                end
            else
                DataFlowDynamics.warn "#{sink_task} has no minimal period"
                DataFlowDynamics.warn "This is needed to compute the reading latency on #{sink_port.name}"
                DataFlowDynamics.warn "The fallback policy #{fallback_policy} will be used"
            end
            policy = fallback_policy
        elsif !input_dynamics
            raise SpecError, "the period information for output port #{source_task}:#{source_port.name} cannot be computed. This is needed to compute the policy to connect to #{sink_task}:#{sink_port_name}"
        else
            raise SpecError, "#{sink_task} has no minimal period, needed to compute reading latency on #{sink_port.name}"
        end
    else
        policy[:type] = :buffer
        size = (1.0 + Syskit.conf.buffer_size_margin) * input_dynamics.queue_size(reading_latency)
        policy[:size] = Integer(size) + 1
        DataFlowDynamics.debug do
            DataFlowDynamics.debug " input_period:#{input_dynamics.minimal_period} => reading_latency:#{reading_latency}"
            DataFlowDynamics.debug " sample_size:#{input_dynamics.sample_size}"
            input_dynamics.triggers.each do |tr|
                DataFlowDynamics.debug " trigger(#{tr.name}): period=#{tr.period} count=#{tr.sample_count}"
            end
            break
        end
        DataFlowDynamics.debug { " result: #{policy}" }
    end
    policy
end
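The buffer sizing at the end of the method (a safety margin applied to the number of samples that can pile up during the reading latency, truncated and incremented by one) can be tried out in isolation with the sketch below. Only the formula `Integer((1.0 + margin) * queue_size) + 1` comes from the source above. The `Trigger` struct, `toy_queue_size`, and `buffer_policy` are hypothetical names, the `margin` argument stands in for `Syskit.conf.buffer_size_margin`, and the way `toy_queue_size` counts samples is an assumption: the real `PortDynamics#queue_size` may compute this differently.

```ruby
# Hypothetical stand-in for a port trigger: `sample_count` samples are
# produced every `period` seconds (period 0 denotes a one-off burst).
Trigger = Struct.new(:name, :period, :sample_count)

# ASSUMPTION: simplified estimate of how many samples can accumulate
# during `duration` seconds. Not the library's PortDynamics#queue_size.
def toy_queue_size(triggers, duration)
  triggers.sum do |t|
    if t.period.zero?
      t.sample_count
    else
      (duration / t.period).ceil * t.sample_count
    end
  end
end

# Same arithmetic as the final branch of policy_for: apply the margin,
# truncate to an integer, then add one slot.
def buffer_policy(triggers, reading_latency, margin)
  size = (1.0 + margin) * toy_queue_size(triggers, reading_latency)
  { type: :buffer, size: Integer(size) + 1 }
end
```

For a single trigger producing one sample every 0.25 s, a reading latency of 1.0 s and a margin of 0.5, the toy estimate is 4 samples, the margin raises it to 6.0, and the resulting buffer size is 7.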
#propagate_task(task) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 467

def propagate_task(task)
    if !missing_ports.has_key?(task)
        return true
    end

    done = true
    required = missing_ports[task].dup
    DataFlowDynamics.debug do
        DataFlowDynamics.debug "trying to compute dataflow dynamics for #{task}"
        DataFlowDynamics.debug " requires information on: #{required.map(&:to_s).join(", ")}"
        break
    end

    required.each do |missing|
        if !compute_info_for(task, missing)
            DataFlowDynamics.debug do
                DataFlowDynamics.debug " cannot compute information on #{missing}"
                break
            end
            done = false
        end
    end
    done
end
#required_information(tasks) ⇒ Object
Returns the set of objects for which information is required as an output of the algorithm.
The returned value is a map:
task => ports
where ports is the set of port names that are required on task. nil can be used to denote the task itself.

# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 414

def required_information(tasks)
    result = Hash.new
    tasks.each do |t|
        ports = t.model.each_output_port.to_a
        if !ports.empty?
            result[t] = ports.map(&:name).to_set
            result[t] << nil
        end
    end
    result
end
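To make the shape of the returned map concrete, here is a small standalone sketch. `PortModel`, `TaskModel`, `Task`, and `toy_required_information` are hypothetical stand-ins for the Syskit task and port models, used only so the example runs on its own.

```ruby
require "set"

# Hypothetical stand-ins for Syskit models (illustration only).
PortModel = Struct.new(:name)
TaskModel = Struct.new(:output_ports) do
  # Returns an enumerator when called without a block, like the real API
  # is used in the method above.
  def each_output_port(&block)
    output_ports.each(&block)
  end
end
Task = Struct.new(:name, :model)

# Same logic as the documented method: tasks without output ports are
# skipped, and nil is added to denote the task itself.
def toy_required_information(tasks)
  tasks.each_with_object({}) do |t, result|
    ports = t.model.each_output_port.to_a
    next if ports.empty?

    result[t] = ports.map(&:name).to_set << nil
  end
end
```

A task with a single output port named "frame" maps to the set containing "frame" and nil, while a task with no output ports does not appear in the map at all.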
#reset(tasks = Array.new) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 163

def reset(tasks = Array.new)
    super
    @triggers = Hash.new { |h, k| h[k] = Set.new }
    @task_from_name = Hash.new
    tasks.each do |t|
        task_from_name[t.orocos_name] = t
    end
end
#task_info(task) ⇒ Object
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 188

def task_info(task)
    port_info(task, nil)
end
#triggering_inputs(task) ⇒ Object
Computes the set of input ports in task that are used during the information propagation.

# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 376

def triggering_inputs(task)
    all_triggers = Set.new

    @triggers[[task, nil]] = Set.new
    task.model.each_event_port do |port|
        if task.has_concrete_input_connection?(port.name)
            all_triggers << port
            @triggers[[task, nil]] << [task, port.name]
        end
    end

    task.model.each_output_port do |port|
        if port.triggered_on_update?
            @triggers[[task, port.name]] << [task, nil]
        end
        port.port_triggers.each do |trigger_port|
            if task.has_concrete_input_connection?(trigger_port.name)
                @triggers[[task, port.name]] << [task, trigger_port.name]
                all_triggers << trigger_port
            end
        end
    end

    task.model.each_output_port do |port|
        if !@triggers.has_key?([task, port.name])
            done_port_info(task, port.name)
        end
    end
    all_triggers
end
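The trigger map built by this method is keyed by `[task, port_name]` pairs, with a nil port name standing for the task itself. The sketch below rebuilds that structure for one task using plain values. `OutputPort`, `toy_triggering_inputs`, and its arguments (port names instead of port objects, a set of connected input names instead of `has_concrete_input_connection?`) are simplifications assumed for illustration, not the Syskit API.

```ruby
require "set"

# Hypothetical stand-in for an output port model (illustration only).
# port_triggers holds the NAMES of the input ports that trigger this output.
OutputPort = Struct.new(:name, :triggered_on_update, :port_triggers)

# Returns [triggers, used_inputs] for a single task.
# triggers:    { [task, port_or_nil] => Set of [task, port_or_nil] sources }
# used_inputs: names of the connected inputs that take part in propagation
def toy_triggering_inputs(task, event_ports, output_ports, connected_inputs)
  triggers = Hash.new { |h, k| h[k] = Set.new }
  used_inputs = Set.new

  # Connected event ports trigger the task itself (nil port name).
  triggers[[task, nil]] = Set.new
  event_ports.each do |port_name|
    next unless connected_inputs.include?(port_name)

    used_inputs << port_name
    triggers[[task, nil]] << [task, port_name]
  end

  output_ports.each do |port|
    # An output written on every update is triggered by the task itself.
    triggers[[task, port.name]] << [task, nil] if port.triggered_on_update
    port.port_triggers.each do |input_name|
      next unless connected_inputs.include?(input_name)

      triggers[[task, port.name]] << [task, input_name]
      used_inputs << input_name
    end
  end

  [triggers, used_inputs]
end
```

Unconnected inputs (here, any name missing from `connected_inputs`) are ignored, which mirrors the `has_concrete_input_connection?` checks in the method above.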