SND@LHC Software
runProd.py
1import os,subprocess,time,multiprocessing,psutil
2import pwd
3import ROOT
4ncpus = multiprocessing.cpu_count() - 2
5
6""" input runlist
7 for each run in runlist. look for number of partitions
8 for each partition, start conversion job.
9 check consistency of output file
10 copy optional file to EOS
11 run as many jobs in parallel as cpus are available
12"""
13
14def delProcesses(pname):
15 username = pwd.getpwuid(os.getuid()).pw_name
16 callstring = "ps -f -u " + username
17 status = subprocess.check_output(callstring,shell=True)
18 for x in str(status).split("\\n"):
19 if not x.find(pname)<0:
20 first = True
21 for pid in x.split(' '):
22 if first:
23 first = False
24 continue
25 if pid=='':continue
26 os.system('kill '+pid)
27 break
28
29class prodManager():
30
31 def Init(self,options):
32 self.Mext = ''
33 if options.FairTask_convRaw:
34 self.Mext = '_CPP'
35 self.options = options
36 self.runNrs = []
37 def count_python_processes(self,macroName):
38 username = pwd.getpwuid(os.getuid()).pw_name
39 callstring = "ps -f -u " + username
40# only works if screen is wide enough to print full name!
41 status = subprocess.check_output(callstring,shell=True)
42 n=0
43 for x in status.decode().split('\n'):
44 if not x.find(macroName)<0 and not x.find('python') <0: n+=1
45 return n
46 def list_of_runs(self,macroName):
47 lpruns = [[],[]]
48 username = pwd.getpwuid(os.getuid()).pw_name
49 callstring = "ps -f -u " + username
50 status = subprocess.check_output(callstring,shell=True)
51 for x in status.decode().split('\n'):
52 if not x.find(macroName)<0 and not x.find('python') <0:
53 i = x.find('-r')
54 k = x.find('-P')
55 if i > 0:
56 r = int(x[i+3:].split(' ')[0])
57 lpruns[0].append(r)
58 if k>0:
59 p = int(x[k+3:].split(' ')[0])
60 lpruns[1].append(r*10000+p)
61 return lpruns
62
63 def getPartitions(self,runList,path):
64 partitions = {}
65 for r in runList:
66 directory = path+"run_"+str(r).zfill(6)
67 dirlist = str( subprocess.check_output("xrdfs "+options.server+" ls "+directory,shell=True) )
68 partitions[r] = []
69 for x in dirlist.split('\\n'):
70 if x.find('.root')<0: continue
71 ix = x.rfind('data_')
72 if ix<0: continue
73 partitions[r].append( int(x[ix+5:ix+9]) )
74 return partitions
75
76 def convert(self,runList,path,partitions={}):
77 if len(partitions)==0: partitions = self.getPartitions(runList,path)
78 for r in partitions:
79 runNr = str(r).zfill(6)
80 # find partitions
81 for p in partitions[r]:
82 self.runSinglePartition(path,runNr,str(p).zfill(4),EOScopy=True,check=True)
83
84 def getRunNrFromOffline(self,rMin=-1,rMax=99999):
85 self.dqDataFiles = []
86 hList = str( subprocess.check_output("xrdfs "+os.environ['EOSSHIP']+" ls /eos/experiment/sndlhc/www/offline",shell=True) )
87 for x in hList.split('\\n'):
88 if x.find('root')<0: continue
89 run = x.split('/')[-1]
90 if run.find('run')!=0: continue
91 r = int(run[3:9])
92 if r>=rMin and r<=rMax: self.dqDataFiles.append(r)
93 def runDataQuality(self,latest):
94 monitorCommand = "python $SNDSW_ROOT/shipLHC/scripts/run_Monitoring.py -r XXXX --server=$EOSSHIP \
95 -b 100000 -p "+pathConv+" -praw "+path+" -g GGGG "\
96 +" --postScale "+str(options.postScale)+ " --ScifiResUnbiased 1 --batch --sudo "
97 if options.parallel>1: monitorCommand += " --parallel "+str(options.parallel)
98 convDataFiles = self.getFileList(pathConv,latest,options.rMax,minSize=0)
99 self.checkEOS(copy=False,latest=latest)
100 # remove directories which are not completely copied
101 for r in self.missing:
102 for x in [k for k in convDataFiles if k//10000 == r]: convDataFiles.pop(x)
103 # remove directories which are not fully converted
104 rawDataFiles = self.getFileList(path,latest,options.rMax,minSize=10E6)
105 self.RawrunNrs = {}
106 for x in rawDataFiles:
107 r = x//10000
108 if not r in self.RawrunNrs: self.RawrunNrs[r] = []
109 self.RawrunNrs[r].append(x)
110
111 orderedCDF = list(convDataFiles.keys())
112 lpruns = self.list_of_runs('run_Monitoring')
113
114 self.getRunNrFromOffline(latest)
115 self.runNrs = {}
116 for x in orderedCDF:
117 r = x//10000
118 if r in self.runNrs:
119 self.runNrs[r].append(x)
120 continue
121 if r in lpruns[0]: continue
122 if r in self.dqDataFiles: continue
123 self.runNrs[r] = [x]
124
125 for r in self.runNrs:
126 if r > options.rMax or r < options.rMin: continue # outside range
127 if not r in self.RawrunNrs:
128 print('run not found in raw data',r)
129 continue # file converted but not enough events
130 if len(self.runNrs[r]) != len(self.RawrunNrs[r]):
131 print('run not complete',r)
132 continue # not all files converted.
133 print('executing DQ for run %i'%(r))
134 os.system(monitorCommand.replace('XXXX',str(r)).replace('GGGG',options.geofile)+" &")
135 while self.count_python_processes('run_Monitoring')>(ncpus-2) or psutil.virtual_memory()[2]>90 : time.sleep(1800)
136
137 def RerunDataQuality(self,runNrs=[],rMin=-1,rMax=99999):
138 monitorCommand = "python $SNDSW_ROOT/shipLHC/scripts/run_Monitoring.py -r XXXX --server=$EOSSHIP \
139 -b 100000 -p "+pathConv+" -g GGGG "\
140 +" --postScale "+str(options.postScale)+ " --ScifiResUnbiased 1 --batch --sudo "
141 if options.parallel>1: monitorCommand += " --parallel "+str(options.parallel)
142 if len(runNrs) ==0:
143 self.getRunNrFromOffline(rMin,rMax)
144 runNrs = self.dqDataFiles
145 for r in runNrs:
146 print('executing DQ for run %i'%(r))
147 if r < 4575: geoFile = "../geofile_sndlhc_TI18_V3_08August2022.root"
148 elif r < 4855: geoFile = "../geofile_sndlhc_TI18_V5_14August2022.root"
149 elif r < 5172: geoFile = "../geofile_sndlhc_TI18_V6_08October2022.root"
150 elif r < 5485: geoFile = "../geofile_sndlhc_TI18_V7_22November2022.root"
151 elif r < 7357: geoFile = "../geofile_sndlhc_TI18_V1_2023.root"
152 else: geoFile = options.geofile
153 os.system(monitorCommand.replace('XXXX',str(r)).replace('GGGG',geoFile)+" &")
154 time.sleep(20)
155 while self.count_python_processes('run_Monitoring')>(ncpus-5) or psutil.virtual_memory()[2]>90 : time.sleep(300)
156
157 def check4NewFiles(self,latest,rmax):
158 rawDataFiles = self.getFileList(path,latest,rmax,minSize=10E6)
159 convDataFiles = self.getFileList(pathConv,latest,rmax,minSize=0)
160 orderedRDF = list(rawDataFiles.keys())
161 orderedCDF = list(convDataFiles.keys())
162 orderedRDF.sort(),orderedCDF.sort()
163
164 for x in orderedRDF:
165 if x in orderedCDF: continue
166 lpruns = self.list_of_runs('convertRawData')[1]
167 if x in lpruns: continue
168 r = x//10000
169 p = x%10000
170 if r==4541 and p==38: continue # corrupted raw data file
171 print('executing run %i and partition %i'%(r,p))
172 self.runSinglePartition(path,str(r).zfill(6),str(p).zfill(4),EOScopy=True,check=True)
173
174 def runSinglePartition(self,path,r,p,EOScopy=False,check=True):
175 while self.count_python_processes('convertRawData')>ncpus or self.count_python_processes('runProd')>ncpus: time.sleep(300)
176 process = []
177 pid = 0
178 try:
179 pid = os.fork()
180 except OSError:
181 print("Could not create a child process")
182 if pid!=0:
183 process.append(pid)
184 else:
185 inFile = self.options.server+path+'run_'+ r+'/data_'+p+'.root'
186 fI = ROOT.TFile.Open(inFile)
187 if not fI:
188 print('file not found',path,r,p)
189 exit()
190 if not fI.Get('event') and not fI.Get('data'):
191 print('file corrupted',path,r,p)
192 exit()
193 command = "python $SNDSW_ROOT/shipLHC/rawData/convertRawData.py -r "+str(int(r))+ " -b 1000 -p "+path+" --server="+self.options.server
194 if options.FairTask_convRaw: command+= " -cpp "
195 command += " -P "+str(int(p)) + " >log_"+r+'-'+p
196 print("execute ",command)
197 os.system(command)
198 if check:
199 rc = self.checkFile(path,r,p)
200 if rc<0:
201 print('converting failed',r,p,rc)
202 exit()
203 print('finished converting ',r,p)
204 tmp = {int(r):[int(p)]}
205 if EOScopy: self.copy2EOS(path,tmp,self.options.overwrite)
206 exit(0)
207
208 def checkFile(self,path,r,p):
209 print('checkfile',path,r,p)
210 inFile = self.options.server+path+'run_'+ r+'/data_'+p+'.root'
211 fI = ROOT.TFile.Open(inFile)
212 if fI.Get('event'): Nraw = fI.event.GetEntries()
213 else: Nraw = fI.data.GetEntries()
214 outFile = 'sndsw_raw_'+r+'-'+p+self.Mext+'.root'
215 fC = ROOT.TFile(outFile)
216 test = fC.Get('rawConv')
217 if not test:
218 print('Error:',path,r,p,' rawConv not found')
219 return -2
220 Nconv = fC.rawConv.GetEntries()
221 if Nraw != Nconv:
222 print('Error:',path,r,p,':',Nraw,Nconv)
223 return -1
224 return 0
225
226 def copy2EOS(self,path,success,overwrite):
227 for i in success:
228 r = str(i).zfill(6)
229 for x in success[i]:
230 p = str(x).zfill(4)
231 outFile = 'sndsw_raw_'+r+'-'+p+self.Mext+'.root'
232 tmpPath = os.environ['EOSSHIP']+pathConv+"run_"+r+"/sndsw_raw-"+p+".root"
233 print('copy '+outFile+' '+tmpPath)
234 command = 'xrdcp '
235 if overwrite: command+=' -f '
236 os.system(command+outFile+' '+tmpPath)
237 os.system('rm '+outFile)
238
239 def check(self,path,partitions):
240 success = {}
241 for r in partitions:
242 success[r] = []
243 runNr = str(r).zfill(6)
244 for x in partitions[r]:
245 p = str(x).zfill(4)
246 rc = self.checkFile(path,runNr,p)
247 if rc==0: success[r].append(x)
248 return success
249
250 def getFileList(self,p,latest,rmax=999999,minSize=10E6):
251 inventory = {}
252 dirList = str( subprocess.check_output("xrdfs "+self.options.server+" ls "+p,shell=True) )
253 for x in dirList.split('\\n'):
254 aDir = x[x.rfind('/')+1:]
255 if not aDir.find('run')==0 or aDir.find('json')>0:continue
256 runNr = int(aDir.split('_')[1])
257 if not runNr > latest: continue
258 if runNr > rmax: continue
259 fileList = str( subprocess.check_output("xrdfs "+self.options.server+" ls -l "+p+"/"+aDir,shell=True) )
260 for z in fileList.split('\\n'):
261 jj=0
262 tmp = []
263 for o in z.split(' '):
264 if not o=='': tmp.append(o)
265 if self.options.server.find('snd-server')>0:
266 jj = 0
267 k = z.rfind('data_')
268 if not k>0: continue
269 if not z[k+9:k+10]=='.': continue
270 first = False
271 for x in z.split(' '):
272 if x=='': continue
273 if not first and x=='sndecs': first = True
274 if first and not x=='sndecs':
275 size = int(x)
276 break
277 else:
278 k = max(z.find('data_'),z.find('sndsw'))
279 if not k>0: continue
280 size = int(tmp[3+jj])
281 if size<minSize: continue
282 theDay = tmp[4+jj]
283 theTime = tmp[5+jj]
284 fname = z[k:]
285 run = int(aDir.split('_')[1])
286 if run>900000: continue # not a physical run
287 k = fname.find('.root')
288 partition = int(fname[k-4:k])
289 d = theDay.split('-')
290 t = theTime.split(':')
291 gmt = time.mktime( (int(d[0]), int(d[1]), int(d[2]), int(t[0]), int(t[1]), int(t[2]), 0, 0, 0 ) )
292 inventory[run*10000+partition] = [aDir+"/"+fname,gmt]
293 return inventory
294
295 def getNumberOfEvents(self,r):
296# check for partitions
297 dirlist = str( subprocess.check_output("xrdfs "+options.server+" ls "+pathConv+"run_"+str(r).zfill(6),shell=True) )
298 partitions = []
299 for x in dirlist.split('\\n'):
300 ix = x.find('sndsw_raw-')
301 if ix<0: continue
302 partitions.append(x[ix:])
303 eventChain = ROOT.TChain('rawConv')
304 for p in partitions:
305 eventChain.Add(options.server+pathConv+'run_'+str(r).zfill(6)+'/'+p)
306 return eventChain.GetEntries()
307
308 def checkEOS(self,copy=False,latest=4361,rlast=999999):
309 self.eosInventory = self.getFileList(path,latest,rlast)
310 tmp = self.options.server
311 self.options.server = "root://snd-server-1.cern.ch/"
312 self.daqInventory = self.getFileList('/mnt/raid5/data_online/',latest,rlast)
313 self.options.server = tmp
314 self.missing = {}
315 for r in self.daqInventory:
316 if not r in self.eosInventory:
317 p = r%10000
318 run = r//10000
319 if not run in self.missing: self.missing[run]=[]
320 self.missing[run].append(p)
321 if copy:
322 for r in self.missing:
323 dirname ='run_'+str(r).zfill(6)
324 for p in self.missing[r]:
325 filename = 'data_'+str(p).zfill(4)+'.root'
326 source = '/mnt/raid5/data_online/'+dirname+'/'+filename
327 target = path+dirname+'/'+filename
328 os.system("xrdcp -f "+source+" "+os.environ['EOSSHIP']+target)
329
330 def getConvStats(self,runList):
331 for run in runList:
332 try:
333 f=ROOT.TFile.Open("sndsw_raw_"+str(run).zfill(6)+'.root')
334 print(run,':',f.rawConv.GetEntries())
335 except:
336 print('problem:',run)
337
338 def rawStats(self,runList):
339 for run in runList:
340 runNr = str(run).zfill(6)
341 r = ROOT.TFile.Open(os.environ['EOSSHIP']+path+"/run_"+runNr+"/data.root")
342 if r.Get('event'): raw = r.event.GetEntries()
343 else: raw = r.data.GetEntries()
344 print(run,':',raw)
345
346 def makeHistos(self,runList):
347 # historic, used for H8 testbeam; always use DS tracks to survey US because of the larger overlap
348 for run in runList:
349 command = "$SNDSW_ROOT/shipLHC/scripts/Survey-MufiScifi.py -r "+str(run)+" -p "+pathConv+" -g geofile_sndlhc_H6.root -c Mufi_Efficiency -n -1 -t DS"
350 os.system("python "+command+ " &")
351 while self.count_python_processes('Survey-MufiScifi')>ncpus:
352 time.sleep(200)
353
354 def mips(self,runList):
355 # historic, used for H8 testbeam; always use DS tracks to survey US because of the larger overlap
356 for run in runList:
357 command = "Survey-MufiScifi.py -r "+str(run)+" -p "+pathConv+" -g geofile_sndlhc_H6.root -c mips"
358 os.system("python "+command+ " &")
359 while self.count_python_processes('Survey-MufiScifi')>ncpus:
360 time.sleep(200)
361
362if __name__ == '__main__':
363
364# use cases: H6, TI18
365 from argparse import ArgumentParser
366 parser = ArgumentParser()
367
368 parser.add_argument("-r", "--runNumbers", dest="runNumbers", help="list of run numbers",required=False, type=str,default="")
369 parser.add_argument("-P", "--production", dest="prod", help="H6 / epfl / TI18" ,required=False,default="TI18")
370 parser.add_argument("-c", "--command", dest="command", help="command", default=None)
371 parser.add_argument("-o", "--overwrite", dest="overwrite", action='store_true', help="overwrite EOS", default=False)
372 parser.add_argument("-cpp", "--convRawCPP", action='store_true', dest="FairTask_convRaw", help="convert raw data using ConvRawData FairTask", default=False)
373 parser.add_argument("--latest", dest="latest", help="last fully converted run", default=0,type=int)
374 parser.add_argument("-A", "--auto", dest="auto", help="run in auto mode checking regularly for new files",default=False,action='store_true')
375 parser.add_argument("--server", dest="server", help="xrootd server",default=os.environ["EOSSHIP"])
376 parser.add_argument("-g", dest="geofile", help="geometry and alignment",default="geofile_sndlhc_TI18.root")
377 parser.add_argument("--postScale", dest="postScale",help="post scale events, 1..10..100", default=-1,type=int)
378 parser.add_argument("--parallel", dest="parallel",default=1,type=int)
379 parser.add_argument("-rMin", dest="rMin",help="first run to process", default=-1,type=int)
380 parser.add_argument("-rMax", dest="rMax",help="last run to process", default=99999,type=int)
381 parser.add_argument("-p", dest="path", help="path to raw data",required=False,default="")
382 parser.add_argument("-d", dest="pathConv", help="path to converted data",required=False,default="")
383 parser.add_argument("--saveTo", dest="saveTo", help="output storage path", default="")
384 parser.add_argument("-n", "--nEvents", dest="nEvents", help="number of events", default=-1,type=int)
385 parser.add_argument("-s", "--nStart", dest="nStart", help="first event", default=0,type=int)
386
387
388 options = parser.parse_args()
389 M = prodManager()
390 M.Init(options)
391
392 runList = []
393 if options.prod == "TI18":
394 if options.server.find('eospublic')<0:
395 path = "/mnt/raid5/data_online/"
396 else:
397 path = options.path
398 pathConv = options.pathConv
399
400 elif options.prod == "reproc2022":
401 path = "/eos/experiment/sndlhc/raw_data/physics/2022/"
402 pathConv = "/eos/experiment/sndlhc/convertedData/physics/2022/"
403 # first run = 4361
404 elif options.prod == "H6":
405 path = "/eos/experiment/sndlhc/raw_data/commissioning/TB_H6/data/"
406 pathConv = "/eos/experiment/sndlhc/convertedData/commissioning/TB_H6/"
407 if options.runNumbers=="":
408 runList = [273,276,283,284,285,286,295,296,290,294,298,301,298,292,23,27,34,35,39,40, 41, 42, 43, 44, 45, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 123, 146, 165, 184]
409 elif options.prod == "epfl":
410 path = "/eos/experiment/sndlhc/raw_data/commissioning/scifi_cosmics_epfl/data/"
411 if options.runNumbers=="":
412 runList = [3,8]
413 elif options.prod == "test":
414 path = options.path
415 pathConv = options.pathConv
416 else:
417 print("production not known. you are on your own",options.prod)
418
419 if options.auto:
420 while 1 > 0:
421 M.check4NewFiles(options.latest,options.rMax)
422 time.sleep(600)
423 exit(0)
424
425 if options.command == "DQ":
426 while 1 > 0:
427 M.runDataQuality(options.latest)
428 time.sleep(360)
429 exit(0)
430 elif options.command == "rerunDQ":
431 runList = []
432 for r in options.runNumbers.split(','):
433 if r!= '': runList.append(int(r))
434 M.RerunDataQuality(runList,options.rMin,options.rMax)
435
436 elif options.command == "convert":
437 for r in options.runNumbers.split(','):
438 if r!= '': runList.append(int(r))
439 M.convert(runList,path)
440
441 elif options.command:
442 tmp = options.command.split(';')
443 command = tmp[0]+"("
444 for i in range(1,len(tmp)):
445 if i<4: command+='"'+tmp[i]+'"'
446 else: command+=tmp[i]
447 if i<len(tmp)-1: command+=","
448 command+=")"
449 print('executing ' + command )
450 eval("M."+command)
count_python_processes(self, macroName)
Definition runProd.py:37
checkEOS(self, copy=False, latest=4361, rlast=999999)
Definition runProd.py:308
getPartitions(self, runList, path)
Definition runProd.py:63
RerunDataQuality(self, runNrs=[], rMin=-1, rMax=99999)
Definition runProd.py:137
getFileList(self, p, latest, rmax=999999, minSize=10E6)
Definition runProd.py:250
makeHistos(self, runList)
Definition runProd.py:346
check4NewFiles(self, latest, rmax)
Definition runProd.py:157
check(self, path, partitions)
Definition runProd.py:239
getRunNrFromOffline(self, rMin=-1, rMax=99999)
Definition runProd.py:84
checkFile(self, path, r, p)
Definition runProd.py:208
list_of_runs(self, macroName)
Definition runProd.py:46
getConvStats(self, runList)
Definition runProd.py:330
copy2EOS(self, path, success, overwrite)
Definition runProd.py:226
runDataQuality(self, latest)
Definition runProd.py:93
getNumberOfEvents(self, r)
Definition runProd.py:295
Init(self, options)
Definition runProd.py:31
rawStats(self, runList)
Definition runProd.py:338
convert(self, runList, path, partitions={})
Definition runProd.py:76
runSinglePartition(self, path, r, p, EOScopy=False, check=True)
Definition runProd.py:174
delProcesses(pname)
Definition runProd.py:14