1import os,subprocess,time,multiprocessing,psutil
4ncpus = multiprocessing.cpu_count() - 2
for each run in the run list, look up the number of partitions
for each partition, start a conversion job
check the consistency of the output file
optionally copy the output file to EOS
run as many jobs in parallel as CPUs are available
15 username = pwd.getpwuid(os.getuid()).pw_name
16 callstring =
"ps -f -u " + username
17 status = subprocess.check_output(callstring,shell=
True)
18 for x
in str(status).split(
"\\n"):
19 if not x.find(pname)<0:
21 for pid
in x.split(
' '):
26 os.system(
'kill '+pid)
33 if options.FairTask_convRaw:
38 username = pwd.getpwuid(os.getuid()).pw_name
39 callstring =
"ps -f -u " + username
41 status = subprocess.check_output(callstring,shell=
True)
43 for x
in status.decode().split(
'\n'):
44 if not x.find(macroName)<0
and not x.find(
'python') <0: n+=1
48 username = pwd.getpwuid(os.getuid()).pw_name
49 callstring =
"ps -f -u " + username
50 status = subprocess.check_output(callstring,shell=
True)
51 for x
in status.decode().split(
'\n'):
52 if not x.find(macroName)<0
and not x.find(
'python') <0:
56 r = int(x[i+3:].split(
' ')[0])
59 p = int(x[k+3:].split(
' ')[0])
60 lpruns[1].append(r*10000+p)
66 directory = path+
"run_"+
str(r).zfill(6)
67 dirlist =
str( subprocess.check_output(
"xrdfs "+options.server+
" ls "+directory,shell=
True) )
69 for x
in dirlist.split(
'\\n'):
70 if x.find(
'.root')<0:
continue
73 partitions[r].append( int(x[ix+5:ix+9]) )
76 def convert(self,runList,path,partitions={}):
77 if len(partitions)==0: partitions = self.
getPartitions(runList,path)
79 runNr =
str(r).zfill(6)
81 for p
in partitions[r]:
86 hList =
str( subprocess.check_output(
"xrdfs "+os.environ[
'EOSSHIP']+
" ls /eos/experiment/sndlhc/www/offline",shell=
True) )
87 for x
in hList.split(
'\\n'):
88 if x.find(
'root')<0:
continue
89 run = x.split(
'/')[-1]
90 if run.find(
'run')!=0:
continue
94 monitorCommand =
"python $SNDSW_ROOT/shipLHC/scripts/run_Monitoring.py -r XXXX --server=$EOSSHIP \
95 -b 100000 -p "+pathConv+
" -praw "+path+
" -g GGGG "\
96 +
" --postScale "+
str(options.postScale)+
" --ScifiResUnbiased 1 --batch --sudo "
97 if options.parallel>1: monitorCommand +=
" --parallel "+
str(options.parallel)
98 convDataFiles = self.
getFileList(pathConv,latest,options.rMax,minSize=0)
99 self.
checkEOS(copy=
False,latest=latest)
102 if r
in convDataFiles: convDataFiles.pop(r)
104 rawDataFiles = self.
getFileList(path,latest,options.rMax,minSize=10E6)
106 for x
in rawDataFiles:
111 orderedCDF = list(convDataFiles.keys())
121 if r
in lpruns[0]:
continue
126 if r > options.rMax
or r < options.rMin:
continue
128 print(
'run not found in raw data',r)
131 print(
'run not complete',r)
133 print(
'executing DQ for run %i'%(r))
134 os.system(monitorCommand.replace(
'XXXX',
str(r)).replace(
'GGGG',options.geofile)+
" &")
135 while self.
count_python_processes(
'run_Monitoring')>(ncpus-2)
or psutil.virtual_memory()[2]>90 : time.sleep(1800)
138 monitorCommand =
"python $SNDSW_ROOT/shipLHC/scripts/run_Monitoring.py -r XXXX --server=$EOSSHIP \
139 -b 100000 -p "+pathConv+
" -g GGGG "\
140 +
" --postScale "+
str(options.postScale)+
" --ScifiResUnbiased 1 --batch --sudo "
# Forward the requested worker count to run_Monitoring.py.
# The flag has to be --parallel, as in the other DQ command builder in this
# file; the command string already carries --postScale with options.postScale,
# so appending " --postScale " here passed the worker count as a second
# --postScale value and --parallel was never sent.
if options.parallel>1: monitorCommand += " --parallel "+str(options.parallel)
146 print(
'executing DQ for run %i'%(r))
# Pick the geometry file for run r.  Each entry is (upper run bound, geometry
# file): the first entry whose bound is strictly above r wins, and runs at or
# beyond the last bound use the geometry file given on the command line.
geoFileByRunLimit = (
    (4575, "../geofile_sndlhc_TI18_V3_08August2022.root"),
    (4855, "../geofile_sndlhc_TI18_V5_14August2022.root"),
    (5172, "../geofile_sndlhc_TI18_V6_08October2022.root"),
    (5485, "../geofile_sndlhc_TI18_V7_22November2022.root"),
    (7357, "../geofile_sndlhc_TI18_V1_2023.root"),
)
geoFile = options.geofile
for runLimit, geoCandidate in geoFileByRunLimit:
    if r < runLimit:
        geoFile = geoCandidate
        break
153 os.system(monitorCommand.replace(
'XXXX',
str(r)).replace(
'GGGG',geoFile)+
" &")
155 while self.
count_python_processes(
'run_Monitoring')>(ncpus-5)
or psutil.virtual_memory()[2]>90 : time.sleep(300)
158 rawDataFiles = self.
getFileList(path,latest,rmax,minSize=10E6)
159 convDataFiles = self.
getFileList(pathConv,latest,rmax,minSize=0)
# Run/partition keys of the raw and the converted inventories, in ascending order.
orderedRDF = sorted(rawDataFiles.keys())
orderedCDF = sorted(convDataFiles.keys())
165 if x
in orderedCDF:
continue
167 if x
in lpruns:
continue
170 if r==4541
and p==38:
continue
171 print(
'executing run %i and partition %i'%(r,p))
181 print(
"Could not create a child process")
185 inFile = self.
options.server+path+
'run_'+ r+
'/data_'+p+
'.root'
186 fI = ROOT.TFile.Open(inFile)
188 print(
'file not found',path,r,p)
190 if not fI.Get(
'event')
and not fI.Get(
'data'):
191 print(
'file corrupted',path,r,p)
193 command =
"python $SNDSW_ROOT/shipLHC/rawData/convertRawData.py -r "+
str(int(r))+
" -b 1000 -p "+path+
" --server="+self.
options.server
194 if options.FairTask_convRaw: command+=
" -cpp "
195 command +=
" -P "+
str(int(p)) +
" >log_"+r+
'-'+p
196 print(
"execute ",command)
201 print(
'converting failed',r,p,rc)
203 print(
'finished converting ',r,p)
204 tmp = {int(r):[int(p)]}
209 print(
'checkfile',path,r,p)
210 inFile = self.
options.server+path+
'run_'+ r+
'/data_'+p+
'.root'
211 fI = ROOT.TFile.Open(inFile)
212 if fI.Get(
'event'): Nraw = fI.event.GetEntries()
213 else: Nraw = fI.data.GetEntries()
214 outFile =
'sndsw_raw_'+r+
'-'+p+self.
Mext+
'.root'
215 fC = ROOT.TFile(outFile)
216 test = fC.Get(
'rawConv')
218 print(
'Error:',path,r,p,
' rawConv not found')
220 Nconv = fC.rawConv.GetEntries()
222 print(
'Error:',path,r,p,
':',Nraw,Nconv)
231 outFile =
'sndsw_raw_'+r+
'-'+p+self.
Mext+
'.root'
232 tmpPath = os.environ[
'EOSSHIP']+pathConv+
"run_"+r+
"/sndsw_raw-"+p+
".root"
233 print(
'copy '+outFile+
' '+tmpPath)
235 if overwrite: command+=
' -f '
236 os.system(command+outFile+
' '+tmpPath)
237 os.system(
'rm '+outFile)
243 runNr =
str(r).zfill(6)
244 for x
in partitions[r]:
247 if rc==0: success[r].append(x)
252 dirList =
str( subprocess.check_output(
"xrdfs "+self.
options.server+
" ls "+p,shell=
True) )
253 for x
in dirList.split(
'\\n'):
254 aDir = x[x.rfind(
'/')+1:]
255 if not aDir.find(
'run')==0
or aDir.find(
'json')>0:
continue
256 runNr = int(aDir.split(
'_')[1])
257 if not runNr > latest:
continue
258 if runNr > rmax:
continue
259 fileList =
str( subprocess.check_output(
"xrdfs "+self.
options.server+
" ls -l "+p+
"/"+aDir,shell=
True) )
260 for z
in fileList.split(
'\\n'):
263 for o
in z.split(
' '):
264 if not o==
'': tmp.append(o)
265 if self.
options.server.find(
'snd-server')>0:
269 if not z[k+9:k+10]==
'.':
continue
271 for x
in z.split(
' '):
273 if not first
and x==
'sndecs': first =
True
274 if first
and not x==
'sndecs':
278 k = max(z.find(
'data_'),z.find(
'sndsw'))
280 size = int(tmp[3+jj])
281 if size<minSize:
continue
285 run = int(aDir.split(
'_')[1])
286 if run>900000:
continue
287 k = fname.find(
'.root')
288 partition = int(fname[k-4:k])
289 d = theDay.split(
'-')
290 t = theTime.split(
':')
291 gmt = time.mktime( (int(d[0]), int(d[1]), int(d[2]), int(t[0]), int(t[1]), int(t[2]), 0, 0, 0 ) )
292 inventory[run*10000+partition] = [aDir+
"/"+fname,gmt]
297 dirlist =
str( subprocess.check_output(
"xrdfs "+options.server+
" ls "+pathConv+
"run_"+
str(r).zfill(6),shell=
True) )
299 for x
in dirlist.split(
'\\n'):
300 ix = x.find(
'sndsw_raw-')
302 partitions.append(x[ix:])
303 eventChain = ROOT.TChain(
'rawConv')
305 eventChain.Add(options.server+pathConv+
'run_'+
str(r).zfill(6)+
'/'+p)
306 return eventChain.GetEntries()
308 def checkEOS(self,copy=False,latest=4361,rlast=999999):
311 self.
options.server =
"root://snd-server-1.cern.ch/"
323 dirname =
'run_'+
str(r).zfill(6)
325 filename =
'data_'+
str(p).zfill(4)+
'.root'
326 source =
'/mnt/raid5/data_online/'+dirname+
'/'+filename
327 target = path+dirname+
'/'+filename
328 os.system(
"xrdcp -f "+source+
" "+os.environ[
'EOSSHIP']+target)
333 f=ROOT.TFile.Open(
"sndsw_raw_"+
str(run).zfill(6)+
'.root')
334 print(run,
':',f.rawConv.GetEntries())
336 print(
'problem:',run)
340 runNr =
str(run).zfill(6)
341 r = ROOT.TFile.Open(os.environ[
'EOSSHIP']+path+
"/run_"+runNr+
"/data.root")
# Count the raw entries from whichever tree the file provides ('event' or 'data').
# The probe must use r, the handle opened just above and read in both branches;
# the earlier test on fI referred to a name that is not assigned in the visible
# part of this routine.
if r.Get('event'): raw = r.event.GetEntries()
else: raw = r.data.GetEntries()
349 command =
"$SNDSW_ROOT/shipLHC/scripts/Survey-MufiScifi.py -r "+
str(run)+
" -p "+pathConv+
" -g geofile_sndlhc_H6.root -c Mufi_Efficiency -n -1 -t DS"
350 os.system(
"python "+command+
" &")
357 command =
"Survey-MufiScifi.py -r "+
str(run)+
" -p "+pathConv+
" -g geofile_sndlhc_H6.root -c mips"
358 os.system(
"python "+command+
" &")
362if __name__ ==
'__main__':
365 from argparse
import ArgumentParser
366 parser = ArgumentParser()
368 parser.add_argument(
"-r",
"--runNumbers", dest=
"runNumbers", help=
"list of run numbers",required=
False, type=str,default=
"")
369 parser.add_argument(
"-P",
"--production", dest=
"prod", help=
"H6 / epfl / TI18" ,required=
False,default=
"TI18")
370 parser.add_argument(
"-c",
"--command", dest=
"command", help=
"command", default=
None)
371 parser.add_argument(
"-o",
"--overwrite", dest=
"overwrite", action=
'store_true', help=
"overwrite EOS", default=
False)
372 parser.add_argument(
"-cpp",
"--convRawCPP", action=
'store_true', dest=
"FairTask_convRaw", help=
"convert raw data using ConvRawData FairTask", default=
False)
373 parser.add_argument(
"--latest", dest=
"latest", help=
"last fully converted run", default=0,type=int)
374 parser.add_argument(
"-A",
"--auto", dest=
"auto", help=
"run in auto mode checking regularly for new files",default=
False,action=
'store_true')
375 parser.add_argument(
"--server", dest=
"server", help=
"xrootd server",default=os.environ[
"EOSSHIP"])
376 parser.add_argument(
"-g", dest=
"geofile", help=
"geometry and alignment",default=
"geofile_sndlhc_TI18.root")
377 parser.add_argument(
"--postScale", dest=
"postScale",help=
"post scale events, 1..10..100", default=-1,type=int)
378 parser.add_argument(
"--parallel", dest=
"parallel",default=1,type=int)
379 parser.add_argument(
"-rMin", dest=
"rMin",help=
"first run to process", default=-1,type=int)
380 parser.add_argument(
"-rMax", dest=
"rMax",help=
"last run to process", default=99999,type=int)
381 parser.add_argument(
"-p", dest=
"path", help=
"path to raw data",required=
False,default=
"")
382 parser.add_argument(
"-d", dest=
"pathConv", help=
"path to converted data",required=
False,default=
"")
383 parser.add_argument(
"--saveTo", dest=
"saveTo", help=
"output storage path", default=
"")
384 parser.add_argument(
"-n",
"--nEvents", dest=
"nEvents", help=
"number of events", default=-1,type=int)
385 parser.add_argument(
"-s",
"--nStart", dest=
"nStart", help=
"first event", default=0,type=int)
388 options = parser.parse_args()
393 if options.prod ==
"TI18":
394 if options.server.find(
'eospublic')<0:
395 path =
"/mnt/raid5/data_online/"
398 pathConv = options.pathConv
400 elif options.prod ==
"reproc2022":
401 path =
"/eos/experiment/sndlhc/raw_data/physics/2022/"
402 pathConv =
"/eos/experiment/sndlhc/convertedData/physics/2022/"
404 elif options.prod ==
"H6":
405 path =
"/eos/experiment/sndlhc/raw_data/commissioning/TB_H6/data/"
406 pathConv =
"/eos/experiment/sndlhc/convertedData/commissioning/TB_H6/"
407 if options.runNumbers==
"":
408 runList = [273,276,283,284,285,286,295,296,290,294,298,301,298,292,23,27,34,35,39,40, 41, 42, 43, 44, 45, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 123, 146, 165, 184]
409 elif options.prod ==
"epfl":
410 path =
"/eos/experiment/sndlhc/raw_data/commissioning/scifi_cosmics_epfl/data/"
411 if options.runNumbers==
"":
413 elif options.prod ==
"test":
415 pathConv = options.pathConv
417 print(
"production not known. you are on your own",options.prod)
421 M.check4NewFiles(options.latest,options.rMax)
425 if options.command ==
"DQ":
427 M.runDataQuality(options.latest)
430 elif options.command ==
"rerunDQ":
432 for r
in options.runNumbers.split(
','):
433 if r!=
'': runList.append(int(r))
434 M.RerunDataQuality(runList,options.rMin,options.rMax)
436 elif options.command ==
"convert":
437 for r
in options.runNumbers.split(
','):
438 if r!=
'': runList.append(int(r))
439 M.convert(runList,path)
441 elif options.command:
442 tmp = options.command.split(
';')
444 for i
in range(1,len(tmp)):
445 if i<4: command+=
'"'+tmp[i]+
'"'
446 else: command+=tmp[i]
447 if i<len(tmp)-1: command+=
","
449 print(
'executing ' + command )
count_python_processes(self, macroName)
checkEOS(self, copy=False, latest=4361, rlast=999999)
getPartitions(self, runList, path)
RerunDataQuality(self, runNrs=[], rMin=-1, rMax=99999)
getFileList(self, p, latest, rmax=999999, minSize=10E6)
makeHistos(self, runList)
check4NewFiles(self, latest, rmax)
check(self, path, partitions)
getRunNrFromOffline(self, rMin=-1, rMax=99999)
checkFile(self, path, r, p)
list_of_runs(self, macroName)
getConvStats(self, runList)
copy2EOS(self, path, success, overwrite)
runDataQuality(self, latest)
getNumberOfEvents(self, r)
convert(self, runList, path, partitions={})
runSinglePartition(self, path, r, p, EOScopy=False, check=True)