Class: Syskit::NetworkGeneration::DataFlowDynamics

Inherits:
DataFlowComputation show all
Defined in:
lib/syskit/network_generation/dataflow_dynamics.rb

Overview

Algorithms that make use of the dataflow modelling

The main task of this class is to compute the update rates and the default policies for each of the existing connections in the plan. The resulting information is stored in #dynamics.

Instance Attribute Summary collapse

Attributes inherited from DataFlowComputation

#done_ports, #missing_ports, #result, #triggering_connections, #triggering_dependencies

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from DataFlowComputation

#add_port_info, #apply_merges, #done_port_info, #has_final_information_for_port?, #has_information_for_port?, #port_info, #propagate, #remove_port_info, #set_port_info, #triggering_port_connections

Constructor Details

#initialize(plan) ⇒ DataFlowDynamics

Returns a new instance of DataFlowDynamics



152
153
154
155
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 152

# Creates a dataflow dynamics computation bound to a plan
#
# @param plan the plan whose tasks and connections are analyzed; it is
#   stored and later exposed through the #plan reader
def initialize(plan)
    @plan = plan
    # The parent constructor is explicitly called without arguments: the
    # plan is only stored here and is not forwarded to the superclass
    super()
end

Instance Attribute Details

#planObject (readonly)

Returns the value of attribute plan



143
144
145
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 143

# The plan this computation works on, as given to the constructor
def plan
  @plan
end

#task_from_nameObject (readonly)

Mapping from a deployed task name to the corresponding Roby task object

This is necessary to speed up lookups in some places of the algorithm



150
151
152
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 150

# Mapping from a deployed task name (the task's orocos_name) to the task
# object itself. It is rebuilt from scratch by #reset.
def task_from_name
  @task_from_name
end

#triggersObject (readonly)

Returns the value of attribute triggers



145
146
147
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 145

# Trigger table built by #triggering_inputs and reinitialized by #reset
#
# Keys are [task, port_name] pairs (port_name is nil for the task itself)
# and values are sets of [task, port_name] pairs that trigger the key.
def triggers
  @triggers
end

Class Method Details

.compute_connection_policies(plan) ⇒ Object



157
158
159
160
161
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 157

# Convenience entry point: builds an engine for the plan, runs the policy
# computation on it and hands back the engine's result
#
# @param plan the plan to analyze
# @return the engine's #result once #compute_connection_policies has run
def self.compute_connection_policies(plan)
    computation = DataFlowDynamics.new(plan).tap do |engine|
        engine.compute_connection_policies
    end
    computation.result
end

Instance Method Details

#add_port_trigger(task, port_name, name, period, burst) ⇒ Object



211
212
213
214
215
216
217
218
219
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 211

# Registers a trigger on a port, creating the port's dynamics first if
# nothing is known about it yet
#
# @param task the task that owns the port
# @param port_name the port name (nil designates the task itself)
# @param name the trigger name
# @param period the trigger period
# @param burst the number of samples per trigger
# @return the freshly created port info when it had to be created,
#   otherwise whatever the existing info's add_trigger returns
def add_port_trigger(task, port_name, name, period, burst)
    unless has_information_for_port?(task, port_name)
        created = create_port_info(task, port_name)
        created.add_trigger(name, period, burst)
        return created
    end

    @result[task][port_name].add_trigger(name, period, burst)
end

#add_task_info(task, info) ⇒ Object



184
185
186
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 184

# Adds dynamics information about the task itself
#
# Task-level information is stored under the nil port name.
#
# @param task the task the information applies to
# @param info the dynamics to add
def add_task_info(task, info)
    task_slot = nil
    add_port_info(task, task_slot, info)
end

#add_task_trigger(task, name, period, burst) ⇒ Object



180
181
182
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 180

# Registers a trigger on the task itself
#
# Task-level triggers are stored under the nil port name.
#
# @param task the task being triggered
# @param name the trigger name
# @param period the trigger period
# @param burst the number of samples per trigger
def add_task_trigger(task, name, period, burst)
    task_slot = nil
    add_port_trigger(task, task_slot, name, period, burst)
end

#compute_connection_policiesObject

Computes desired connection policies, based on the port dynamics and the oroGen's input port specifications. See the user's guide for more details.

It updates DataFlow#concrete_connection_policies



497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 497

# Propagates the dataflow dynamics over the deployed tasks of the plan and
# evaluates a policy for each of their concrete outgoing connections
#
# The per-connection policies are gathered in a local graph. That graph is
# currently not stored anywhere (the call that would hand it over to the
# dataflow graph is disabled below), so the visible effects are the
# propagation itself and any exception raised by #policy_for.
#
# @return the propagation result (see #result)
def compute_connection_policies
    # Only deployed tasks (those with an execution agent) are considered,
    # since we need to know how the tasks are triggered
    # (what activity / priority / ...)
    candidates = plan.find_local_tasks(TaskContext)
    deployed_tasks = candidates.find_all(&:execution_agent)

    propagate(deployed_tasks)

    DataFlowDynamics.debug do
        DataFlowDynamics.debug "computing connections"
        deployed_tasks.each { |t| DataFlowDynamics.debug "  #{t}" }

        DataFlowDynamics.debug "available information for"
        result.each do |task, ports|
            DataFlowDynamics.debug "  #{task}: #{ports.keys.join(", ")}"
        end
        break
    end

    flow_graph = plan.task_relation_graph_for(Flows::DataFlow)
    concrete_graph = flow_graph.compute_concrete_connection_graph
    policy_graph = Flows::DataFlow::ConcreteConnectionGraph.new
    deployed_tasks.each do |source_task|
        concrete_graph.each_out_neighbour(source_task) do |sink_task|
            port_mappings = concrete_graph.edge_info(source_task, sink_task)
            edge_policies = port_mappings.map_value do |(source_port_name, sink_port_name), policy|
                policy_for(source_task, source_port_name, sink_port_name, sink_task, policy)
            end
            policy_graph.add_edge(source_task, sink_task, edge_policies)
        end
    end
    # Disabled: flow_graph.reset_computed_policies(policy_graph)
    result
end

#compute_info_for(task, port_name) ⇒ Object

Try to compute the information for the given task and port (or, if port_name is nil, for the task). Returns true if the required information could be computed as requested, and false otherwise.



439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 439

# Attempts to finalize the dynamics of a port (or of the task itself when
# port_name is nil) from the dynamics of everything that triggers it
#
# Nothing is added if at least one trigger has no final information yet.
# When the task (or its top-most master) is periodic, the trigger dynamics
# are resampled at that period before being added.
#
# @param task the task to compute information for
# @param port_name the port name, or nil for the task itself
# @return [Boolean] true if the information was computed and marked as
#   done, false if some triggering information is still missing
def compute_info_for(task, port_name)
    gathered = []
    @triggers[[task, port_name]].each do |trigger_task, trigger_port|
        unless has_final_information_for_port?(trigger_task, trigger_port)
            DataFlowDynamics.debug do
                DataFlowDynamics.debug "  missing info on "\
                    "#{trigger_task}.#{trigger_port} to compute "\
                    "#{task}.#{port_name}"
                break
            end
            return false
        end
        gathered << port_info(trigger_task, trigger_port)
    end

    period = find_period_of(task)
    gathered = gathered.map { |info| info.sampled_at(period) } if period

    gathered.each { |info| add_port_info(task, port_name, info) }
    done_port_info(task, port_name)
    true
end

#create_port_info(task, port_name) ⇒ Object



202
203
204
205
206
207
208
209
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 202

# Creates and registers the initial dynamics of one of the task's ports
#
# The dynamics are named "<orocos_name>.<port name>", use the port model's
# sample size and start with a single "burst" trigger taken from the port
# model's burst period and burst size.
#
# @param task the task that owns the port
# @param port_name the name of the port, as known by the task model
# @return the new PortDynamics, already stored with #set_port_info
def create_port_info(task, port_name)
    model = task.model.find_port(port_name)
    qualified_name = "#{task.orocos_name}.#{model.name}"
    PortDynamics.new(qualified_name, model.sample_size).tap do |created|
        created.add_trigger("burst", model.burst_period, model.burst_size)
        set_port_info(task, port_name, created)
    end
end

#done_task_info(task) ⇒ Object



192
193
194
195
196
197
198
199
200
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 192

# Marks the task-level information of a task as final, after having pushed
# it down to the task's slaves
#
# Each slave declared on the task's oroGen model that can be resolved
# through #task_from_name receives the task's information and is then
# finalized recursively. Slaves that cannot be resolved are skipped.
#
# @param task the task whose information is complete
def done_task_info(task)
    task.orogen_model.slaves.each do |orogen_slave|
        deployed_slave = task_from_name[orogen_slave.name]
        next unless deployed_slave

        add_task_info(deployed_slave, task_info(task))
        done_task_info(deployed_slave)
    end
    done_port_info(task, nil)
end

#find_period_of(task) ⇒ Object



426
427
428
429
430
431
432
433
434
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 426

# Finds the period at which a task runs, if it has one
#
# The chain of masters is followed up to the top-most oroGen model; the
# period is the one of that model when its activity is "Periodic".
#
# @param task the task to inspect
# @return the period of the top-most master, or nil when that model's
#   activity is not periodic
def find_period_of(task)
    root_model = task.orogen_model
    parent_model = root_model.master
    while parent_model
        root_model = parent_model
        parent_model = root_model.master
    end

    return unless root_model.activity_type.name == "Periodic"

    root_model.period
end

#has_final_information_for_task?(task) ⇒ Boolean

Returns:

  • (Boolean)


176
177
178
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 176

# Tests whether the task-level information of a task is final
#
# Task-level information is stored under the nil port name.
#
# @param task the task to test
# @return [Boolean]
def has_final_information_for_task?(task)
    task_slot = nil
    has_final_information_for_port?(task, task_slot)
end

#has_information_for_task?(task) ⇒ Boolean

Returns:

  • (Boolean)


172
173
174
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 172

# Tests whether some task-level information is known for a task
#
# Task-level information is stored under the nil port name.
#
# @param task the task to test
# @return [Boolean]
def has_information_for_task?(task)
    task_slot = nil
    has_information_for_port?(task, task_slot)
end

#initial_combus_information(task) ⇒ Object

Computes the initial port dynamics due to the devices that go through a communication bus.



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 296

# Computes the initial port dynamics due to the devices that go through a
# communication bus.
#
# For each device attached to the bus, every input port of the data
# service named after the device gets dynamics built from the device's
# sample size and, when the device has a period, from its period and
# burst. All ports handled this way are then marked as done.
#
# Ports are tracked by their component-level name (after
# to_component_port), which is the name used to register the dynamics, so
# that the port that gets finalized is the one that received information.
#
# @param task the communication bus task
def initial_combus_information(task)
    handled_ports = Set.new
    task.each_attached_device do |dev|
        srv = task.find_data_service(dev.name)
        srv.each_input_port do |service_port|
            # Resolve the service port to the actual component port first:
            # both the registration and the finalization must use its name
            port = service_port.to_component_port
            handled_ports << port.name
            dynamics = PortDynamics.new("#{task.orocos_name}.#{port.name}", dev.sample_size)
            if dev.period
                # One sample per period, plus the bursts the device may emit
                dynamics.add_trigger(dev.name, dev.period, 1)
                dynamics.add_trigger(dev.name, dev.period * dev.burst, dev.burst)
            end
            add_port_info(task, port.name, dynamics)
        end
    end
    handled_ports.each do |port_name|
        done_port_info(task, port_name)
    end
end

#initial_device_information(task) ⇒ Object

Adds triggering information from the attached devices to task's ports



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 222

# Adds triggering information from the attached devices to the task's ports
#
# The task's master driver services are paired with the device attached to
# each of them, and the pairs are handed to the periodic or to the
# internal-triggering variant depending on the task's activity type.
#
# @param task the device driver task
def initial_device_information(task)
    service_device_pairs = task.model.each_master_driver_service.map do |service|
        [service, task.find_device_attached_to(service)]
    end
    DataFlowDynamics.debug do
        DataFlowDynamics.debug "initial port dynamics on #{task} (device)"
        DataFlowDynamics.debug "  attached devices: #{service_device_pairs.map { |srv, dev| "#{dev.name} on #{srv.name}" }.join(", ")}"
        break
    end

    activity_type = task.orogen_model.activity_type.name
    if activity_type == "Periodic"
        initial_device_information_periodic_triggering(
            task, service_device_pairs.to_a, task.orogen_model.period)
    else
        initial_device_information_internal_triggering(
            task, service_device_pairs.to_a)
    end
end

#initial_device_information_common(task, triggering_devices) ⇒ Object

Common external loop for adding initial device information in #initial_device_information. It is used by initial_device_information_periodic_triggering and initial_device_information_internal_triggering



247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 247

# Common external loop for adding initial device information
#
# For each (service, device) pair, builds the device's own dynamics: one
# sample per period when the device has a period, plus a zero-period
# "<name>-burst" trigger carrying the device's burst. The pair and its
# dynamics are yielded only if the resulting dynamics are not empty.
#
# @param task the device driver task (not used by the loop itself)
# @param triggering_devices list of (service, device) pairs
# @yieldparam service the driver service
# @yieldparam device the device attached to the service
# @yieldparam device_dynamics the dynamics computed for the device
def initial_device_information_common(task, triggering_devices)
    triggering_devices.each do |service, device|
        DataFlowDynamics.debug { "  #{device.name}: #{device.period} #{device.burst}" }
        dynamics = PortDynamics.new(device.name, 1)
        dynamics.add_trigger(device.name, device.period, 1) if device.period
        dynamics.add_trigger(device.name + "-burst", 0, device.burst)
        next if dynamics.empty?

        yield(service, device, dynamics)
    end
end

#initial_device_information_internal_triggering(task, triggering_devices) ⇒ Object

Computes the initial port dynamics due to devices when the task gets triggered by the devices it is attached to



264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 264

# Computes the initial port dynamics due to devices when the task gets
# triggered by the devices it is attached to
#
# The device dynamics are added to the task itself and to each output port
# of the driver service. Those ports are flagged as not triggered on
# update on their oroGen model and are marked as done.
#
# @param task the device driver task
# @param triggering_devices list of (service, device) pairs
def initial_device_information_internal_triggering(task, triggering_devices)
    DataFlowDynamics.debug "  is triggered internally"

    initial_device_information_common(task, triggering_devices) do |service, _device, dynamics|
        add_task_info(task, dynamics)
        service.each_output_port do |service_port|
            component_port = service_port.to_component_port
            component_port.orogen_model.triggered_on_update = false
            add_port_info(task, component_port.name, dynamics)
            done_port_info(task, component_port.name)
        end
    end
end

#initial_device_information_periodic_triggering(task, triggering_devices, period) ⇒ Object

Computes the initial port dynamics due to devices when the task is triggered periodically



280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 280

# Computes the initial port dynamics due to devices when the task is
# triggered periodically
#
# Each output port of the driver service gets a trigger named after the
# device, at the task's period, whose sample count is the device dynamics'
# queue size over that period. Those ports are flagged as not triggered on
# update on their oroGen model and are marked as done.
#
# @param task the device driver task
# @param triggering_devices list of (service, device) pairs
# @param period the period of the task
def initial_device_information_periodic_triggering(task, triggering_devices, period)
    DataFlowDynamics.debug { "  is triggered with a period of #{period} seconds" }

    initial_device_information_common(task, triggering_devices) do |service, device, dynamics|
        service.each_output_port do |service_port|
            component_port = service_port.to_component_port
            component_port.orogen_model.triggered_on_update = false

            port_name = component_port.name
            trigger_name = device.name
            samples_per_period = dynamics.queue_size(period)
            add_port_trigger(task, port_name, trigger_name, period, samples_per_period)
            done_port_info(task, component_port.name)
        end
    end
end

#initial_information(task) ⇒ Object

Computes the initial port dynamics, i.e. the dynamics that can be computed without knowing anything about the dataflow



318
319
320
321
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 318

# Computes the initial port dynamics, i.e. the dynamics that can be
# computed without knowing anything about the dataflow
#
# Tasks whose oroGen model has a master are skipped here: slaves are
# handled from their master by #initial_task_information.
#
# @param task the task to initialize
def initial_information(task)
    initial_task_information(task) unless task.orogen_model.master
end

#initial_slaves_information(task) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Computes a task's slaves initial information



326
327
328
329
330
331
332
333
334
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 326

# @api private
#
# Computes a task's slaves initial information
#
# Slaves are taken from the task's oroGen model and resolved through
# #task_from_name. Slaves that cannot be resolved, or that already have
# task-level information, are left untouched.
#
# @param task the master task
def initial_slaves_information(task)
    task.orogen_model.slaves.each do |orogen_slave_task|
        slave_task = task_from_name[orogen_slave_task.name]
        next if !slave_task || has_information_for_task?(slave_task)

        initial_task_information(slave_task)
    end
end

#initial_task_information(task) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Computes a task's initial information



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 339

# @api private
#
# Computes a task's initial information
#
# In order: initializes the slaves, creates the task-level ("main")
# dynamics and the dynamics of every output port, applies the dynamics
# declared in the task's requirements (the declared ports are final),
# then adds device and communication bus information when the task is a
# Device and/or a ComBus. Finally, the task-level information is marked as
# final when the task is periodic or has no event port; slave activities
# are finalized by their master through #done_task_info.
#
# @param task the task to initialize
def initial_task_information(task)
    initial_slaves_information(task)

    set_port_info(task, nil, PortDynamics.new("#{task.orocos_name}.main"))
    task.model.each_output_port { |port| create_port_info(task, port.name) }

    add_task_info(task, task.requirements.dynamics.task)
    task.requirements.dynamics.ports.each do |port_name, dynamics|
        add_port_info(task, port_name, dynamics)
        done_port_info(task, port_name)
    end

    initial_device_information(task) if task.kind_of?(Device)
    initial_combus_information(task) if task.kind_of?(ComBus)

    activity_type = task.orogen_model.activity_type.name
    if activity_type == "Periodic"
        DataFlowDynamics.debug { "  adding periodic trigger #{task.orogen_model.period} 1" }
        add_task_trigger(task, "#{task.orocos_name}.main-period", task.orogen_model.period, 1)
        return done_task_info(task)
    end

    # The master's main trigger is propagated in #done_task_info
    return if activity_type == "SlaveActivity"

    # Without any event port, nothing else can trigger the task
    done_task_info(task) unless task.model.each_event_port.find { true }
end

#policy_for(source_task, source_port_name, sink_port_name, sink_task, policy) ⇒ Object

Given the current knowledge about the port dynamics, returns the policy for the provided connection

policy is either the current connection policy, or a hash with only a :fallback_policy value that contains a possible policy if the actual one cannot be computed.



540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 540

# Given the current knowledge about the port dynamics, returns the policy
# for the provided connection
#
# @param source_task the task that owns the output port
# @param source_port_name the name of the output port
# @param sink_port_name the name of the input port
# @param sink_task the task that owns the input port
# @param policy either the current connection policy, or a hash with only
#   a :fallback_policy value that contains a possible policy if the actual
#   one cannot be computed. The argument itself is not modified.
# @return the policy to use for the connection
# @raise [InternalError] if one of the two ports cannot be found on its
#   task's model
# @raise [SpecError] if the buffer size cannot be computed and no fallback
#   policy was provided
def policy_for(source_task, source_port_name, sink_port_name, sink_task, policy)
    # Work on a copy, and separate the fallback from the explicit policy
    policy = policy.dup
    fallback_policy = policy.delete(:fallback_policy)

    # Don't do anything if the policy has already been set
    if !policy.empty?
        DataFlowDynamics.debug " #{source_task}:#{source_port_name} => #{sink_task}:#{sink_port_name} already connected with #{policy}"
        return policy
    end

    source_port = source_task.model.find_output_port(source_port_name)
    sink_port   = sink_task.model.find_input_port(sink_port_name)
    if !source_port
        raise InternalError, "#{source_port_name} is not a port of #{source_task.model}"
    elsif !sink_port
        raise InternalError, "#{sink_port_name} is not a port of #{sink_task.model}"
    end
    DataFlowDynamics.debug { "   #{source_task}:#{source_port.name} => #{sink_task}:#{sink_port.name}" }

    # When the sink does not need a reliable connection, its declared
    # connection type (if it is :data or :buffer) decides the policy
    # directly, without looking at the dynamics
    if !sink_port.needs_reliable_connection?
        if sink_port.required_connection_type == :data
            policy = Orocos::Port.prepare_policy(:type => :data)
            DataFlowDynamics.debug { "     result: #{policy}" }
            return policy
        elsif sink_port.required_connection_type == :buffer
            policy = Orocos::Port.prepare_policy(:type => :buffer, :size => 1)
            DataFlowDynamics.debug { "     result: #{policy}" }
            return policy
        end
    end

    # Compute the buffer size
    # Only final information is usable; input_dynamics stays nil otherwise
    input_dynamics =
        if has_final_information_for_port?(source_task, source_port.name)
            port_info(source_task, source_port.name)
        end

    sink_task_dynamics  =
        if has_final_information_for_task?(sink_task)
            task_info(sink_task)
        end

    # How long a sample may wait before being read: only the trigger
    # latency if the port triggers the task, otherwise the task's minimal
    # period plus the trigger latency. nil if it cannot be determined.
    reading_latency =
        if sink_port.trigger_port?
            sink_task.trigger_latency
        elsif sink_task_dynamics && sink_task_dynamics.minimal_period
            sink_task_dynamics.minimal_period + sink_task.trigger_latency
        end

    if !input_dynamics || !reading_latency
        # Not enough information: warn and use the fallback policy if there
        # is one, raise otherwise
        if fallback_policy
            if !input_dynamics
                DataFlowDynamics.warn do
                    DataFlowDynamics.warn "Cannot compute the period information for the output port"
                    DataFlowDynamics.warn "   #{source_task}:#{source_port.name}"
                    DataFlowDynamics.warn "   This is needed to compute the policy to connect to"
                    DataFlowDynamics.warn "   #{sink_task}:#{sink_port_name}"
                    DataFlowDynamics.warn "   The fallback policy #{fallback_policy} will be used"
                    break
                end

            else
                DataFlowDynamics.warn "#{sink_task} has no minimal period"
                DataFlowDynamics.warn "This is needed to compute the reading latency on #{sink_port.name}"
                DataFlowDynamics.warn "The fallback policy #{fallback_policy} will be used"
            end
            policy = fallback_policy
        elsif !input_dynamics
            raise SpecError, "the period information for output port #{source_task}:#{source_port.name} cannot be computed. This is needed to compute the policy to connect to #{sink_task}:#{sink_port_name}"
        else
            raise SpecError, "#{sink_task} has no minimal period, needed to compute reading latency on #{sink_port.name}"
        end
    else
        # Buffer sized from the number of samples the source can queue
        # during the reading latency, enlarged by the configured margin,
        # truncated to an integer and incremented by one
        policy[:type] = :buffer
        size = (1.0 + Syskit.conf.buffer_size_margin) * input_dynamics.queue_size(reading_latency)
        policy[:size] = Integer(size) + 1
        DataFlowDynamics.debug do
            DataFlowDynamics.debug "     input_period:#{input_dynamics.minimal_period} => reading_latency:#{reading_latency}"
            DataFlowDynamics.debug "     sample_size:#{input_dynamics.sample_size}"
            input_dynamics.triggers.each do |tr|
                DataFlowDynamics.debug "     trigger(#{tr.name}): period=#{tr.period} count=#{tr.sample_count}"
            end
            break
        end
        DataFlowDynamics.debug { "     result: #{policy}" }
    end

    policy
end

#propagate_task(task) ⇒ Object



467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 467

# Tries to compute every piece of information still missing on a task
#
# All missing entries are attempted, even after one of them failed.
#
# @param task the task to propagate
# @return [Boolean] true if the task has no missing information left to
#   compute or if all of it could be computed, false otherwise
def propagate_task(task)
    return true unless missing_ports.has_key?(task)

    # Iterate over a copy of the set of missing entries
    pending = missing_ports[task].dup
    DataFlowDynamics.debug do
        DataFlowDynamics.debug "trying to compute dataflow dynamics for #{task}"
        DataFlowDynamics.debug "  requires information on: #{pending.map(&:to_s).join(", ")}"
        break
    end

    failures = 0
    pending.each do |missing|
        next if compute_info_for(task, missing)

        DataFlowDynamics.debug do
            DataFlowDynamics.debug "  cannot compute information on #{missing}"
            break
        end
        failures += 1
    end
    failures == 0
end

#required_information(tasks) ⇒ Object

Returns the set of objects for which information is required as an output of the algorithm

The returned value is a map:

task => ports

Where ports is the set of port names that are required on task. nil can be used to denote the task itself.



414
415
416
417
418
419
420
421
422
423
424
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 414

# Returns the set of objects for which information is required as an
# output of the algorithm
#
# Only tasks that have at least one output port are listed. For those,
# all output port names are required, plus nil which denotes the task
# itself.
#
# @param tasks the tasks to consider
# @return [Hash] a map from task to the set of required port names
def required_information(tasks)
    tasks.each_with_object(Hash.new) do |task, required|
        port_names = task.model.each_output_port.to_a.map(&:name)
        next if port_names.empty?

        required[task] = port_names.to_set << nil
    end
end

#reset(tasks = Array.new) ⇒ Object



163
164
165
166
167
168
169
170
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 163

# Reinitializes the computation state for a new set of tasks
#
# After the parent's reset, the trigger table is emptied (missing keys
# default to a new empty set) and the name-to-task mapping is rebuilt from
# the given tasks' orocos_name.
#
# @param tasks the deployed tasks the computation will work on
def reset(tasks = Array.new)
    super
    @triggers = Hash.new { |table, key| table[key] = Set.new }
    @task_from_name = Hash.new
    tasks.each { |deployed| task_from_name[deployed.orocos_name] = deployed }
end

#task_info(task) ⇒ Object



188
189
190
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 188

# Returns the task-level dynamics of a task
#
# Task-level information is stored under the nil port name.
#
# @param task the task to look up
def task_info(task)
    task_slot = nil
    port_info(task, task_slot)
end

#triggering_inputs(task) ⇒ Object

Computes the set of input ports in task that are used during the information propagation



376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/syskit/network_generation/dataflow_dynamics.rb', line 376

# Computes the set of input ports in task that are used during the
# information propagation
#
# As a side effect, fills the trigger table for the task:
# - the task itself is triggered by its connected event ports
# - an output port is triggered by the task when it is triggered on
#   update, and by each of its connected port triggers
# Output ports that end up without any entry in the trigger table are
# marked as done.
#
# @param task the task to analyze
# @return [Set] the connected input ports that trigger the task or one of
#   its output ports
def triggering_inputs(task)
    used_inputs = Set.new

    # The task-level entry is always reset
    task_triggers = (@triggers[[task, nil]] = Set.new)
    task.model.each_event_port do |port|
        next unless task.has_concrete_input_connection?(port.name)

        used_inputs << port
        task_triggers << [task, port.name]
    end

    task.model.each_output_port do |port|
        @triggers[[task, port.name]] << [task, nil] if port.triggered_on_update?

        port.port_triggers.each do |trigger_port|
            next unless task.has_concrete_input_connection?(trigger_port.name)

            @triggers[[task, port.name]] << [task, trigger_port.name]
            used_inputs << trigger_port
        end
    end

    # Ports that nothing can trigger will never get more information
    task.model.each_output_port do |port|
        done_port_info(task, port.name) unless @triggers.has_key?([task, port.name])
    end

    used_inputs
end