SND@LHC Software
Loading...
Searching...
No Matches
runProd.prodManager Class Reference

Public Member Functions

 Init (self, options)
 
 count_python_processes (self, macroName)
 
 list_of_runs (self, macroName)
 
 getPartitions (self, runList, path)
 
 convert (self, runList, path, partitions={})
 
 getRunNrFromOffline (self, rMin=-1, rMax=99999)
 
 runDataQuality (self, latest)
 
 RerunDataQuality (self, runNrs=[], rMin=-1, rMax=99999)
 
 check4NewFiles (self, latest, rmax)
 
 runSinglePartition (self, path, r, p, EOScopy=False, check=True)
 
 checkFile (self, path, r, p)
 
 copy2EOS (self, path, success, overwrite)
 
 check (self, path, partitions)
 
 getFileList (self, p, latest, rmax=999999, minSize=10E6)
 
 getNumberOfEvents (self, r)
 
 checkEOS (self, copy=False, latest=4361, rlast=999999)
 
 getConvStats (self, runList)
 
 rawStats (self, runList)
 
 makeHistos (self, runList)
 
 mips (self)
 

Public Attributes

 Mext
 
 options
 
 runNrs
 
 dqDataFiles
 
 RawrunNrs
 
 eosInventory
 
 daqInventory
 
 missing
 

Static Public Attributes

str command = "$SNDSW_ROOT/shipLHC/scripts/Survey-MufiScifi.py -r "+str(run)+" -p "+pathConv+" -g geofile_sndlhc_H6.root -c Mufi_Efficiency -n -1 -t DS"
 

Detailed Description

Definition at line 29 of file runProd.py.

Member Function Documentation

◆ check()

runProd.prodManager.check (   self,
  path,
  partitions 
)

Definition at line 239 of file runProd.py.

def check(self, path, partitions):
    """Verify converted partitions and return the ones that passed.

    Parameters:
        path: raw-data directory prefix, forwarded unchanged to checkFile.
        partitions: mapping run number -> iterable of partition numbers.

    Returns:
        dict mapping every run in `partitions` to the list of partition
        numbers for which checkFile returned 0.
    """
    success = {}
    for r in partitions:
        runNr = str(r).zfill(6)
        # checkFile is a method of this class; the former bare call
        # `checkFile(...)` raised NameError.
        success[r] = [
            x for x in partitions[r]
            if self.checkFile(path, runNr, str(x).zfill(4)) == 0
        ]
    return success
249

◆ check4NewFiles()

runProd.prodManager.check4NewFiles (   self,
  latest,
  rmax 
)

Definition at line 157 of file runProd.py.

def check4NewFiles(self, latest, rmax):
    """Convert every raw partition that has no converted counterpart yet.

    Raw files are listed below the module-level `path` (minimum size 10E6),
    converted files below `pathConv` (no size cut). Both inventories are
    keyed by run*10000+partition. Keys are processed in ascending order and
    each missing one is handed to runSinglePartition with EOS copy and
    checking enabled.
    """
    rawDataFiles = self.getFileList(path, latest, rmax, minSize=10E6)
    convDataFiles = self.getFileList(pathConv, latest, rmax, minSize=0)
    alreadyConverted = sorted(convDataFiles.keys())

    for key in sorted(rawDataFiles.keys()):
        if key in alreadyConverted:
            continue
        # The list of running conversions is queried again for every candidate.
        if key in self.list_of_runs('convertRawData')[1]:
            continue
        r, p = divmod(key, 10000)
        if r == 4541 and p == 38:
            continue  # corrupted raw data file
        print('executing run %i and partition %i'%(r,p))
        self.runSinglePartition(path, str(r).zfill(6), str(p).zfill(4), EOScopy=True, check=True)
173

◆ checkEOS()

runProd.prodManager.checkEOS (   self,
  copy = False,
  latest = 4361,
  rlast = 999999 
)

Definition at line 308 of file runProd.py.

def checkEOS(self, copy=False, latest=4361, rlast=999999):
    """Compare the raw-data inventory on EOS with the one on the DAQ server.

    Fills three attributes:
        self.eosInventory: getFileList result for the module-level `path`.
        self.daqInventory: getFileList result for '/mnt/raid5/data_online/'
            obtained with options.server temporarily set to the DAQ server.
        self.missing: run number -> list of partitions present in the DAQ
            inventory but absent from the EOS inventory.

    Parameters:
        copy: when true, every missing file is copied with `xrdcp -f`.
        latest, rlast: run-number window forwarded to getFileList.
    """
    self.eosInventory = self.getFileList(path, latest, rlast)
    savedServer = self.options.server
    self.options.server = "root://snd-server-1.cern.ch/"
    try:
        self.daqInventory = self.getFileList('/mnt/raid5/data_online/', latest, rlast)
    finally:
        # Restore the configured server even if the DAQ listing fails, so later
        # calls do not silently keep talking to the DAQ machine.
        self.options.server = savedServer

    self.missing = {}
    for key in self.daqInventory:
        if key in self.eosInventory:
            continue
        run, p = divmod(key, 10000)
        self.missing.setdefault(run, []).append(p)

    if copy:
        for r in self.missing:
            dirname = 'run_'+str(r).zfill(6)
            for p in self.missing[r]:
                filename = 'data_'+str(p).zfill(4)+'.root'
                source = '/mnt/raid5/data_online/'+dirname+'/'+filename
                target = path+dirname+'/'+filename
                os.system("xrdcp -f "+source+" "+os.environ['EOSSHIP']+target)
329

◆ checkFile()

runProd.prodManager.checkFile (   self,
  path,
  r,
  p 
)

Definition at line 208 of file runProd.py.

def checkFile(self, path, r, p):
    """Compare the entry count of a raw file with that of its converted file.

    Parameters:
        path: raw-data directory prefix on self.options.server.
        r: run number as zero-padded string.
        p: partition number as zero-padded string.

    Returns:
        0   entry counts agree,
        -1  entry counts differ,
        -2  the converted file has no 'rawConv' tree (or cannot be opened),
        -3  the raw file cannot be opened.
    All failure codes are negative, which is what callers test for.
    """
    print('checkfile',path,r,p)
    inFile = self.options.server+path+'run_'+ r+'/data_'+p+'.root'
    # Depending on the ROOT version a failed open yields a null file or raises
    # OSError; both are reported as -3 instead of crashing on a None object.
    try:
        fI = ROOT.TFile.Open(inFile)
    except OSError:
        fI = None
    if not fI:
        print('Error:',path,r,p,' raw file could not be opened')
        return -3
    try:
        if fI.Get('event'): Nraw = fI.event.GetEntries()
        else: Nraw = fI.data.GetEntries()
    finally:
        fI.Close()

    outFile = 'sndsw_raw_'+r+'-'+p+self.Mext+'.root'
    try:
        fC = ROOT.TFile(outFile)
    except OSError:
        fC = None
    if not fC or not fC.Get('rawConv'):
        if fC: fC.Close()
        print('Error:',path,r,p,' rawConv not found')
        return -2
    try:
        Nconv = fC.rawConv.GetEntries()
    finally:
        fC.Close()

    if Nraw != Nconv:
        print('Error:',path,r,p,':',Nraw,Nconv)
        return -1
    return 0
225

◆ convert()

runProd.prodManager.convert (   self,
  runList,
  path,
  partitions = {} 
)

Definition at line 76 of file runProd.py.

def convert(self, runList, path, partitions={}):
    """Convert all partitions of the given runs.

    Parameters:
        runList: run numbers, used only when `partitions` is empty.
        path: raw-data directory prefix.
        partitions: optional mapping run -> partition numbers. When empty it
            is obtained from getPartitions(runList, path). The default dict
            is only read and rebound, never modified.

    Every partition is passed to runSinglePartition with EOScopy=True and
    check=True.
    """
    if len(partitions) == 0:
        partitions = self.getPartitions(runList, path)
    for run, parts in partitions.items():
        runNr = str(run).zfill(6)
        for part in parts:
            self.runSinglePartition(path, runNr, str(part).zfill(4), EOScopy=True, check=True)
83

◆ copy2EOS()

runProd.prodManager.copy2EOS (   self,
  path,
  success,
  overwrite 
)

Definition at line 226 of file runProd.py.

def copy2EOS(self, path, success, overwrite):
    """Copy converted files to EOS and remove the local copies.

    Parameters:
        path: not used by this method; kept for interface compatibility.
        success: mapping run number -> iterable of partition numbers.
        overwrite: when true, `-f` is passed to xrdcp.

    The local file 'sndsw_raw_<run>-<part><Mext>.root' is copied to
    $EOSSHIP + pathConv + 'run_<run>/sndsw_raw-<part>.root'. The local file
    is removed only after xrdcp reported success; previously it was deleted
    unconditionally, losing the converted file whenever the copy failed.
    """
    for i in success:
        r = str(i).zfill(6)
        for x in success[i]:
            p = str(x).zfill(4)
            outFile = 'sndsw_raw_'+r+'-'+p+self.Mext+'.root'
            tmpPath = os.environ['EOSSHIP']+pathConv+"run_"+r+"/sndsw_raw-"+p+".root"
            print('copy '+outFile+' '+tmpPath)
            command = 'xrdcp '
            if overwrite: command+=' -f '
            rc = os.system(command+outFile+' '+tmpPath)
            if rc != 0:
                print('Error: copy failed, keeping local file',outFile,rc)
                continue
            os.remove(outFile)
238

◆ count_python_processes()

runProd.prodManager.count_python_processes (   self,
  macroName 
)

Definition at line 37 of file runProd.py.

def count_python_processes(self, macroName):
    """Count the current user's processes that look like python running macroName.

    The output of `ps -f -u <user>` is scanned line by line; a line counts
    when it contains both `macroName` and the word 'python'.
    """
    user = pwd.getpwuid(os.getuid()).pw_name
    # only works if screen is wide enough to print full name!
    psOutput = subprocess.check_output("ps -f -u " + user, shell=True)
    return sum(
        1
        for line in psOutput.decode().split('\n')
        if macroName in line and 'python' in line
    )

◆ getConvStats()

runProd.prodManager.getConvStats (   self,
  runList 
)

Definition at line 330 of file runProd.py.

def getConvStats(self, runList):
    """Print the number of 'rawConv' entries of the local converted file of each run.

    For every run the file 'sndsw_raw_<run, 6 digits>.root' is opened. Runs
    whose file or tree cannot be read are reported as 'problem:' and skipped.
    """
    for run in runList:
        f = None
        try:
            f = ROOT.TFile.Open("sndsw_raw_"+str(run).zfill(6)+'.root')
            print(run,':',f.rawConv.GetEntries())
        except Exception:
            # Best effort per run, but unlike the former bare `except:` this
            # lets KeyboardInterrupt / SystemExit stop the loop.
            print('problem:',run)
        finally:
            if f: f.Close()
337

◆ getFileList()

runProd.prodManager.getFileList (   self,
  p,
  latest,
  rmax = 999999,
  minSize = 10E6 
)

Definition at line 250 of file runProd.py.

# getFileList: build an inventory of data files found below directory `p` on the
# xrdfs server self.options.server.
#   p       : directory that is listed with `xrdfs ... ls`
#   latest  : only run directories with run number > latest are considered
#   rmax    : run directories with run number > rmax are skipped
#   minSize : files whose size is below this value are skipped
# Returns a dict keyed by run*10000+partition; each value is
# [ "<run directory>/<file name>", time stamp computed with time.mktime ].
# NOTE(review): indentation is lost in this listing; the nesting described in the
# comments below is inferred from the statements themselves.
250 def getFileList(self,p,latest,rmax=999999,minSize=10E6):
251 inventory = {}
# str() of the bytes returned by check_output keeps line breaks as the two
# characters backslash + n (Python 3 repr), which is why '\\n' is the separator.
252 dirList = str( subprocess.check_output("xrdfs "+self.options.server+" ls "+p,shell=True) )
253 for x in dirList.split('\\n'):
# last path component of the listed entry
254 aDir = x[x.rfind('/')+1:]
# keep only names that start with 'run' and do not contain 'json' after position 0
255 if not aDir.find('run')==0 or aDir.find('json')>0:continue
256 runNr = int(aDir.split('_')[1])
257 if not runNr > latest: continue
258 if runNr > rmax: continue
# long listing of the run directory; one line per file
259 fileList = str( subprocess.check_output("xrdfs "+self.options.server+" ls -l "+p+"/"+aDir,shell=True) )
260 for z in fileList.split('\\n'):
# jj is an offset added to the column indices below; it is always 0 here
261 jj=0
# tmp: the non-empty, blank-separated columns of the listing line
262 tmp = []
263 for o in z.split(' '):
264 if not o=='': tmp.append(o)
# two listing formats are handled, selected by the server name
265 if self.options.server.find('snd-server')>0:
266 jj = 0
267 k = z.rfind('data_')
268 if not k>0: continue
# require a '.' nine characters after the start of 'data_', i.e. 'data_' + 4 characters + '.'
269 if not z[k+9:k+10]=='.': continue
# size = first non-empty token that follows the 'sndecs' token(s)
# NOTE(review): this loop reuses the name x of the outer directory loop.
# NOTE(review): if no such token is found, size keeps the value of a previous
# line or is unbound - confirm that the listing always contains 'sndecs'.
270 first = False
271 for x in z.split(' '):
272 if x=='': continue
273 if not first and x=='sndecs': first = True
274 if first and not x=='sndecs':
275 size = int(x)
276 break
# presumably the alternative to the server test on line 265: other servers,
# where the size is taken from column 3 of the listing line
277 else:
278 k = max(z.find('data_'),z.find('sndsw'))
279 if not k>0: continue
280 size = int(tmp[3+jj])
281 if size<minSize: continue
# columns 4 and 5 are parsed below as 'Y-M-D' and 'h:m:s'
282 theDay = tmp[4+jj]
283 theTime = tmp[5+jj]
# file name = remainder of the line starting at the match position k
284 fname = z[k:]
285 run = int(aDir.split('_')[1])
286 if run>900000: continue # not a physical run
# partition number = the four characters directly in front of '.root'
287 k = fname.find('.root')
288 partition = int(fname[k-4:k])
289 d = theDay.split('-')
290 t = theTime.split(':')
# NOTE(review): time.mktime interprets the tuple as local time although the
# variable is called gmt - confirm which time zone the listing uses.
291 gmt = time.mktime( (int(d[0]), int(d[1]), int(d[2]), int(t[0]), int(t[1]), int(t[2]), 0, 0, 0 ) )
292 inventory[run*10000+partition] = [aDir+"/"+fname,gmt]
293 return inventory
294

◆ getNumberOfEvents()

runProd.prodManager.getNumberOfEvents (   self,
  r 
)

Definition at line 295 of file runProd.py.

def getNumberOfEvents(self, r):
    """Return the total number of 'rawConv' entries of all converted partitions of run r.

    The run directory pathConv + 'run_<r, 6 digits>' is listed with xrdfs on
    self.options.server and every entry containing 'sndsw_raw-' is added to a
    TChain.

    The server is read from self.options (set in Init), as in the other
    methods, instead of a module-level `options`.
    """
    server = self.options.server
    runDir = pathConv+"run_"+str(r).zfill(6)
    # check for partitions
    dirlist = subprocess.check_output("xrdfs "+server+" ls "+runDir,shell=True).decode()
    eventChain = ROOT.TChain('rawConv')
    for x in dirlist.split('\n'):
        ix = x.find('sndsw_raw-')
        if ix<0: continue
        eventChain.Add(server+runDir+'/'+x[ix:])
    return eventChain.GetEntries()
307

◆ getPartitions()

runProd.prodManager.getPartitions (   self,
  runList,
  path 
)

Definition at line 63 of file runProd.py.

def getPartitions(self, runList, path):
    """Return the partition numbers found in the raw-data directory of each run.

    Parameters:
        runList: iterable of run numbers.
        path: directory prefix; 'run_<run, 6 digits>' is appended per run.

    Returns:
        dict run -> list of int partition numbers, taken from the four
        characters following 'data_' in every listed '.root' entry.

    The server is read from self.options (set in Init), as in the other
    methods, instead of a module-level `options`.
    """
    partitions = {}
    for r in runList:
        directory = path+"run_"+str(r).zfill(6)
        dirlist = subprocess.check_output(
            "xrdfs "+self.options.server+" ls "+directory, shell=True
        ).decode()
        partitions[r] = []
        for entry in dirlist.split('\n'):
            if '.root' not in entry: continue
            ix = entry.rfind('data_')
            if ix<0: continue
            partitions[r].append( int(entry[ix+5:ix+9]) )
    return partitions
75

◆ getRunNrFromOffline()

runProd.prodManager.getRunNrFromOffline (   self,
  rMin = -1,
  rMax = 99999 
)

Definition at line 84 of file runProd.py.

def getRunNrFromOffline(self, rMin=-1, rMax=99999):
    """Fill self.dqDataFiles with run numbers found in the offline directory.

    /eos/experiment/sndlhc/www/offline is listed with xrdfs on $EOSSHIP.
    Entries containing 'root' whose file name starts with 'run' contribute
    the integer made of characters 3..8 of the name, if it lies within
    [rMin, rMax].
    """
    self.dqDataFiles = []
    listing = str( subprocess.check_output(
        "xrdfs "+os.environ['EOSSHIP']+" ls /eos/experiment/sndlhc/www/offline", shell=True) )
    for entry in listing.split('\\n'):
        if 'root' not in entry:
            continue
        name = entry.split('/')[-1]
        if not name.startswith('run'):
            continue
        r = int(name[3:9])
        if rMin <= r <= rMax:
            self.dqDataFiles.append(r)

◆ Init()

runProd.prodManager.Init (   self,
  options 
)

Definition at line 31 of file runProd.py.

def Init(self, options):
    """Attach the parsed options to the manager and reset its state.

    Sets:
        self.Mext: '_CPP' when options.FairTask_convRaw is truthy, else ''.
        self.options: the options object itself.
        self.runNrs: an empty list.
    """
    self.Mext = '_CPP' if options.FairTask_convRaw else ''
    self.options = options
    self.runNrs = []

◆ list_of_runs()

runProd.prodManager.list_of_runs (   self,
  macroName 
)

Definition at line 46 of file runProd.py.

def list_of_runs(self, macroName):
    """List runs and partitions currently being processed by macroName.

    The output of `ps -f -u <user>` is scanned for lines containing both
    `macroName` and 'python'. The value following '-r' is taken as run
    number, the value following '-P' as partition number.

    Returns:
        [runs, partitions], where runs holds the run numbers and partitions
        holds run*10000+partition for lines that carry both options.
    """
    lpruns = [[],[]]
    username = pwd.getpwuid(os.getuid()).pw_name
    status = subprocess.check_output("ps -f -u " + username, shell=True)
    for line in status.decode().split('\n'):
        if macroName not in line or 'python' not in line:
            continue
        i = line.find('-r')
        if i <= 0:
            # Without a run number on this line a partition key cannot be built;
            # do not fall back to the run number parsed from an earlier line.
            continue
        r = int(line[i+3:].split(' ')[0])
        lpruns[0].append(r)
        k = line.find('-P')
        if k > 0:
            p = int(line[k+3:].split(' ')[0])
            lpruns[1].append(r*10000+p)
    return lpruns
62

◆ makeHistos()

runProd.prodManager.makeHistos (   self,
  runList 
)

Definition at line 346 of file runProd.py.

346 def makeHistos(self,runList):

◆ mips()

runProd.prodManager.mips (   self)

Definition at line 354 of file runProd.py.

354 def mips(self):

◆ rawStats()

runProd.prodManager.rawStats (   self,
  runList 
)

Definition at line 338 of file runProd.py.

def rawStats(self, runList):
    """Print the number of raw entries stored in data.root of each run.

    The file $EOSSHIP + path + '/run_<run, 6 digits>/data.root' is opened;
    entries are read from the 'event' tree when present, otherwise from
    'data'.
    """
    for run in runList:
        runNr = str(run).zfill(6)
        rawFile = ROOT.TFile.Open(os.environ['EOSSHIP']+path+"/run_"+runNr+"/data.root")
        # The tree lookup must use the file opened above; the former code
        # queried an undefined name `fI` and raised NameError.
        if rawFile.Get('event'): raw = rawFile.event.GetEntries()
        else: raw = rawFile.data.GetEntries()
        print(run,':',raw)
        rawFile.Close()
345

◆ RerunDataQuality()

runProd.prodManager.RerunDataQuality (   self,
  runNrs = [],
  rMin = -1,
  rMax = 99999 
)

Definition at line 137 of file runProd.py.

def RerunDataQuality(self, runNrs=[], rMin=-1, rMax=99999):
    """Re-launch run_Monitoring.py in the background for a list of runs.

    Parameters:
        runNrs: run numbers to process. When empty, the runs found by
            getRunNrFromOffline(rMin, rMax) are used. The default list is
            only read and rebound, never modified.
        rMin, rMax: run window used when runNrs is empty.

    The geometry file is chosen from the run number; runs from 7357 on use
    options.geofile. After each launch the method sleeps 20 s and then waits
    while more than ncpus-5 monitoring processes are running or memory usage
    exceeds 90%.
    """
    options = self.options
    # Whitespace between the command-line options is not significant to the shell.
    monitorCommand = ("python $SNDSW_ROOT/shipLHC/scripts/run_Monitoring.py -r XXXX --server=$EOSSHIP "
                      " -b 100000 -p "+pathConv+" -g GGGG "
                      +" --postScale "+str(options.postScale)+ " --ScifiResUnbiased 1 --batch --sudo ")
    # Fixed: this used to append a second "--postScale" carrying the value of
    # options.parallel; runDataQuality passes the same value as "--parallel".
    if options.parallel>1: monitorCommand += " --parallel "+str(options.parallel)
    if len(runNrs) ==0:
        self.getRunNrFromOffline(rMin,rMax)
        runNrs = self.dqDataFiles
    # (exclusive upper run limit, geometry file), in ascending order
    geoFileByRun = (
        (4575, "../geofile_sndlhc_TI18_V3_08August2022.root"),
        (4855, "../geofile_sndlhc_TI18_V5_14August2022.root"),
        (5172, "../geofile_sndlhc_TI18_V6_08October2022.root"),
        (5485, "../geofile_sndlhc_TI18_V7_22November2022.root"),
        (7357, "../geofile_sndlhc_TI18_V1_2023.root"),
    )
    for r in runNrs:
        print('executing DQ for run %i'%(r))
        geoFile = options.geofile
        for limit, candidate in geoFileByRun:
            if r < limit:
                geoFile = candidate
                break
        os.system(monitorCommand.replace('XXXX',str(r)).replace('GGGG',geoFile)+" &")
        time.sleep(20)
        while self.count_python_processes('run_Monitoring')>(ncpus-5) or psutil.virtual_memory()[2]>90 :
            time.sleep(300)
156

◆ runDataQuality()

runProd.prodManager.runDataQuality (   self,
  latest 
)

Definition at line 93 of file runProd.py.

def runDataQuality(self, latest):
    """Launch run_Monitoring.py for complete, converted runs newer than `latest`.

    A run is processed when
      * none of its raw files is missing on EOS (see checkEOS),
      * it is not already being monitored (list_of_runs) and has no offline
        result yet (getRunNrFromOffline),
      * it lies within [options.rMin, options.rMax],
      * the number of converted partitions equals the number of raw
        partitions found on EOS.

    Fills self.RawrunNrs and self.runNrs (run -> list of run*10000+partition
    keys). After each launch the method waits while more than ncpus-2
    monitoring processes are running or memory usage exceeds 90%.
    """
    options = self.options
    # Whitespace between the command-line options is not significant to the shell.
    monitorCommand = ("python $SNDSW_ROOT/shipLHC/scripts/run_Monitoring.py -r XXXX --server=$EOSSHIP "
                      " -b 100000 -p "+pathConv+" -praw "+path+" -g GGGG "
                      +" --postScale "+str(options.postScale)+ " --ScifiResUnbiased 1 --batch --sudo ")
    if options.parallel>1: monitorCommand += " --parallel "+str(options.parallel)
    convDataFiles = self.getFileList(pathConv,latest,options.rMax,minSize=0)
    self.checkEOS(copy=False,latest=latest)
    # remove directories which are not completely copied
    # Fixed: self.missing is keyed by run number while convDataFiles is keyed by
    # run*10000+partition, so popping by run number never removed these runs.
    for key in [k for k in convDataFiles if k//10000 in self.missing]:
        convDataFiles.pop(key)
    # remove directories which are not fully converted
    rawDataFiles = self.getFileList(path,latest,options.rMax,minSize=10E6)
    self.RawrunNrs = {}
    for x in rawDataFiles:
        self.RawrunNrs.setdefault(x//10000, []).append(x)

    orderedCDF = list(convDataFiles.keys())
    lpruns = self.list_of_runs('run_Monitoring')

    self.getRunNrFromOffline(latest)
    self.runNrs = {}
    for x in orderedCDF:
        r = x//10000
        if r in self.runNrs:
            self.runNrs[r].append(x)
            continue
        if r in lpruns[0]: continue          # already being monitored
        if r in self.dqDataFiles: continue   # offline result exists
        self.runNrs[r] = [x]

    for r in self.runNrs:
        if r > options.rMax or r < options.rMin: continue # outside range
        if not r in self.RawrunNrs:
            print('run not found in raw data',r)
            continue # file converted but not enough events
        if len(self.runNrs[r]) != len(self.RawrunNrs[r]):
            print('run not complete',r)
            continue # not all files converted.
        print('executing DQ for run %i'%(r))
        os.system(monitorCommand.replace('XXXX',str(r)).replace('GGGG',options.geofile)+" &")
        while self.count_python_processes('run_Monitoring')>(ncpus-2) or psutil.virtual_memory()[2]>90 :
            time.sleep(1800)
136

◆ runSinglePartition()

runProd.prodManager.runSinglePartition (   self,
  path,
  r,
  p,
  EOScopy = False,
  check = True 
)

Definition at line 174 of file runProd.py.

def runSinglePartition(self, path, r, p, EOScopy=False, check=True):
    """Convert one raw-data partition in a forked child process.

    Parameters:
        path: raw-data directory prefix on self.options.server.
        r: run number as zero-padded string.
        p: partition number as zero-padded string.
        EOScopy: copy the converted file to EOS (copy2EOS) when finished.
        check: compare entry counts with checkFile after the conversion.

    The call first waits while more than ncpus 'convertRawData' or 'runProd'
    processes are running. The parent returns right after the fork; the child
    runs convertRawData.py, optionally checks and copies the result, and then
    exits.
    """
    while self.count_python_processes('convertRawData')>ncpus or self.count_python_processes('runProd')>ncpus:
        time.sleep(300)
    try:
        pid = os.fork()
    except OSError:
        # Fixed: pid used to stay 0 here, so the parent fell into the child
        # branch below and terminated itself with exit(0).
        print("Could not create a child process")
        return
    if pid!=0:
        return  # parent: the conversion continues in the child

    # ---- child process only ----
    inFile = self.options.server+path+'run_'+ r+'/data_'+p+'.root'
    fI = ROOT.TFile.Open(inFile)
    if not fI:
        print('file not found',path,r,p)
        exit()
    if not fI.Get('event') and not fI.Get('data'):
        print('file corrupted',path,r,p)
        exit()
    command = "python $SNDSW_ROOT/shipLHC/rawData/convertRawData.py -r "+str(int(r))+ " -b 1000 -p "+path+" --server="+self.options.server
    # read the flag from self.options, consistent with Init and the server above
    if self.options.FairTask_convRaw: command+= " -cpp "
    command += " -P "+str(int(p)) + " >log_"+r+'-'+p
    print("execute ",command)
    os.system(command)
    if check:
        rc = self.checkFile(path,r,p)
        if rc<0:
            print('converting failed',r,p,rc)
            exit()
    print('finished converting ',r,p)
    tmp = {int(r):[int(p)]}
    if EOScopy: self.copy2EOS(path,tmp,self.options.overwrite)
    exit(0)
207

Member Data Documentation

◆ command

str runProd.prodManager.command = "$SNDSW_ROOT/shipLHC/scripts/Survey-MufiScifi.py -r "+str(run)+" -p "+pathConv+" -g geofile_sndlhc_H6.root -c Mufi_Efficiency -n -1 -t DS"
static

Definition at line 349 of file runProd.py.

◆ daqInventory

runProd.prodManager.daqInventory

Definition at line 312 of file runProd.py.

◆ dqDataFiles

runProd.prodManager.dqDataFiles

Definition at line 85 of file runProd.py.

◆ eosInventory

runProd.prodManager.eosInventory

Definition at line 309 of file runProd.py.

◆ Mext

runProd.prodManager.Mext

Definition at line 32 of file runProd.py.

◆ missing

runProd.prodManager.missing

Definition at line 314 of file runProd.py.

◆ options

runProd.prodManager.options

Definition at line 35 of file runProd.py.

◆ RawrunNrs

runProd.prodManager.RawrunNrs

Definition at line 105 of file runProd.py.

◆ runNrs

runProd.prodManager.runNrs

Definition at line 36 of file runProd.py.


The documentation for this class was generated from the following file: runProd.py