Aaron Krohn

Fixed a bug where the statter would stat the same few files repeatedly; as a result, needed to balance the number of statters and walkers; added try/except around os.stat() in case of broken links.
... ... @@ -64,22 +64,29 @@ def statter (sq, wq, wid, debug=False):
while path_name:
stats = os.stat(path_name)
out = '{0}\t{1}\t{2}\t{3}'.format(stats.st_uid, stats.st_size, stats.st_mtime, path_name)
try:
wq.put(out, block=True, timeout=4)
except Full:
stats = os.stat(path_name)
except OSError:
# This can happen if we try to stat a broken link
# Expected behavior is to skip this file and output nothing
if debug:
sys.stderr.write('P{0}: exiting, write queue full\n'.format(wid))
return False
sys.stderr.write('P{0}: file not found {1}\n'.format(wid, path_name))
else:
out = '{0}\t{1}\t{2}\t{3}'.format(stats.st_uid, stats.st_size, stats.st_mtime, path_name)
try:
sq.get(block=True, timeout=4)
except Empty:
if debug:
sys.stderr.write('P{0}: stat queue empty\n'.format(wid))
return False
try:
wq.put(out, block=True, timeout=4)
except Full:
if debug:
sys.stderr.write('P{0}: exiting, write queue full\n'.format(wid))
return False
finally:
try:
path_name = sq.get(block=True, timeout=4)
except Empty:
if debug:
sys.stderr.write('P{0}: stat queue empty\n'.format(wid))
return False
def walker(dq, sq, wid, debug=False):
""" Checks directory queue for items, walks a single path, and checks the queue again """
... ... @@ -105,7 +112,7 @@ def walker(dq, sq, wid, debug=False):
if debug:
sys.stderr.write('P{0}: walking\n'.format(wid, next_dir))
# Add root path to write queue, exit if queue is full
# Add root path to stat queue, exit if queue is full
try:
sq.put(r, block=True, timeout=4)
except Full:
... ... @@ -128,7 +135,7 @@ def walker(dq, sq, wid, debug=False):
sys.stderr.write('P{0}: walker exiting\n'.format(wid))
return False
# Add each full file path to the write queue for printing to stdout
# Add each full file path to the stat queue for file attribs lookup
for nf in f:
queue_file = os.path.join(r, nf)
try:
... ... @@ -190,21 +197,30 @@ def main():
# Don't recurse
break
# Distribute CPUs
n_writers = 1
n_statters = (ncpus - n_writers) / 2
n_walkers = ncpus - n_statters - n_writers
# Create stdout writer process
ncpus -= 1
p_name = 'proc_{0}'.format(ncpus)
procs.append(Process(target=writer, args=(wq, ncpus, args.debug), name=p_name))
if args.debug:
sys.stderr.write('Starting write worker {0}\n'.format(p_name))
for i in xrange(n_writers):
ncpus -= 1
p_name = 'proc_{0}'.format(ncpus)
procs.append(Process(target=writer, args=(wq, ncpus, args.debug), name=p_name))
if args.debug:
sys.stderr.write('Starting write worker {0}\n'.format(p_name))
for i in xrange(int(ncpus/2)):
# Create statter process
for i in xrange(n_statters):
ncpus -= 1
p_name = 'proc_{0}'.format(i)
procs.append(Process(target=statter, args=(sq, wq, i, args.debug), name=p_name))
if args.debug:
sys.stderr.write('Starting stat worker {0}\n'.format(p_name))
# Create tree walker processes (could probably use pool here, but... meh)
for i in xrange(ncpus):
for i in xrange(n_walkers):
ncpus -= 1
p_name = 'proc_{0}'.format(i)
procs.append(Process(target=walker, args=(dq, sq, i, args.debug), name=p_name))
if args.debug:
... ... @@ -232,3 +248,4 @@ def main():
if __name__ == "__main__":
main()
... ...