SND@LHC Software
Loading...
Searching...
No Matches
run_Monitoring.py
Go to the documentation of this file.
1#!/usr/bin/env python
2import ROOT,os,sys,subprocess,atexit,time
3import rootUtils as ut
4from XRootD import client
5from XRootD.client.flags import DirListFlags, OpenFlags, MkDirFlags, QueryCode
6import Monitor
7import Scifi_monitoring
8import Mufi_monitoring
9import DAQ_monitoring
10import EventDisplay_Task
11import SndlhcMuonReco
12
13def pyExit():
14 print("Make suicide until solution found for freezing")
15 os.system('kill '+str(os.getpid()))
16atexit.register(pyExit)
17
18ROOT.gStyle.SetTitleSize(0.045, "XYZ")
19ROOT.gStyle.SetLabelSize(0.045, "XYZ")
20ROOT.gROOT.ForceStyle()
21
22from argparse import ArgumentParser
23
24parser = ArgumentParser()
25parser.add_argument("-A", "--auto", dest="auto", help="run in auto mode online monitoring",default=False,action='store_true')
26parser.add_argument("--Nupdate", dest="Nupdate", help="frequence of updating online plots",default=100,type=int)
27parser.add_argument("--Nlast", dest="Nlast", help="last N events to analyze on file",default=10,type=int)
28parser.add_argument("--Cosmics", dest="cosmics", help="use default data stream if no beam",action='store_true',default=False)
29parser.add_argument("--sudo", dest="sudo", help="update files on EOS",default=False,action='store_true')
30
31parser.add_argument("-M", "--online", dest="online", help="online mode",default=False,action='store_true')
32parser.add_argument("--batch", dest="batch", help="batch mode",default=False,action='store_true')
33parser.add_argument("--server", dest="server", help="xrootd server",default=os.environ["EOSSHIP"])
34parser.add_argument("-r", "--runNumber", dest="runNumber", help="run number", type=int,default=-1)
35parser.add_argument("-p", "--path", dest="path", help="path to converted data",required=False,default="")
36parser.add_argument("-praw", dest="rawDataPath", help="path to raw data",required=False,default=False)
37parser.add_argument("-P", "--partition", dest="partition", help="partition of data", type=int,required=False,default=-1)
38parser.add_argument("-d", "--Debug", dest="debug", help="debug", default=False)
39parser.add_argument("-cpp", "--convRawCPP", action='store_true', dest="FairTask_convRaw", help="convert raw data using ConvRawData FairTask", default=False)
40parser.add_argument( "--withCalibration", action='store_true', dest="makeCalibration", help="make QDC and TDC calibration, not taking from raw data", default=False)
41
42parser.add_argument("-f", "--inputFile", dest="fname", help="file name for MC", type=str,default=None,required=False)
43parser.add_argument("-g", "--geoFile", dest="geoFile", help="geofile", required=False,default=False)
44parser.add_argument("-b", "--heartBeat", dest="heartBeat", help="heart beat", default=10000,type=int)
45parser.add_argument("-c", "--command", dest="command", help="command", default="")
46parser.add_argument("-n", "--nEvents", dest="nEvents", help="number of events", default=-1,type=int)
47parser.add_argument("-s", "--nStart", dest="nStart", help="first event", default=0,type=int)
48parser.add_argument("-t", "--trackType", dest="trackType", help="DS or Scifi", default="DS")
49
50parser.add_argument("--ScifiNbinsRes", dest="ScifiNbinsRes", default=100)
51parser.add_argument("--Scifixmin", dest="Scifixmin", default=-2000.)
52parser.add_argument("--ScifialignPar", dest="ScifialignPar", default=False)
53parser.add_argument("--ScifiResUnbiased", dest="ScifiResUnbiased", default=False)
54parser.add_argument("--Mufixmin", dest="Mufixmin", default=-10.)
55parser.add_argument("--ScifiStationMasked", dest="ScifiStationMasked", default="-9")
56
57parser.add_argument("--goodEvents", dest="goodEvents", action='store_true',default=False)
58parser.add_argument("--withTrack", dest="withTrack", action='store_true',default=False)
59parser.add_argument("--nTracks", dest="nTracks",default=0,type=int)
60parser.add_argument("--save", dest="save", action='store_true',default=False)
61parser.add_argument("--sH", dest="saveHistos", action='store_true',default=False,help="save all histos not only TCanvas")
62parser.add_argument("--interactive", dest="interactive", action='store_true',default=False)
63
64parser.add_argument("--parallel", dest="parallel",default=1,type=int)
65
66parser.add_argument("--postScale", dest="postScale",help="post scale events, 1..10..100", default=-1,type=int)
67
68parser.add_argument("--saveTo", dest="saveTo", help="output storage path", default="")
69
70options = parser.parse_args()
71options.slowStream = True
72if options.cosmics: options.slowStream = False
73options.startTime = ""
74options.dashboard = "/mnt/raid10/data_online/run_status.json"
75options.monitorTag = ''
76if (options.auto and not options.interactive) or options.batch: ROOT.gROOT.SetBatch(True)
77
78# if no geofile given, use defaults according to run number
79if options.runNumber < 0 and not options.geoFile:
80 print('No run number given and no geoFile. Do not know what to do. Exit.')
81 exit()
82#RUN0: 7 Apr 2022 - 26 Jul 2022 (Run 4575 started - test run after replacing emulsions -Ettore)
83#RUN1: 26 Jul 2022 - 13 Sept 2022 (Run 4855 September 14)
84#RUN2: 13 Sept 2022 - 4 Nov 2022 (Run 5172 test run after emulsion replacement)
85#RUN3: 4 Nov 2022 -
86
87if not options.geoFile:
88 if options.path.find('TI18')<0:
89 if options.path.find('2022')>0 : options.geoFile = "geofile_sndlhc_TI18_V0_2022.root"
90 if options.path.find('2023')>0 : options.geoFile = "geofile_sndlhc_TI18_V1_2023.root"
91 if options.path.find('2024')>0 : options.geoFile = "geofile_sndlhc_TI18_V0_2024.root"
92 else:
93 if options.runNumber < 4575:
94 options.geoFile = "geofile_sndlhc_TI18_V3_08August2022.root"
95 elif options.runNumber < 4855:
96 options.geoFile = "geofile_sndlhc_TI18_V5_14August2022.root"
97 elif options.runNumber < 5172:
98 options.geoFile = "geofile_sndlhc_TI18_V6_08October2022.root"
99 else:
100 options.geoFile = "geofile_sndlhc_TI18_V7_22November2022.root"
101
102# to be extended for future new alignments.
103
105 with client.File() as f:
106 f.open(options.server+options.dashboard)
107 status, L = f.read()
108 Lcrun = L.decode().split('\n')
109 f.close()
110 curRun,curPart,start ="","",""
111 X=eval(Lcrun[0])
112 if not (X['state']=='running'):
113 print("DAQ not running.",X)
114 else:
115 curRun = 'run_'+str(X['run_number']).zfill(6)
116 curFile = X['currently_written_file']
117 k = curFile.rfind('data_')+5
118 curPart = int(curFile[k:k+4])
119 start = X['start_time']
120 # assume monitor file always present on DAQ server
121 if options.slowStream: options.monitorTag = 'monitoring_'
122 else: options.monitorTag = ''
123 return curRun,curPart,start
124
125if options.auto:
126 options.online = True
127# search for current run
128 if options.runNumber < 0:
129 curRun = ""
130 while curRun.find('run') < 0:
131 curRun,curPart,options.startTime = currentRun()
132 if curRun.find('run') < 0:
133 print("no run info, sleep 30sec.",time.ctime())
134 time.sleep(30)
135 options.runNumber = int(curRun.split('_')[1])
136 lastRun = curRun
137 if options.slowStream: options.partition = 0 # monitoring file set to data_0000.root int(curPart.split('_')[1].split('.')[0])
138 else: options.partition = int(curPart.split('_')[1].split('.')[0])
139else:
140 if options.runNumber < 0:
141 print("run number required for non-auto mode")
142 os._exit(1)
143 if options.rawDataPath: rawDataPath = options.rawDataPath
144# works only for runs on EOS
145 if not options.server.find('eos')<0:
146 if options.rawDataPath: rawDataPath = options.rawDataPath
147 elif options.path.find('2025')>0:
148 rawDataPath = "/eos/experiment/sndlhc/raw_data/physics/2025/run_251"
149 elif options.path.find('2024')>0:
150 rawDataPath = "/eos/experiment/sndlhc/raw_data/physics/2024/run_241"
151 elif options.path.find('2023')>0:
152 rawDataPath = "/eos/experiment/sndlhc/raw_data/physics/2023/"
153 elif options.path.find('2022')>0:
154 rawDataPath = "/eos/experiment/sndlhc/raw_data/physics/2022/"
155 else:
156 rawDataPath = "/eos/experiment/sndlhc/raw_data/commissioning/TI18/data/"
157 options.rawDataPath = rawDataPath
158 runDir = rawDataPath+"run_"+str(options.runNumber).zfill(6)
159 jname = "run_timestamps.json"
160 dirlist = str( subprocess.check_output("xrdfs "+os.environ['EOSSHIP']+" ls "+runDir,shell=True) )
161 if jname in dirlist:
162 with client.File() as f:
163 f.open(options.server+runDir+"/run_timestamps.json")
164 status, jsonStr = f.read()
165 exec("date = "+jsonStr.decode())
166 options.startTime = date['start_time'].replace('Z','')
167 if 'stop_time' in date:
168 options.startTime += " - "+ date['stop_time'].replace('Z','')
169
170# prepare tasks:
171FairTasks = []
172houghTransform = False # under construction, not yet tested
173if houghTransform:
174 muon_reco_task = SndlhcMuonReco.MuonReco()
175 muon_reco_task.SetName("houghTransform")
176 FairTasks.append(muon_reco_task)
177else:
178 import SndlhcTracking
180 trackTask.SetName('simpleTracking')
181 FairTasks.append(trackTask)
182
183M = Monitor.Monitoring(options,FairTasks)
184if options.nEvents < 0 : options.nEvents = M.GetEntries()
185if options.postScale==0 and options.nEvents>100E6: options.postScale = 100
186if options.postScale==0 and options.nEvents>10E6: options.postScale = 10
187print('using postScale ',options.postScale,' for run ',options.runNumber)
188monitorTasks = {}
189if not options.fname:
190 monitorTasks['daq'] = DAQ_monitoring.DAQ_boards()
191 monitorTasks['rates'] = DAQ_monitoring.Time_evolution()
192monitorTasks['Scifi_hitMaps'] = Scifi_monitoring.Scifi_hitMaps()
193monitorTasks['Mufi_hitMaps'] = Mufi_monitoring.Mufi_hitMaps()
194monitorTasks['Mufi_QDCcorellations'] = Mufi_monitoring.Mufi_largeVSsmall()
195if options.postScale<2: monitorTasks['Veto_Efficiency'] = Mufi_monitoring.Veto_Efficiency()
196#monitorTasks['Scifi_residuals'] = Scifi_monitoring.Scifi_residuals() # time consuming
197if options.interactive: monitorTasks['EventDisplay'] = EventDisplay_Task.twod()
198
199for m in monitorTasks:
200 monitorTasks[m].Init(options,M)
201
202# monitorTasks['Veto_Efficiency'].debug = True
203
204if not options.auto: # default online/offline mode
205 process = []
206 pid = 0
207 for i in range(options.parallel):
208 if options.parallel==1:
209 nstart,nstop = options.nStart,options.nStart+options.nEvents
210 else:
211 try:
212 pid = os.fork()
213 except OSError:
214 print("Could not create a child process")
215 print('pid',pid,i)
216 if pid!=0:
217 process.append(pid)
218 else:
219 dN = options.nEvents//options.parallel
220 nstart = i*dN
221 nstop = nstart + dN
222 if i==(options.parallel-1): nstop = options.nEvents
223 if pid == 0:
224 print('start ',i,nstart,nstop)
225 Tcounter = {'Monitor':0}
226 for m in monitorTasks:
227 Tcounter[m] = 0
228
229 for n in range(nstart,nstop):
230 if options.postScale>1:
231 if ROOT.gRandom.Rndm()>1./options.postScale: continue
232 tic = time.perf_counter_ns()
233 event = M.GetEvent(n)
234 toc = time.perf_counter_ns()
235 Tcounter['Monitor']+=toc-tic
236
237 if not options.online:
238 if n%options.heartBeat == 0:
239 print("--> run/event nr: %i %i %5.2F%%"%(M.eventTree.EventHeader.GetRunId(),n,(n-nstart)/(nstop-nstart)*100))
240# assume for the moment file does not contain fitted tracks
241 for m in monitorTasks:
242 tic = time.perf_counter_ns()
243 monitorTasks[m].ExecuteEvent(M.eventTree)
244 toc = time.perf_counter_ns()
245 Tcounter[m]+=toc-tic
246 for m in monitorTasks:
247 monitorTasks[m].Plot()
248 txt = ''
249 for x in Tcounter: txt+=x+':%5.1Fs '%(Tcounter[x]/1E9)
250 print('timing performance:',txt)
251 if options.parallel>1: # save partitions
252 ut.writeHists(M.h,'tmp'+str(options.runNumber)+'p'+str(i))
253 exit(0)
254 if options.parallel>1:
255 while process:
256 pid,exit_code = os.wait()
257 if pid == 0: time.sleep(100)
258 else:
259 print('child process has finished',len(process)-1,pid,exit_code)
260 process.remove(pid)
261 for i in range(options.parallel):
262 tmp = 'tmp'+str(options.runNumber)+'p'+str(i)
263 if tmp in os.listdir('.'): ut.readHists(M.h,tmp)
264 else: print('file missing ',tmp)
265 M.presenterFile.Close()
266 if options.saveTo=="":
267 name = 'run'+self.runNr+'.root'
268 else:
269 name = options.saveTo+'run'+M.runNr+'_'+str(options.nStart//1000000)+'.root'
270 M.presenterFile = ROOT.TFile(name,'update')
271
272 for m in monitorTasks:
273 monitorTasks[m].Plot()
274 # check if all events had been processed
275 if 'Etime' in M.h:
276 if not M.h['Etime'].GetEntries() == options.nEvents:
277 print('event count failed! Processed:',M.h['Etime'].GetEntries(),' total number of events:',options.nEvents)
278 else:
279 print('i am finished, all events processed')
280 if options.saveHistos: ut.writeHists(M.h,'allHistos-run'+M.runNr+'.root')
281
282 M.publishRootFile()
283 if options.sudo:
284 print(options.runNumber,options.startTime)
285 options.startTime += " #events="+str(options.nEvents)
286 M.updateHtml()
287else:
288 """ auto mode
289 check for open data file on the online machine
290 analyze N events
291 re-open file and check for new events,
292 if none available,
293 check every 5 seconds: if no new file re-open again
294 if new file, finish run, publish histograms, and restart with new file
295 """
296 N0 = 0
297 if options.slowStream: lastPart = 0 # reading from second low speed DAQ stream tmp[len(tmp)-1]
298 nLast = options.nEvents
299 nStart = nLast-options.Nlast
300 if not options.interactive and options.sudo: M.updateHtml()
301 lastFile = M.converter.fiN.GetName()
302 if not options.slowStream:
303 tmp = lastFile.split('/')
304 lastRun = tmp[len(tmp)-2].replace('monitoring_','')
305 lastPart = tmp[len(tmp)-1]
306
307 M.updateSource(lastFile)
308 while 1>0:
309 for n in range(nStart,nLast):
310 event = M.GetEvent(n)
311 N0+=1
312 # trackTask.event = event
313 for m in monitorTasks:
314 monitorTasks[m].ExecuteEvent(M.eventTree)
315# update plots
316 if N0%options.Nupdate==0 or N0==int(options.Nupdate/10):
317 for m in monitorTasks:
318 monitorTasks[m].Plot()
319 if options.sudo: M.publishRootFile()
320
321 M.updateSource(lastFile)
322 newEntries = M.GetEntries()
323 if newEntries>nLast:
324 nStart = max(nLast,newEntries-options.Nlast)
325 nLast = newEntries
326 else:
327 # check if file has changed
328 curRun,curPart,options.startTime = currentRun()
329 while curRun.find('run') < 0:
330 curRun,curPart,options.startTime = currentRun()
331 if curRun.find('run') < 0:
332 print("sleep 300sec.",curRun,time.ctime())
333 time.sleep(300)
334 if not curRun == lastRun:
335 for m in monitorTasks:
336 if not options.interactive: monitorTasks[m].Plot()
337 print("run ",lastRun," has finished.")
338 quit() # reinitialize everything with new run number
339 if not curPart == lastPart and not options.slowStream:
340 lastPart = curPart
341 lastFile = options.server+options.path+options.monitorTag+lastRun+"/"+ lastPart
342 M.converter.fiN = ROOT.TFile.Open(lastFile)
343 else:
344 time.sleep(10) # sleep 10 seconds and check for new events
345 print('DAQ inactive for 10sec. Last event = ',M.GetEntries(), curRun,curPart,N0)
346 nStart = nLast