Class for loading and replaying OROCOS log files.
This class creates TaskContexts and OutputPorts to simulate the recorded tasks.
Actual replay speed. This can differ from speed if the hard disk is too slow.
array of stream annotations
last replayed sample
Name of the log file which holds the logged properties. It is converted into a Regexp.
Indicates whether the data are replayed synchronously. A value < 0 means that the replayed samples are behind the simulated time; a value > 0 means that the samples are replayed too fast.
set it to true if processing of qt events is needed during synced replay
Array of all replayed ports. This array is filled after align was called.
Array of all replayed properties. This array is filled after align was called.
Desired replay speed. A value of 1 replays at the recorded speed.
hash of code blocks which are used to calculate the replayed timestamps during replay
If true, the alignment algorithm is going to use the sample time for alignment. Otherwise, it uses the time at which the sample got written on disk (logical time)
See also #time_source
Creates a new instance of Replay
Passing a path is deprecated and raises an ArgumentError. Use Replay.open(*path) to create an instance and load the log files in one call.
# File lib/orocos/log/replay.rb, line 123
def initialize(*path)
    if !path.empty?
        raise ArgumentError, "Replay.new(*path) is deprecated, use Replay.open(*path) to create and load files at the same time"
    end
    @default_timestamp = nil
    @timestamps = Hash.new
    @tasks = Hash.new
    @annotations = Array.new
    @speed = 1
    @replayed_ports = Array.new
    @replayed_properties = Array.new
    @replayed_objects = Array.new
    @used_streams = Array.new
    @stream = nil
    @current_sample = nil
    @process_qt_events = false
    @log_config_file = Replay::log_config_file
    reset_time_sync
    time_sync
end
# File lib/orocos/log/replay.rb, line 107
def self.open(*path)
    replay = new
    replay.load(*path)
    replay
rescue ArgumentError => e
    Orocos.error "Cannot load logfiles"
    raise e
rescue Pocolog::Logfiles::MissingPrologue => e
    Orocos.error "Wrong log format"
    raise e
end
# File lib/orocos/log/replay.rb, line 362
def advance
    if(@stream)
        return @stream.advance
    else
        throw "Stream is not initialized yet"
    end
end
Aligns all streams which have at least:
one reader
or one connection
or track set to true.
After calling this method no more ports can be tracked.
#time_source is passed through to the StreamAligner. It can be used to override the global #time_source parameter. See #time_source for available values.
# File lib/orocos/log/replay.rb, line 310
def align( time_source = self.time_source )
    @replayed_ports = Array.new
    @used_streams = Array.new

    if !replay?
        Log.warn "No ports are selected. Assuming that all ports shall be replayed."
        Log.warn "Connect port(s) or set their track flag to true to get rid of this message."
        track(true)
    end

    #get all streams which shall be replayed
    each_port do |port|
        if port.used?
            if !port.stream.empty?
                @replayed_ports << port
                @used_streams << port.stream
            end
        end
        port.set_replay
    end

    #get all properties which shall be replayed
    each_task do |task|
        if task.used?
            task.properties.values.each do |property|
                @replayed_properties << property
                @used_streams << property.stream
            end
        end
    end
    @replayed_objects = @replayed_ports + @replayed_properties

    Log.info "Aligning streams --> all ports which are unused will not be loaded!!!"
    if @used_streams.size == 0
        Log.warn "No log data are replayed. All selected streams are empty."
        return
    end

    Log.info "Replayed Ports:"
    @replayed_ports.each {|port| Log.info PP.pp(port,"")}

    #register task on the local name server
    register_tasks

    #join streams
    @stream = Pocolog::StreamAligner.new(time_source, *@used_streams)
    @stream.rewind

    reset_time_sync
    return step
end
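To illustrate what alignment means conceptually, the sketch below merges several time-stamped streams into one globally ordered sequence of [stream_index, time, data] triples, which is the shape of the samples that step unpacks. It is a simplified stand-in and not the Pocolog::StreamAligner implementation; the helper align_streams and the sample data are hypothetical.

```ruby
# Hypothetical helper: merge time-stamped streams into one ordered sequence.
# Each stream is an array of [time, data] pairs.
def align_streams(*streams)
  merged = []
  streams.each_with_index do |stream, stream_index|
    stream.each do |time, data|
      merged << [stream_index, time, data]
    end
  end
  # Order by time, using the stream index to break ties between streams
  merged.sort_by { |stream_index, time, _data| [time, stream_index] }
end

camera = [[0.0, :frame0], [0.2, :frame1]]
imu    = [[0.1, :imu0], [0.15, :imu1], [0.3, :imu2]]

align_streams(camera, imu).each do |stream_index, time, data|
  puts "stream #{stream_index} @ #{time}: #{data}"
end
```

The stream index in each triple plays the role of the index that the replay uses to pick the port or property to write to.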
# File lib/orocos/log/replay.rb, line 407 def aligned? return @stream != nil end
Clears all reader buffers. This is useful if you are changing the replay direction.
# File lib/orocos/log/replay.rb, line 714
def clear_reader_buffers
    @tasks.each_value do |task|
        task.clear_reader_buffers
    end
end
Tries to connect all input ports of the OROCOS task to simulated OutputPorts
* task => task to connect to
* port_mappings => hash to define port mappings {src_port_name => dst_port_name}
* port_policies => hash of connection policies, keyed by the name of the task's input port
* ports_ignored => array of port names which shall be ignored
# File lib/orocos/log/replay.rb, line 259
def connect_to(task,port_mappings = Hash.new ,port_policies = Hash.new,ports_ignored = Array.new)
    #convenience block to do connect_to(task,:auto_ignore)
    if port_mappings == :auto_ignore
        ports_ignored = port_mappings
        port_mappings = Hash.new
    end
    #Array() wraps a single symbol into an array; Array.new(:auto_ignore) would raise a TypeError
    ports_ignored = Array(ports_ignored)

    #start task if necessary
    if task.state == :PRE_OPERATIONAL
        task.configure
    end
    if task.state == :STOPPED
        task.start
    end

    #to have a better user interface the hash is inverted
    #port1 connect_to port2 is written as ('port1' => 'port2')
    port_mappings = port_mappings.invert
    task.each_port do |port|
        if port.to_orocos_port.kind_of?(Orocos::InputPort) && !ports_ignored.include?(port.name)
            target_port = find_port(port.type_name,port_mappings[port.name]||port.name)
            if target_port
                target_port.connect_to(port,port_policies[port.name])
            elsif !ports_ignored.include? :auto_ignore
                raise ArgumentError, "cannot find an output port for #{port.name}"
            else
                Log.warn "No input port can be found for output port #{port.full_name}."
            end
        end
    end
end
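The mapping lookup in connect_to can be tried without any running task. The following sketch only reproduces the Hash#invert step and the fallback to an identical port name; the port names are made up for illustration.

```ruby
# Mappings are written from the log's point of view:
# logged output port => input port of the live task
port_mappings = { 'camera_frame' => 'image_in' }

# connect_to inverts the hash so that it can look up the logged port
# by the name of the input port it is currently visiting
inverted = port_mappings.invert

# Hypothetical input port names of the live task
input_ports = ['image_in', 'pose_samples']

input_ports.each do |input_name|
  # Without a mapping, a logged port with the same name is searched for
  source_name = inverted[input_name] || input_name
  puts "#{source_name} -> #{input_name}"
end
```

Because the hash is inverted, two logged ports cannot be mapped to the same input port in one call: Hash#invert keeps only one key per value.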
Returns the last port which received data.
# File lib/orocos/log/replay.rb, line 532
def current_port
    if @current_sample
        index,_,_ = @current_sample
        replayed_ports[index]
    end
end
Sets a code block to calculate the default timestamp during replay. Without a block, the currently set code block is returned.
# File lib/orocos/log/replay.rb, line 211
def default_timestamp(&block)
    if block_given? then @default_timestamp = block
    else @default_timestamp
    end
end
Iterates through all simulated ports.
# File lib/orocos/log/replay.rb, line 547
def each_port(&block)
    @tasks.each_value do |task|
        task.each_port(&block)
    end
end
Iterates through all simulated tasks
# File lib/orocos/log/replay.rb, line 584
def each_task (&block)
    @tasks.each_value do |task|
        yield(task) if block_given?
    end
end
Returns true if the end of file is reached.
# File lib/orocos/log/replay.rb, line 606
def eof?
    return @stream.eof?
end
Exports all aligned streams to a new log file. If no start and end index is given, all data are exported. Otherwise the data are truncated according to the given global indexes. The block is called for each sample, for example to update a progress bar.
# File lib/orocos/log/replay.rb, line 724
def export_to_file(file,start_index=0,end_index=0,&block)
    @stream.export_to_file(file,start_index,end_index,&block)
end
# File lib/orocos/log/replay.rb, line 233
def find_all_ports(type_name, port_name=nil)
    Orocos::TaskContext.find_all_ports(ports, type_name, port_name)
end

# File lib/orocos/log/replay.rb, line 236
def find_port(type_name, port_name=nil)
    Orocos::TaskContext.find_port(ports, type_name, port_name)
end

# File lib/orocos/log/replay.rb, line 370
def first_sample_pos(stream)
    @stream.first_sample_pos(stream)
end

# File lib/orocos/log/replay.rb, line 579
def has_task?(name)
    @tasks.has_key?(name.to_s)
end

# File lib/orocos/log/replay.rb, line 374
def last_sample_pos(stream)
    @stream.last_sample_pos(stream)
end
Loads log files and creates TaskContexts which simulate the recorded tasks. You can either specify a single file or a whole directory. If you want to load more than one directory or file simultaneously, you can use an array. A Typelib::Registry may be passed as the last argument; it is handed on to Pocolog when the files are opened.
# File lib/orocos/log/replay.rb, line 671
def load(*paths)
    paths.flatten!
    raise ArgumentError, "No log file was given" if paths.empty?

    logreg = nil
    if paths.last.kind_of?(Typelib::Registry)
        logreg = paths.pop
    end

    paths.each do |path|
        #check if path is a directory
        path = File.expand_path(path)
        if File.directory?(path)
            all_files = Dir.enum_for(:glob, File.join(path, '*.*.log'))
            by_basename = all_files.inject(Hash.new) do |h, path|
                split = path.match(/^(.*)\.(\d+)\.log$/)
                basename, number = split[1], Integer(split[2])
                h[basename] ||= Array.new
                h[basename][number] = path
                h
            end

            by_basename.each_value do |files|
                args = files.compact.map do |path|
                    File.open(path)
                end
                args << logreg
                logfile = Pocolog::Logfiles.new(*args.compact)
                load_log_file(logfile, files.first)
            end
        elsif File.file?(path)
            file = Pocolog::Logfiles.open(path, logreg)
            load_log_file(file, path)
        else
            raise ArgumentError, "Can not load log file: #{path} is neither a directory nor a file"
        end
    end
    raise ArgumentError, "Nothing was loded from the following log files #{paths.join("; ")}" if @tasks.empty?
end
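When a directory is given, load groups files named BASENAME.NUMBER.log by basename and orders them by number. The sketch below reproduces only that grouping on a plain list of paths, so it can be run without any log files. The helper group_log_files is hypothetical, and unlike load it skips names that do not match the pattern and parses the number as base 10.

```ruby
# Hypothetical helper mirroring the directory grouping done by load:
# files named BASENAME.NUMBER.log are collected per basename, ordered by number.
def group_log_files(paths)
  by_basename = Hash.new { |hash, key| hash[key] = [] }
  paths.each do |path|
    match = path.match(/^(.*)\.(\d+)\.log$/)
    next unless match # assumption: ignore names outside the convention
    # Base 10 is forced so that numbers such as "08" are accepted
    by_basename[match[1]][Integer(match[2], 10)] = path
  end
  # Remove the nil gaps left by missing numbers
  by_basename.transform_values(&:compact)
end

files = ['/logs/camera.1.log', '/logs/camera.0.log', '/logs/imu.0.log']
group_log_files(files).each do |basename, parts|
  puts "#{basename}: #{parts.join(', ')}"
end
```

Each group corresponds to one logical log that was split over several files.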
Loads all the streams defined in the provided log file
# File lib/orocos/log/replay.rb, line 657
def load_log_file(file, path)
    Log.info " loading log file #{path}"
    file.streams.each do |s|
        if s.metadata["rock_stream_type"] == "annotations"
            @annotations << Annotations.new(path,s)
            next
        end
        load_task_from_stream(s,path)
    end
end
# File lib/orocos/log/replay.rb, line 629
def load_task_from_stream(stream,path)
    #get the name of the task which was logged into the stream
    task_name = if stream.metadata.has_key? "rock_task_name"
                    stream.metadata["rock_task_name"]
                else
                    result = stream.name.to_s.match(/^(.*)\./)
                    result[1] if result
                end
    if task_name == nil
        task_name = "unknown"
        Log.warn "Stream name (#{stream.name}) does not follow the convention TASKNAME.PORTNAME and has no metadata, assuming as TASKNAME \"#{task_name}\""
    end

    task = @tasks[task_name]
    if !task
        task = @tasks[task_name]= TaskContext.new(self,task_name, path,@log_config_file)
    end

    begin
        task.add_stream(path,stream)
        Log.info " loading stream #{stream.name} (#{stream.type_name})"
    rescue InitializePortError => error
        Log.warn " loading stream #{stream.name} (#{stream.type_name}) failed. Call the port for an error message."
    end
    task
end
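The task name lookup used by load_task_from_stream can be tried in isolation. This is a minimal sketch that takes the stream name and a plain Hash instead of a real stream object; the helper task_name_for is hypothetical.

```ruby
# Hypothetical stand-alone version of the task name lookup.
# metadata is a plain Hash here; stream_name is the logged stream name.
def task_name_for(stream_name, metadata = {})
  # Metadata written by the logger takes precedence over the naming convention
  return metadata['rock_task_name'] if metadata.key?('rock_task_name')

  # Greedy match: everything before the last dot is taken as the task name
  match = stream_name.to_s.match(/^(.*)\./)
  match ? match[1] : 'unknown'
end

puts task_name_for('camera.frame')                                # camera
puts task_name_for('frame')                                       # unknown
puts task_name_for('x.y', { 'rock_task_name' => 'camera_driver' }) # camera_driver
```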
Returns an array of log markers, sorted by time.
# File lib/orocos/log/replay.rb, line 563
def log_markers
    @markers ||= Array.new
    return @markers if !@markers.empty?
    annotations.each do |annotation|
        #check if this is the right type
        if annotation.stream.type_name == "/logger/Annotations"
            @markers.concat LogMarker::parse(annotation.samples)
        end
    end
    @markers.sort! do |a,b|
        a.time <=> b.time
    end
    @markers
end
Tries to find an OutputPort for a specific data type. A Regexp is allowed for port_name. If precise is set to true, an error will be raised if more than one port matches type_name and port_name. This method is deprecated; use #find_port or #find_all_ports instead.
# File lib/orocos/log/replay.rb, line 244
def port_for(type_name, port_name, precise=true)
    Log.warn "#port_for is deprecated. Use either #find_all_ports or #find_port"
    if precise
        find_port(type_name, port_name)
    else
        find_all_ports(type_name, port_name)
    end
end
Returns an array of all simulated ports
# File lib/orocos/log/replay.rb, line 554
def ports
    result = Array.new
    each_port do |port|
        result << port
    end
    result
end
pretty print for Replay
# File lib/orocos/log/replay.rb, line 169
def pretty_print(pp)
    pp.text "Orocos::Log::Replay"
    pp.nest(2) do
        pp.breakable
        pp.text "replay speed = #{@speed}"
        pp.breakable
        pp.text "Markers = #{@markers}"
        pp.breakable
        pp.text "TaskContext(s):"
        @tasks.each_value do |task|
            pp.breakable
            task.pretty_print(pp)
        end
        pp.breakable
        pp.text "Stream Annotations:"
        @annotations.each do |a|
            pp.breakable
            a.pretty_print(pp)
        end
    end
end
replays the last sample to the log port
# File lib/orocos/log/replay.rb, line 624
def refresh
    index, time, data = @current_sample
    @replayed_objects[index].write(data)
end
registers all replayed log tasks on the local name server
# File lib/orocos/log/replay.rb, line 379
def register_tasks
    #enable local name service
    service = if Nameservice.enabled? :Local
                  Nameservice.get :Local
              else
                  Nameservice.enable :Local
              end
    each_task do |task|
        if task.used?
            service.registered_tasks[task.name] = task
        end
    end
end
returns false if no ports are or will be replayed
# File lib/orocos/log/replay.rb, line 156
def replay?
    #check if stream was initialized
    if @stream
        return true
    else
        each_port do |port|
            return true if port.used?
        end
    end
    return false
end
Resets the simulated time. This should be called after the replay was paused.
# File lib/orocos/log/replay.rb, line 413
def reset_time_sync
    @start_time = nil
    @base_time  = nil
    @actual_speed = 0
    @out_of_sync_delta = 0
end
Rewinds all streams and replays the first sample.
# File lib/orocos/log/replay.rb, line 526
def rewind()
    @stream.rewind
    step
end
Runs through the log files until the end is reached.
# File lib/orocos/log/replay.rb, line 540
def run(time_sync = false,speed=1,&block)
    @speed = speed
    while step(time_sync,&block) do
    end
end
# File lib/orocos/log/replay.rb, line 199
def sample_index()
    return @stream.sample_index if @stream
    return nil
end

# File lib/orocos/log/replay.rb, line 191
def sample_index_for_time(time)
    prev_pos = sample_index
    seek(time)
    target_sample_pos = sample_index
    seek(prev_pos)
    return target_sample_pos
end
Seeks to the given position
# File lib/orocos/log/replay.rb, line 611
def seek(pos)
    @current_sample = @stream.seek(pos)
    #write all data to the ports
    0.upto(@stream.streams.length-1) do |index|
        data = @stream.single_data(index)
        #only write samples if they are available
        if(data)
            @replayed_objects[index].write(data)
        end
    end
end
# File lib/orocos/log/replay.rb, line 204
def single_data(id)
    if @stream
        return @stream.single_data(id)
    end
end
Returns the number of samples.
# File lib/orocos/log/replay.rb, line 596
def size
    return @stream.size
end
Gets the next sample and writes it to the ports which are connected to the OutputPort and updates all its readers.
If time_sync is set to true, the method will wait until the simulated time delta is equal to the recorded time delta.
If a block is given, it is called with the replayed port or property and the sample data.
You can change the replay speed by changing the instance variable speed.
# File lib/orocos/log/replay.rb, line 456
def step(time_sync=false,&block)
    #check if stream was generated otherwise call align
    if @stream == nil
        return align
    end
    @current_sample = @stream.step
    return if !@current_sample
    index, time, data = @current_sample

    if getter = (timestamps[data.class.name] || default_timestamp)
        time = getter[data]
    end
    @base_time ||= time
    @start_time ||= Time.now
    required_delta = (time - @base_time)/@speed
    actual_delta   = Time.now - @start_time

    #wait if replay is faster than the desired speed and time_sync is set to true
    if time_sync
        while (wait = @time_sync_proc.call(time,actual_delta,required_delta)) > 0.001
            #process qt events every 0.01 sec
            if @process_qt_events == true
                start_wait = Time.now
                while true
                    $qApp.processEvents()
                    break if !@start_time #break if start_time was reseted throuh processEvents
                    wait2 =wait -(Time.now - start_wait)
                    if wait2 > 0.001
                        sleep [0.01,wait2].min
                    else
                        break
                    end
                end
            else
                sleep(wait)
            end
            break if !@start_time # if time was resetted go out
            actual_delta = Time.now - @start_time
        end
        actual_delta = @start_time ? Time.now - @start_time : required_delta
        @out_of_sync_delta = required_delta - actual_delta
    end
    @actual_speed = required_delta/actual_delta*@speed

    #write sample to simulated ports or properties
    @replayed_objects[index].write(data)
    yield(@replayed_objects[index],data) if block_given?
    @current_sample
end
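The timing arithmetic in step is easier to follow with concrete numbers. The sketch below recomputes the same quantities from plain floats; replay_timing is a hypothetical helper, and the wall-clock values are passed in instead of being read from Time.now.

```ruby
# Hypothetical helper reproducing the timing arithmetic used by step.
# sample_time/base_time: timestamps of the current and the first replayed sample
# now/start_time:        wall-clock times of this call and of the replay start
def replay_timing(sample_time, base_time, now, start_time, speed)
  required_delta = (sample_time - base_time) / speed
  actual_delta   = now - start_time
  {
    required_delta:    required_delta,
    actual_delta:      actual_delta,
    out_of_sync_delta: required_delta - actual_delta,
    actual_speed:      required_delta / actual_delta * speed
  }
end

# 4 s of log time at speed 2 should take 2 s, but 2.5 s have passed
timing = replay_timing(14.0, 10.0, 102.5, 100.0, 2.0)
puts timing[:out_of_sync_delta] # -0.5, the replay is behind
puts timing[:actual_speed]      # 1.6
```

A negative out_of_sync_delta therefore means that the replay cannot keep up with the desired speed, which matches the sign convention of the attribute.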
Gets the previous sample, writes it to the ports which are connected to the OutputPort and updates its readers (see step).
# File lib/orocos/log/replay.rb, line 508
def step_back(time_sync=false,&block)
    #check if stream was generated otherwise call start
    if @stream == nil
        start
        return
    end
    @current_sample = @stream.step_back
    return nil if @current_sample == nil
    index, time, data = @current_sample

    #write sample to connected ports
    @replayed_objects[index].write(data)
    yield(@replayed_objects[index],data) if block_given?
    return @current_sample
end
# File lib/orocos/log/replay.rb, line 393
def stream_index_for_name(name)
    if @stream
        return @stream.stream_index_for_name(name)
    end
    throw "Stream is not initialized yet"
end

# File lib/orocos/log/replay.rb, line 400
def stream_index_for_type(name)
    if @stream
        return @stream.stream_index_for_type(name)
    end
    throw "Stream is not initialized yet"
end
Returns the simulated task with the given name.
# File lib/orocos/log/replay.rb, line 294
def task(name)
    raise "cannot find TaskContext which is called #{name}" if !@tasks.has_key?(name)
    return @tasks[name]
end
returns an array of all simulated tasks
# File lib/orocos/log/replay.rb, line 146
def tasks
    @tasks.values
end
returns the time of the current sample replayed
# File lib/orocos/log/replay.rb, line 151
def time
    @base_time
end
Returns where the time used for alignment should be taken from. It can be one of:
false: use the time at which the logger received the data (“logical time”)
:use_sample_time: for streams whose data contains a field called “time” of type base/Time (from Rock’s base/types package), use the time contained in that field. Otherwise, use the logical time.
See #use_sample_time
# File lib/orocos/log/replay.rb, line 93
def time_source
    if use_sample_time
        return :use_sample_time
    else
        return false
    end
end
This can be used to set a different time sync logic. The code block has three parameters:
time = current sample time
actual_delta = time between start of replay and now
required_delta = time which should have elapsed between start and now to replay at the desired speed
The code block must return the number of seconds which the replay shall wait before the sample is replayed.
Do not block the program, otherwise Qt events are no longer processed!
Example:
time_sync do |time,actual_delta,required_delta|
    my_object.busy? ? 1 : 0
end
# File lib/orocos/log/replay.rb, line 436
def time_sync(&block)
    if block_given?
        @time_sync_proc = block
    else
        @time_sync_proc = Proc.new do |time,actual_delta,required_delta|
            required_delta - actual_delta
        end
    end
end
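The behaviour of a time sync block can be checked without a replay instance, because it is only a proc that maps three numbers to a wait time. The sketch below compares the default logic with a hypothetical variant that never waits longer than 50 ms per call; the variable names are made up for illustration.

```ruby
# Default behaviour: wait for the difference between required and elapsed time
default_sync = proc do |_time, actual_delta, required_delta|
  required_delta - actual_delta
end

# Hypothetical custom logic: cap every single wait at 50 ms
capped_sync = proc do |time, actual_delta, required_delta|
  [default_sync.call(time, actual_delta, required_delta), 0.05].min
end

puts default_sync.call(Time.now, 1.0, 1.5) # 0.5
puts capped_sync.call(Time.now, 1.0, 1.5)  # 0.05
puts capped_sync.call(Time.now, 2.0, 1.5)  # -0.5, no waiting needed
```

Since step calls the block again after each wait, as long as the result is greater than 0.001, a capped block like this one still reaches the full delay but in smaller slices.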
Sets a code block for a specific type to calculate the timestamp during replay. The type is identified by its class name.
# File lib/orocos/log/replay.rb, line 218
def timestamp(type_name, &block)
    timestamps[type_name] = block
end
If set to true, all ports are replayed and are not filtered out by the filter. Otherwise only ports which have a reader or a connection to another port are replayed. The :tasks option restricts the call to tasks whose name matches the given Regexp.
# File lib/orocos/log/replay.rb, line 226
def track(value,filter=Hash.new)
    options, filter = Kernel::filter_options(filter,[:tasks])
    @tasks.each_value do |task|
        task.track(value,filter) if !options.has_key?(:tasks) || task.name =~ options[:tasks]
    end
end