Class: Syskit::NetworkGeneration::MergeSolver

Inherits:
Object
  • Object
show all
Extended by:
Logger::Hierarchy
Includes:
Logger::Hierarchy, Roby::DRoby::EventLogging
Defined in:
lib/syskit/network_generation/merge_solver.rb

Overview

Implementation of the algorithms needed to reduce a component network to the minimal set of components that are actually needed

This is the core of the system deployment algorithm implemented in Engine

Constant Summary collapse

@@trace_file_pattern =
"syskit-trace-%04i.%i"
@@trace_enabled =
false
@@trace_count =
0
@@trace_last_phase =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plan, event_logger: plan.event_logger) ⇒ MergeSolver

Returns a new instance of MergeSolver



35
36
37
38
39
40
41
42
43
44
# File 'lib/syskit/network_generation/merge_solver.rb', line 35

def initialize(plan, event_logger: plan.event_logger)
    @plan = plan
    @event_logger = event_logger
    @dataflow_graph = plan.task_relation_graph_for(Flows::DataFlow)
    @dependency_graph = plan.task_relation_graph_for(Roby::TaskStructure::Dependency)
    @merging_candidates_queries = Hash.new
    @task_replacement_graph = Roby::Relations::BidirectionalDirectedAdjacencyGraph.new
    @resolved_replacements = Hash.new
    @invalid_merges = Set.new
end

Instance Attribute Details

#dataflow_graphObject (readonly)

The dataflow graph for #plan



17
18
19
# File 'lib/syskit/network_generation/merge_solver.rb', line 17

def dataflow_graph
  @dataflow_graph
end

#dependency_graphObject (readonly)

The dataflow graph for #plan



20
21
22
# File 'lib/syskit/network_generation/merge_solver.rb', line 20

def dependency_graph
  @dependency_graph
end

#event_loggerObject (readonly)

The Roby::DRoby::EventLogger object on which we log performance information



33
34
35
# File 'lib/syskit/network_generation/merge_solver.rb', line 33

def event_logger
  @event_logger
end

#invalid_mergesSet<(Syskit::Component,Syskit::Component)> (readonly)

The list of merges that are known to be invalid, as (merged_task, task)



29
30
31
# File 'lib/syskit/network_generation/merge_solver.rb', line 29

def invalid_merges
  @invalid_merges
end

#planObject (readonly)

The plan on which this solver applies



14
15
16
# File 'lib/syskit/network_generation/merge_solver.rb', line 14

def plan
  @plan
end

#task_replacement_graphObject (readonly)

A graph that holds all replacements done during resolution



23
24
25
# File 'lib/syskit/network_generation/merge_solver.rb', line 23

def task_replacement_graph
  @task_replacement_graph
end

Class Method Details

.disable_tracingObject



148
149
150
# File 'lib/syskit/network_generation/merge_solver.rb', line 148

def self.disable_tracing
    @@trace_enabled = false
end

.enable_tracingObject



144
145
146
# File 'lib/syskit/network_generation/merge_solver.rb', line 144

def self.enable_tracing
    @@trace_enabled = true
end

.merge_identical_tasks(plan) ⇒ Object

Create a new solver on the given plan and perform #merge_identical_tasks



188
189
190
191
# File 'lib/syskit/network_generation/merge_solver.rb', line 188

def self.merge_identical_tasks(plan)
    solver = MergeSolver.new(plan)
    solver.merge_identical_tasks
end

.trace?Boolean

Returns:

  • (Boolean)


152
153
154
# File 'lib/syskit/network_generation/merge_solver.rb', line 152

def self.trace?
    @@trace_enabled
end

.trace_export(plan, phase: 1, highlights: [], **dataflow_options) ⇒ Object



177
178
179
180
181
182
183
184
# File 'lib/syskit/network_generation/merge_solver.rb', line 177

def self.trace_export(plan, phase: 1, highlights: [], **dataflow_options)
    basename  = trace_next_file(phase)
    dataflow = basename + ".dataflow.svg"
    hierarchy = basename + ".hierarchy.svg"
    Syskit::Graphviz.new(plan).to_file('dataflow', 'svg', dataflow, highlights: highlights, **dataflow_options)
    Syskit::Graphviz.new(plan).to_file('hierarchy', 'svg', hierarchy, highlights: highlights)
    ::Robot.info "#{self} exported trace plan to #{dataflow} and #{hierarchy}"
end

.trace_file_patternObject



156
157
158
# File 'lib/syskit/network_generation/merge_solver.rb', line 156

def self.trace_file_pattern
    @@trace_file_pattern
end

.trace_file_pattern=(pattern) ⇒ Object



160
161
162
# File 'lib/syskit/network_generation/merge_solver.rb', line 160

def self.trace_file_pattern=(pattern)
    @@trace_file_pattern = pattern
end

.trace_next_file(phase) ⇒ Object



169
170
171
172
173
174
175
# File 'lib/syskit/network_generation/merge_solver.rb', line 169

def self.trace_next_file(phase)
    if @@trace_last_phase >= phase
        @@trace_count += 1
    end
    @@trace_last_phase = phase
    trace_file_pattern % [@@trace_count, phase]
end

Instance Method Details

#apply_merge_group(merged_task_to_task) ⇒ Object

Apply a set of merges computed by #resolve_merge



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/syskit/network_generation/merge_solver.rb', line 90

def apply_merge_group(merged_task_to_task)
    debug do
        merged_task_to_task.each do |merged_task, task|
            debug "merging"
            log_nest(2) do
                log_pp :debug, merged_task
            end
            debug "into"
            log_nest(2) do
                log_pp :debug, task
            end
        end
        break
    end

    if self.class.trace?
        remove_compositions = true
        if merged_task_to_task.each_key.any? { |t| t.kind_of?(Syskit::Composition) }
            remove_compositions = false
        end
        self.class.trace_export(plan, phase: 1, highlights: (merged_task_to_task.keys + merged_task_to_task.values), remove_compositions: remove_compositions)
    end

    merged_task_to_task.each do |merged_task, task|
        if merged_task == task
            raise "trying to merge a task onto itself: #{merged_task}"
        end
        if task.respond_to?(:merge)
            task.merge(merged_task)
        end
    end

    merged_event_to_event = Hash.new
    event_resolver = ->(e) { merged_task_to_task[e.task].event(e.symbol) }
    task_replacements = merged_task_to_task.map_value do |merged_task, task|
        merged_task.each_event do |ev|
            merged_event_to_event[ev] = [nil, event_resolver]
        end
        [task]
    end
    plan.replace_subplan(task_replacements, merged_event_to_event)

    merged_task_to_task.each do |merged_task, task|
        if !merged_task.transaction_proxy?
            plan.remove_task(merged_task)
        end
        register_replacement(merged_task, task)
    end

    if self.class.trace?
        self.class.trace_export(plan, phase: 2, highlights: merged_task_to_task.values, remove_compositions: remove_compositions)
    end
end

#clearObject



46
47
48
49
50
# File 'lib/syskit/network_generation/merge_solver.rb', line 46

def clear
    @task_replacement_graph.clear
    @resolved_replacements.clear
    @invalid_merges.clear
end

#composition_children_by_role(task) ⇒ Object



309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/syskit/network_generation/merge_solver.rb', line 309

def composition_children_by_role(task)
    result = Hash.new
    task_children_names = task.model.children_names.to_set
    task.each_out_neighbour_merged(
            Roby::TaskStructure::Dependency, intrusive: true).
        map do |child_task|
            dependency_graph.edge_info(task, child_task)[:roles].each do |r|
                if task_children_names.include?(r)
                    result[r] = child_task
                end
            end
        end
    result
end

#display_merge_graph(title, merge_graph) ⇒ Object



515
516
517
518
519
520
521
522
523
524
525
# File 'lib/syskit/network_generation/merge_solver.rb', line 515

def display_merge_graph(title, merge_graph)
    debug "  -- #{title}"
    debug do
        merge_graph.each_vertex do |vertex|
            vertex.each_child_vertex(merge_graph) do |child|
                debug "    #{vertex}.merge(#{child})"
            end
        end
        break
    end
end

#each_component_merge_candidate(task) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/syskit/network_generation/merge_solver.rb', line 225

def each_component_merge_candidate(task)
    # Get the set of candidates. We are checking if the tasks in
    # this set can be replaced by +task+
    candidates = plan.find_local_tasks(task.model.concrete_model).
        to_a
    debug do
        debug "#{candidates.to_a.size - 1} candidates for #{task}, matching model"
        debug "  #{task.model.concrete_model}"
        break
    end

    candidates.each do |merged_task|
        next if task == merged_task

        debug { "  #{merged_task}" }
        if merged_task.placeholder?
            debug "    data service proxy"
            next
        elsif !merged_task.plan
            debug "    removed from plan"
            next
        elsif invalid_merges.include?([merged_task, task])
            debug "    already evaluated as an invalid merge"
            next
        end
        yield(merged_task)
    end
end

#each_composition_merge_candidate(task) ⇒ Object



365
366
367
368
369
370
371
372
373
# File 'lib/syskit/network_generation/merge_solver.rb', line 365

def each_composition_merge_candidate(task)
    each_component_merge_candidate(task) do |merged_task|
        if may_merge_compositions?(merged_task, task)
            yield(merged_task)
        else
            invalid_merges << [merged_task, task]
        end
    end
end

#each_task_context_merge_candidate(task) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
# File 'lib/syskit/network_generation/merge_solver.rb', line 254

def each_task_context_merge_candidate(task)
    each_component_merge_candidate(task) do |merged_task|
        if may_merge_task_contexts?(merged_task, task)
            debug "  may merge"
            yield(merged_task)
        else
            debug "  invalid merge: may_merge_task_contexts? returned false"
            invalid_merges << [merged_task, task]
        end
    end
end

#enumerate_composition_exports(task) ⇒ Object



294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/syskit/network_generation/merge_solver.rb', line 294

def enumerate_composition_exports(task)
    task_exports = Set.new
    task.each_input_connection do |source_task, source_port, sink_port, _|
        if task.find_output_port(sink_port)
            task_exports << [source_task, source_port, sink_port]
        end
    end
    task.each_output_connection do |source_port, sink_task, sink_port, _|
        if task.find_input_port(source_port)
            task_exports << [source_port, sink_task, sink_port]
        end
    end
    task_exports
end

#may_merge_compositions?(merged_task, task) ⇒ Boolean

Returns:

  • (Boolean)


324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/syskit/network_generation/merge_solver.rb', line 324

def may_merge_compositions?(merged_task, task)
    if !may_merge_task_contexts?(merged_task, task)
        return false
    end

    merged_task_children = composition_children_by_role(merged_task)
    task_children        = composition_children_by_role(task)
    merged_children = merged_task_children.merge(task_children) do |role, merged_task_child, task_child|
        if merged_task_child == task_child
            merged_task_child
        else
            info "rejected: compositions with different children or children in different roles"
            debug do
                debug "  in role #{role},"
                log_nest(2) do
                    log_pp(:debug, merged_task_child)
                end
                log_nest(2) do
                    log_pp(:debug, task_child)
                end
            end
            return false
        end
    end

    if merged_children.each_value.any? { |t| t.placeholder? }
        info "rejected: compositions still have unresolved children"
        return false
    end

    # Now verify that the exported ports are the same
    task_exports = enumerate_composition_exports(task)
    merged_task_exports = enumerate_composition_exports(merged_task)
    if merged_task_exports != task_exports
        info "rejected: compositions with different exports"
        return false
    end

    true
end

#may_merge_task_contexts?(merged_task, task) ⇒ false, true

Tests whether task.merge(target_task) is a valid operation

Parameters:

Returns:

  • (false, true)

    if false, the merge is not possible. If true, it is possible. If nil, the only thing that makes the merge impossible are missing inputs, and these tasks might therefore be merged if there was a dataflow cycle



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/syskit/network_generation/merge_solver.rb', line 202

def may_merge_task_contexts?(merged_task, task)
    can_merge = log_nest(2) do
        task.can_merge?(merged_task)
    end

    # Ask the task about intrinsic merge criteria.
    # Component#can_merge?  should not look at the relation graphs,
    # only at criteria internal to the tasks.
    if !can_merge
        info "rejected: can_merge? returned false"
        return false
    end

    # Merges involving a deployed task can only involve a
    # non-deployed task as well
    if task.execution_agent && merged_task.execution_agent
        info "rejected: deployment attribute mismatches"
        return false
    end

    true
end

#merge_compositionsObject



375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'lib/syskit/network_generation/merge_solver.rb', line 375

def merge_compositions
    debug "merging compositions"

    queue   = Array.new
    topsort = Array.new
    degrees = Hash.new
    dependency_graph.each_vertex do |task|
        d = dependency_graph.out_degree(task)
        queue << task if d == 0
        degrees[task] = d
    end

    while !queue.empty?
        task = queue.shift
        if task.kind_of?(Syskit::Composition)
            topsort << task
        end
        dependency_graph.each_in_neighbour(task) do |parent|
            d = (degrees[parent] -= 1)
            queue << parent if d == 0
        end
    end

    topsort.each do |composition|
        next if !composition.plan
        each_composition_merge_candidate(composition) do |merged_composition|
            apply_merge_group(merged_composition => composition)
        end
    end
end

#merge_identical_tasksObject



501
502
503
504
505
506
507
508
509
510
511
512
513
# File 'lib/syskit/network_generation/merge_solver.rb', line 501

def merge_identical_tasks
    log_timepoint_group_start 'syskit-merge-solver'
    dataflow_graph.enable_concrete_connection_graph
    log_timepoint_group 'merge_task_contexts' do
        merge_task_contexts
    end
    log_timepoint_group 'merge_compositions' do
        merge_compositions
    end
ensure
    dataflow_graph.disable_concrete_connection_graph
    log_timepoint_group_end 'syskit-merge-solver'
end

#merge_task_contextsObject

Merge the task contexts



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/syskit/network_generation/merge_solver.rb', line 267

def merge_task_contexts
    debug "merging task contexts"

    queue = plan.find_local_tasks(Syskit::TaskContext).sort_by do |t|
        dataflow_graph.in_degree(t)
    end.reverse

    invalid_merges.clear
    while !queue.empty?
        task = queue.shift
        # 'task' could have been merged already, ignore it
        next if !task.plan

        each_task_context_merge_candidate(task) do |merged_task|
            # Try to resolve the merge
            can_merge, mappings =
                resolve_merge(merged_task, task, merged_task => task)

            if can_merge
                apply_merge_group(mappings)
            else
                invalid_merges << [merged_task, task]
            end
        end
    end
end

#register_replacement(old_task, new_task) ⇒ void

This method returns an undefined value.

Registers a replacement in the plan

Parameters:

  • old_task (Roby::Task)

    the task that is being replaced

  • new_task (Roby::Task)

    the task that replaced old_task



81
82
83
84
85
86
87
# File 'lib/syskit/network_generation/merge_solver.rb', line 81

def register_replacement(old_task, new_task)
    if concrete_graph = dataflow_graph.concrete_connection_graph
        concrete_graph.replace_vertex(old_task, new_task)
    end

    task_replacement_graph.add_edge(old_task, new_task, nil)
end

#replacement_for(task) ⇒ Roby::Task

Returns the task that is used in place of the given task

Parameters:

  • the (Roby::Task)

    task for which we want to know the replacement

Returns:

  • (Roby::Task)

See Also:



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/syskit/network_generation/merge_solver.rb', line 58

def replacement_for(task)
    if replacement = @resolved_replacements[task]
        # Verify that this is still a leaf in the replacement graph
        if task_replacement_graph.leaf?(replacement)
            return replacement
        end
        @resolved_replacements.delete(task)
    end

    task_replacement_graph.depth_first_visit(task) do |to|
        if task_replacement_graph.leaf?(to)
            @resolved_replacements[task] = to
            return to
        end
    end
    return task
end

#resolve_input_matching(merged_task, task) ⇒ Array<(String,String,Roby::Task,Roby::Task)>?

Returns the set of inputs that differ in two given components, possibly using merge cycle information

Parameters:

  • mapping (Hash<Roby::Task,Roby::Task>)

    from the set of target tasks into the set of tasks that should be used to compare the inputs. This is exploited when resolving cycles

Returns:

  • (Array<(String,String,Roby::Task,Roby::Task)>, nil)

    If nil, the two tasks have inputs that do not match and could not match even after a merge cycle resolution pass. Otherwise, the set of mismatching inputs is returned, in which each mismatch is a tuple (port_name,source_port,task_source,target_source).



457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/syskit/network_generation/merge_solver.rb', line 457

def resolve_input_matching(merged_task, task)
    m_inputs = Hash.new { |h, k| h[k] = Hash.new }
    merged_task.each_concrete_input_connection do |m_source_task, m_source_port, sink_port, m_policy|
        m_inputs[sink_port][[m_source_task, m_source_port]] = m_policy
    end

    mismatched_inputs = []
    task.each_concrete_input_connection do |source_task, source_port, sink_port, policy|
        # If +self+ has no connection on +sink_port+, it is valid
        if !m_inputs.has_key?(sink_port)
            next
        end

        if m_policy = m_inputs[sink_port][[source_task, source_port]]
            if !m_policy.empty? && !policy.empty? && (Syskit.update_connection_policy(m_policy, policy) != policy)
                debug { "rejected: incompatible policies on #{sink_port}" }
                return
            end
            next
        end

        # Different connections, check whether we could multiplex
        # them
        if (port_model = merged_task.model.find_input_port(sink_port)) && port_model.multiplexes?
            next
        end

        # If we are not multiplexing, there can be only one source
        # for merged_task
        (m_source_task, m_source_port), m_policy = m_inputs[sink_port].first
        if m_source_port != source_port
            debug { "rejected: sink #{sink_port} is connected to a port named #{m_source_port} resp. #{source_port}" }
            return
        end
        if !m_policy.empty? && !policy.empty? && (Syskit.update_connection_policy(m_policy, policy) != policy)
            debug { "rejected: incompatible policies on #{sink_port}" }
            return
        end

        mismatched_inputs << [sink_port, m_source_task, source_task]
    end
    mismatched_inputs
end

#resolve_merge(merged_task, task, mappings) ⇒ Object



406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/syskit/network_generation/merge_solver.rb', line 406

def resolve_merge(merged_task, task, mappings)
    mismatched_inputs = log_nest(2) { resolve_input_matching(merged_task, task) }
    if !mismatched_inputs
        # Incompatible inputs
        return false, mappings
    end

    mismatched_inputs.each do |sink_port, merged_source_task, source_task|
        info do
            info "  looking to pair the inputs of port #{sink_port} of"
            info "    #{merged_source_task}"
            info "    -- and --"
            info "    #{source_task}"
            break
        end

        if mappings[merged_source_task] == source_task
            info "  are already paired in the merge resolution: matching"
            next
        elsif !may_merge_task_contexts?(merged_source_task, source_task)
            info "  rejected: may not be merged"
            return false, mappings
        end

        can_merge, mappings = log_nest(2) do
            resolve_merge(merged_source_task, source_task,
                          mappings.merge(merged_source_task => source_task))
        end

        if can_merge
            info "  resolved"
        else
            info "  rejected: cannot find mapping to merge both tasks"
            return false, mappings
        end
    end

    return true, mappings
end