import atexit
import os
import subprocess
import sys
import time

import ROOT
from XRootD import client
from XRootD.client.flags import DirListFlags, OpenFlags, MkDirFlags, QueryCode

import EventDisplay_Task
def pyExit():
    """Terminate the current process by sending it a `kill` from the shell.

    Registered as an exit handler below; the printed message describes it
    as a workaround for a freeze on shutdown.
    """
    print("Make suicide until solution found for freezing")
    os.system('kill '+str(os.getpid()))


# Run pyExit when the interpreter shuts down.
atexit.register(pyExit)
# Global ROOT style: title and label size 0.045 for the "XYZ" axes selector,
# then ask gROOT to force the style.
ROOT.gStyle.SetTitleSize(0.045, "XYZ")
ROOT.gStyle.SetLabelSize(0.045, "XYZ")
ROOT.gROOT.ForceStyle()
from argparse import ArgumentParser

# Command line definition of the monitoring script. Option strings, dest
# names, defaults and help texts are kept exactly as they were.
parser = ArgumentParser()
parser.add_argument("-A", "--auto", dest="auto", help="run in auto mode online monitoring", default=False, action='store_true')
parser.add_argument("--Nupdate", dest="Nupdate", help="frequence of updating online plots", default=100, type=int)
parser.add_argument("--Nlast", dest="Nlast", help="last N events to analyze on file", default=10, type=int)
parser.add_argument("--Cosmics", dest="cosmics", help="use default data stream if no beam", action='store_true', default=False)
parser.add_argument("--sudo", dest="sudo", help="update files on EOS", default=False, action='store_true')

parser.add_argument("-M", "--online", dest="online", help="online mode", default=False, action='store_true')
parser.add_argument("--batch", dest="batch", help="batch mode", default=False, action='store_true')
# The server defaults to $EOSSHIP. Reading the variable with .get() keeps the
# parser usable when it is unset (previously a KeyError was raised while the
# parser was built, even for --help or an explicit --server); in that case
# --server becomes a required option instead.
_eos_server = os.environ.get("EOSSHIP")
parser.add_argument("--server", dest="server", help="xrootd server", default=_eos_server, required=_eos_server is None)
parser.add_argument("-r", "--runNumber", dest="runNumber", help="run number", type=int, default=-1)
parser.add_argument("-p", "--path", dest="path", help="path to converted data", required=False, default="")
parser.add_argument("-praw", dest="rawDataPath", help="path to raw data", required=False, default=False)
parser.add_argument("-P", "--partition", dest="partition", help="partition of data", type=int, required=False, default=-1)
# NOTE(review): -d has no action/type, so it consumes a value and any
# non-empty string is truthy — confirm whether store_true was intended.
parser.add_argument("-d", "--Debug", dest="debug", help="debug", default=False)
parser.add_argument("-cpp", "--convRawCPP", action='store_true', dest="FairTask_convRaw", help="convert raw data using ConvRawData FairTask", default=False)
parser.add_argument("--withCalibration", action='store_true', dest="makeCalibration", help="make QDC and TDC calibration, not taking from raw data", default=False)

parser.add_argument("-f", "--inputFile", dest="fname", help="file name for MC", type=str, default=None, required=False)
parser.add_argument("-g", "--geoFile", dest="geoFile", help="geofile", required=False, default=False)
parser.add_argument("-b", "--heartBeat", dest="heartBeat", help="heart beat", default=10000, type=int)
parser.add_argument("-c", "--command", dest="command", help="command", default="")
parser.add_argument("-n", "--nEvents", dest="nEvents", help="number of events", default=-1, type=int)
parser.add_argument("-s", "--nStart", dest="nStart", help="first event", default=0, type=int)
parser.add_argument("-t", "--trackType", dest="trackType", help="DS or Scifi", default="DS")

# The Scifi/Mufi options below declare no type: values given on the command
# line arrive as strings, only the defaults are numbers/booleans.
parser.add_argument("--ScifiNbinsRes", dest="ScifiNbinsRes", default=100)
parser.add_argument("--Scifixmin", dest="Scifixmin", default=-2000.)
parser.add_argument("--ScifialignPar", dest="ScifialignPar", default=False)
parser.add_argument("--ScifiResUnbiased", dest="ScifiResUnbiased", default=False)
parser.add_argument("--Mufixmin", dest="Mufixmin", default=-10.)
parser.add_argument("--ScifiStationMasked", dest="ScifiStationMasked", default="-9")

parser.add_argument("--goodEvents", dest="goodEvents", action='store_true', default=False)
parser.add_argument("--withTrack", dest="withTrack", action='store_true', default=False)
parser.add_argument("--nTracks", dest="nTracks", default=0, type=int)
parser.add_argument("--save", dest="save", action='store_true', default=False)
parser.add_argument("--sH", dest="saveHistos", action='store_true', default=False, help="save all histos not only TCanvas")
parser.add_argument("--interactive", dest="interactive", action='store_true', default=False)

parser.add_argument("--parallel", dest="parallel", default=1, type=int)

parser.add_argument("--postScale", dest="postScale", help="post scale events, 1..10..100", default=-1, type=int)

parser.add_argument("--saveTo", dest="saveTo", help="output storage path", default="")
# Parse the command line, then attach derived settings to the namespace.
options = parser.parse_args()

# slowStream is on by default and switched off by --Cosmics.
options.slowStream = True
if options.cosmics:
    options.slowStream = False

# Path of the run-status file (combined with options.server when opened).
options.dashboard = "/mnt/raid10/data_online/run_status.json"
options.monitorTag = ''

# Batch graphics for --batch, and for --auto unless --interactive is given.
if (options.auto and not options.interactive) or options.batch:
    ROOT.gROOT.SetBatch(True)
# Neither a run number nor a geometry file was supplied.
if options.runNumber < 0 and not options.geoFile:
    print('No run number given and no geoFile. Do not know what to do. Exit.')
    # NOTE(review): the message announces an exit, but no exit call is visible
    # in this copy of the script (lines appear to be missing after the print)
    # — confirm against the upstream file.
# Choose a geometry file when none was given on the command line.
# NOTE(review): both `else:` lines below were absent from this copy and are
# restored from the surrounding if/elif structure — confirm against upstream.
if not options.geoFile:
    if options.path.find('TI18') < 0:
        # Path does not contain 'TI18': select by the year found in the path.
        # find(...) > 0 means a match at index 0 is not counted.
        if options.path.find('2022') > 0:
            options.geoFile = "geofile_sndlhc_TI18_V0_2022.root"
        if options.path.find('2023') > 0:
            options.geoFile = "geofile_sndlhc_TI18_V1_2023.root"
        if options.path.find('2024') > 0:
            options.geoFile = "geofile_sndlhc_TI18_V0_2024.root"
    else:
        # Path contains 'TI18': select by run-number range.
        if options.runNumber < 4575:
            options.geoFile = "geofile_sndlhc_TI18_V3_08August2022.root"
        elif options.runNumber < 4855:
            options.geoFile = "geofile_sndlhc_TI18_V5_14August2022.root"
        elif options.runNumber < 5172:
            options.geoFile = "geofile_sndlhc_TI18_V6_08October2022.root"
        else:
            options.geoFile = "geofile_sndlhc_TI18_V7_22November2022.root"
105 with client.File()
as f:
106 f.open(options.server+options.dashboard)
108 Lcrun = L.decode().split(
'\n')
110 curRun,curPart,start =
"",
"",
""
112 if not (X[
'state']==
'running'):
113 print(
"DAQ not running.",X)
115 curRun =
'run_'+
str(X[
'run_number']).zfill(6)
116 curFile = X[
'currently_written_file']
117 k = curFile.rfind(
'data_')+5
118 curPart =
int(curFile[k:k+4])
119 start = X[
'start_time']
121 if options.slowStream: options.monitorTag =
'monitoring_'
122 else: options.monitorTag =
''
123 return curRun,curPart,start
126 options.online =
True
128 if options.runNumber < 0:
130 while curRun.find(
'run') < 0:
131 curRun,curPart,options.startTime =
currentRun()
132 if curRun.find(
'run') < 0:
133 print(
"no run info, sleep 30sec.",time.ctime())
135 options.runNumber =
int(curRun.split(
'_')[1])
137 if options.slowStream: options.partition = 0
138 else: options.partition =
int(curPart.split(
'_')[1].split(
'.')[0])
140 if options.runNumber < 0:
141 print(
"run number required for non-auto mode")
143 if options.rawDataPath: rawDataPath = options.rawDataPath
145 if not options.server.find(
'eos')<0:
146 if options.rawDataPath: rawDataPath = options.rawDataPath
147 elif options.path.find(
'2025')>0:
148 rawDataPath =
"/eos/experiment/sndlhc/raw_data/physics/2025/run_251"
149 elif options.path.find(
'2024')>0:
150 rawDataPath =
"/eos/experiment/sndlhc/raw_data/physics/2024/run_241"
151 elif options.path.find(
'2023')>0:
152 rawDataPath =
"/eos/experiment/sndlhc/raw_data/physics/2023/"
153 elif options.path.find(
'2022')>0:
154 rawDataPath =
"/eos/experiment/sndlhc/raw_data/physics/2022/"
156 rawDataPath =
"/eos/experiment/sndlhc/raw_data/commissioning/TI18/data/"
157 options.rawDataPath = rawDataPath
158 runDir = rawDataPath+
"run_"+
str(options.runNumber).zfill(6)
159 jname =
"run_timestamps.json"
160 dirlist =
str( subprocess.check_output(
"xrdfs "+os.environ[
'EOSSHIP']+
" ls "+runDir,shell=
True) )
162 with client.File()
as f:
163 f.open(options.server+runDir+
"/run_timestamps.json")
164 status, jsonStr = f.read()
165 exec(
"date = "+jsonStr.decode())
166 options.startTime = date[
'start_time'].replace(
'Z',
'')
167 if 'stop_time' in date:
168 options.startTime +=
" - "+ date[
'stop_time'].replace(
'Z',
'')
172houghTransform =
False
175 muon_reco_task.SetName(
"houghTransform")
176 FairTasks.append(muon_reco_task)
178 import SndlhcTracking
180 trackTask.SetName(
'simpleTracking')
181 FairTasks.append(trackTask)
184if options.nEvents < 0 : options.nEvents = M.GetEntries()
185if options.postScale==0
and options.nEvents>100E6: options.postScale = 100
186if options.postScale==0
and options.nEvents>10E6: options.postScale = 10
187print(
'using postScale ',options.postScale,
' for run ',options.runNumber)
199for m
in monitorTasks:
200 monitorTasks[m].Init(options,M)
207 for i
in range(options.parallel):
208 if options.parallel==1:
209 nstart,nstop = options.nStart,options.nStart+options.nEvents
214 print(
"Could not create a child process")
219 dN = options.nEvents//options.parallel
222 if i==(options.parallel-1): nstop = options.nEvents
224 print(
'start ',i,nstart,nstop)
225 Tcounter = {
'Monitor':0}
226 for m
in monitorTasks:
229 for n
in range(nstart,nstop):
230 if options.postScale>1:
231 if ROOT.gRandom.Rndm()>1./options.postScale:
continue
232 tic = time.perf_counter_ns()
233 event = M.GetEvent(n)
234 toc = time.perf_counter_ns()
235 Tcounter[
'Monitor']+=toc-tic
237 if not options.online:
238 if n%options.heartBeat == 0:
239 print(
"--> run/event nr: %i %i %5.2F%%"%(M.eventTree.EventHeader.GetRunId(),n,(n-nstart)/(nstop-nstart)*100))
241 for m
in monitorTasks:
242 tic = time.perf_counter_ns()
243 monitorTasks[m].ExecuteEvent(M.eventTree)
244 toc = time.perf_counter_ns()
246 for m
in monitorTasks:
247 monitorTasks[m].Plot()
249 for x
in Tcounter: txt+=x+
':%5.1Fs '%(Tcounter[x]/1E9)
250 print(
'timing performance:',txt)
251 if options.parallel>1:
252 ut.writeHists(M.h,
'tmp'+
str(options.runNumber)+
'p'+
str(i))
254 if options.parallel>1:
256 pid,exit_code = os.wait()
257 if pid == 0: time.sleep(100)
259 print(
'child process has finished',len(process)-1,pid,exit_code)
261 for i
in range(options.parallel):
262 tmp =
'tmp'+
str(options.runNumber)+
'p'+
str(i)
263 if tmp
in os.listdir(
'.'): ut.readHists(M.h,tmp)
264 else: print(
'file missing ',tmp)
265 M.presenterFile.Close()
266 if options.saveTo==
"":
267 name =
'run'+self.runNr+
'.root'
269 name = options.saveTo+
'run'+M.runNr+
'_'+
str(options.nStart//1000000)+
'.root'
270 M.presenterFile = ROOT.TFile(name,
'update')
272 for m
in monitorTasks:
273 monitorTasks[m].Plot()
276 if not M.h[
'Etime'].GetEntries() == options.nEvents:
277 print(
'event count failed! Processed:',M.h[
'Etime'].GetEntries(),
' total number of events:',options.nEvents)
279 print(
'i am finished, all events processed')
280 if options.saveHistos: ut.writeHists(M.h,
'allHistos-run'+M.runNr+
'.root')
284 print(options.runNumber,options.startTime)
285 options.startTime +=
" #events="+
str(options.nEvents)
    # check for an open data file on the online machine
    # re-open the file and check for new events,
    # check every 5 seconds: if there is no new file, re-open again
    # if there is a new file, finish the run, publish histograms, and restart with the new file
297 if options.slowStream: lastPart = 0
298 nLast = options.nEvents
299 nStart = nLast-options.Nlast
300 if not options.interactive
and options.sudo: M.updateHtml()
301 lastFile = M.converter.fiN.GetName()
302 if not options.slowStream:
303 tmp = lastFile.split(
'/')
304 lastRun = tmp[len(tmp)-2].replace(
'monitoring_',
'')
305 lastPart = tmp[len(tmp)-1]
307 M.updateSource(lastFile)
309 for n
in range(nStart,nLast):
310 event = M.GetEvent(n)
313 for m
in monitorTasks:
314 monitorTasks[m].ExecuteEvent(M.eventTree)
316 if N0%options.Nupdate==0
or N0==
int(options.Nupdate/10):
317 for m
in monitorTasks:
318 monitorTasks[m].Plot()
319 if options.sudo: M.publishRootFile()
321 M.updateSource(lastFile)
322 newEntries = M.GetEntries()
324 nStart = max(nLast,newEntries-options.Nlast)
328 curRun,curPart,options.startTime =
currentRun()
329 while curRun.find(
'run') < 0:
330 curRun,curPart,options.startTime =
currentRun()
331 if curRun.find(
'run') < 0:
332 print(
"sleep 300sec.",curRun,time.ctime())
334 if not curRun == lastRun:
335 for m
in monitorTasks:
336 if not options.interactive: monitorTasks[m].Plot()
337 print(
"run ",lastRun,
" has finished.")
339 if not curPart == lastPart
and not options.slowStream:
341 lastFile = options.server+options.path+options.monitorTag+lastRun+
"/"+ lastPart
342 M.converter.fiN = ROOT.TFile.Open(lastFile)
345 print(
'DAQ inactive for 10sec. Last event = ',M.GetEntries(), curRun,curPart,N0)