walker_attr.py 8.73 KB
#!/bin/env python

# Copyright 2017 Aaron E Krohn, OnNIX LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from Queue import Empty, Full
from multiprocessing import Queue, Process, cpu_count
import os
import sys
import time
import argparse

def writer(wq, wid, debug=False, timeout=5):
    """Print every line taken from the writer queue to stdout.

    Args:
        wq: Queue of output lines (strings) to print.
        wid: Worker id, used only to tag debug messages.
        debug: When true, exit notices are written to stderr.
        timeout: Seconds to wait for an item before giving up.

    Returns:
        False when the queue stayed empty for ``timeout`` seconds, None when
        a false-y item (such as an empty string) was read from the queue.
    """

    # Wait for the first item with a timeout as well; an unbounded get()
    # never raises Empty and would block forever if nothing is ever queued.
    try:
        out_text = wq.get(block=True, timeout=timeout)
    except Empty:
        if debug:
            sys.stderr.write('P{0}: writer exiting prematurely\n'.format(wid))
        return False

    # Print items until the queue runs dry or a false-y item shows up
    while out_text:

        # NOTE: strip() trims whitespace on both ends, which includes any
        # trailing whitespace that is part of the path name itself.
        # The parenthesised form prints the same text on Python 2 and 3.
        print(out_text.strip())

        # Get next item
        try:
            out_text = wq.get(block=True, timeout=timeout)
        except Empty:
            if debug:
                sys.stderr.write('P{0}: writer exiting\n'.format(wid))
            return False

def statter(sq, wq, wid, debug=False, timeout=4):
    """Stat each path taken from the stat queue and queue a result line.

    For every path name read from ``sq`` this calls ``os.stat`` and puts a
    tab-separated ``uid, size, mtime, path`` line on ``wq``. Paths that
    cannot be stat'ed are skipped without producing output.

    Args:
        sq: Queue of path names to stat.
        wq: Queue that receives the formatted output lines.
        wid: Worker id, used only to tag debug messages.
        debug: When true, progress messages are written to stderr.
        timeout: Seconds to wait on either queue before giving up.

    Returns:
        False when ``sq`` stayed empty or ``wq`` stayed full for ``timeout``
        seconds, None when a false-y item (such as an empty string) was read
        from ``sq``.
    """

    # Wait for the first item with a timeout as well; an unbounded get()
    # never raises Empty and would leave an idle worker blocked forever.
    try:
        path_name = sq.get(block=True, timeout=timeout)
    except Empty:
        if debug:
            sys.stderr.write('P{0}: statter exiting prematurely\n'.format(wid))
        return False

    while path_name:

        try:
            stats = os.stat(path_name)
        except OSError:
            # This can happen if we try to stat a broken link
            # Expected behavior is to skip this file and output nothing
            if debug:
                sys.stderr.write('P{0}: file not found {1}\n'.format(wid, path_name))
        else:
            out = '{0}\t{1}\t{2}\t{3}'.format(stats.st_uid, stats.st_size, stats.st_mtime, path_name)

            try:
                wq.put(out, block=True, timeout=timeout)
            except Full:
                if debug:
                    sys.stderr.write('P{0}: exiting, write queue full\n'.format(wid))
                return False

        # Fetch the next path as a plain statement rather than in a
        # ``finally`` clause: a ``finally`` also runs after the ``return``
        # above, which would dequeue and drop one more path on the way out,
        # and a ``return`` inside it would swallow unexpected exceptions.
        try:
            path_name = sq.get(block=True, timeout=timeout)
        except Empty:
            if debug:
                sys.stderr.write('P{0}: stat queue empty\n'.format(wid))
            return False

def walker(dq, sq, wid, debug=False):
    """Take directories off the directory queue and list one level of each.

    For every directory read from ``dq`` the directory itself and its files
    are put on ``sq``, and its sub-directories are put back on ``dq``.

    Returns False when a queue get or put timed out, None when a false-y
    item (such as an empty string) was read from ``dq``.
    """

    def push(queue, item, wait):
        # Put one item on a queue; report False if it stayed full
        try:
            queue.put(item, block=True, timeout=wait)
        except Full:
            if debug:
                sys.stderr.write('P{0}: walker exiting\n'.format(wid))
            return False
        return True

    first_fetch = True

    while True:

        # Fetch the next directory; only later fetches announce the finish
        try:
            current = dq.get(block=True, timeout=4)
        except Empty:
            if debug and not first_fetch:
                sys.stderr.write('P{0}: walker finished\n'.format(wid))
            return False
        first_fetch = False

        # A false-y item ends the loop
        if not current:
            return None

        if debug:
            sys.stderr.write('P{0}: next_dir :: {1}\n'.format(wid, current))

        # Only the top level is wanted, so take the first os.walk() result
        # and never let it recurse. Nothing is yielded for an unlistable path.
        listing = next(os.walk(current), None)
        if listing is None:
            continue
        root, subdirs, files = listing

        if debug:
            sys.stderr.write('P{0}: walking\n'.format(wid, current))

        # The directory itself goes to the stat queue
        if not push(sq, root, 4):
            return False

        if debug and not subdirs:
            sys.stderr.write('No more dirs\n')

        # Sub-directories go back on the directory queue as full paths
        for name in subdirs:
            if not push(dq, os.path.join(root, name), 4):
                return False

        # Files go to the stat queue as full paths for the attribute lookup
        for name in files:
            if not push(sq, os.path.join(root, name), 1):
                return False

def main(argv=None):
    """Parse the command line, seed the queues and run the worker processes.

    The root directory is listed once to seed the directory and stat queues,
    then one writer plus a set of statter and walker processes are started
    and waited for.

    Args:
        argv: Optional list of command-line arguments. When None, argparse
            reads them from ``sys.argv``.

    Raises:
        SystemExit: Raised by argparse for invalid arguments, including a
            root path that is not a directory.
    """

    # Parse CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--path", help="Root path to scan",
                        action="store", dest="root_path",
                        required=True)
    parser.add_argument("-d", "--debug", help="Send debugging output to stderr",
                        action="store_true", dest="debug")
    # type=int: without it a size given on the command line stays a string
    parser.add_argument("-q", "--queue-size", help="Length of queues. Should be larger than total file count.",
                        action="store", dest="queue_size", type=int,
                        default=100000000)
    args = parser.parse_args(argv)

    # os.walk() yields nothing for a missing or non-directory path, which
    # would leave every queue empty, so reject such a root up front.
    if not os.path.isdir(args.root_path):
        parser.error('{0} is not a directory'.format(args.root_path))

    # Set to ridiculously large. Worst case scenario, every file and path name
    # in your tree end up in the 'wq' queue. If you have more files than these
    # values, increase them accordingly.
    dq = Queue(args.queue_size) # Unwalked directory queue
    sq = Queue(args.queue_size) # File stat queue
    wq = Queue(args.queue_size) # Path writer queue

    # Core count
    ncpus = cpu_count()

    # Initialize queues by walking root directory
    for r, d, f in os.walk(args.root_path):

        # Add root directory name to stat queue
        sq.put(r)
        # Add directories to directory queue
        for dn in d:
            dq.put(os.path.join(r, dn))
        # Add file names to stat queue
        for fn in f:
            sq.put(os.path.join(r, fn))
        # Don't recurse
        break

    if ncpus < 4:
        sys.stderr.write("""
WARNING: This system does not have enough CPUs to utilize the benefits
         of a parallelized directory walk. The system's built-in tools
         would likely be as fast or faster for this process if it will
         even run at all. Some systems have been found to hang.
""")
        time.sleep(10)
        sys.stderr.write('Process starting...')

    # Distribute CPUs. Floor division keeps the counts integral, and every
    # stage gets at least one worker so the pipeline can drain on small hosts.
    n_writers = 1
    n_statters = max(1, (ncpus - n_writers) // 2)
    n_walkers = max(1, ncpus - n_statters - n_writers)

    # One entry per process: role label, target function, queue arguments
    worker_specs = (
        [('write', writer, (wq,))] * n_writers
        + [('stat', statter, (sq, wq))] * n_statters
        + [('walk', walker, (dq, sq))] * n_walkers
    )

    # Keep track of our processes; each one gets a unique worker id
    procs = []
    for wid, (role, target, queues) in enumerate(worker_specs):
        p_name = 'proc_{0}'.format(wid)
        procs.append(Process(target=target, args=queues + (wid, args.debug), name=p_name))
        if args.debug:
            sys.stderr.write('Starting {0} worker {1}\n'.format(role, p_name))

    # Start all processes
    for proc in procs:
        proc.start()

    # Wait for every worker. join() sees each process exactly once, unlike
    # polling exitcode in a loop, which counts an exited process on every pass.
    for proc in procs:
        proc.join()
        if args.debug:
            sys.stderr.write('Process {0} finished\n'.format(proc.name))

    if args.debug:
        sys.stderr.write('Fin.\n')

# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()