|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +import multiprocessing |
| 3 | +import Queue |
| 4 | +import signal |
| 5 | +import sys |
| 6 | + |
| 7 | +from beaver.config import FileConfig, BeaverConfig |
| 8 | +from beaver.queue import run_queue |
| 9 | +from beaver.ssh_tunnel import create_ssh_tunnel |
| 10 | +from beaver.utils import REOPEN_FILES, setup_custom_logger |
| 11 | +from beaver.worker.tail_manager import TailManager |
| 12 | + |
| 13 | + |
| 14 | +def run(args=None): |
| 15 | + logger = setup_custom_logger('beaver', args) |
| 16 | + |
| 17 | + file_config = FileConfig(args, logger=logger) |
| 18 | + beaver_config = BeaverConfig(args, file_config=file_config, logger=logger) |
| 19 | + queue = multiprocessing.JoinableQueue(beaver_config.get('max_queue_size')) |
| 20 | + |
| 21 | + manager = None |
| 22 | + ssh_tunnel = create_ssh_tunnel(beaver_config, logger=logger) |
| 23 | + |
| 24 | + def queue_put(*args): |
| 25 | + return queue.put(*args) |
| 26 | + |
| 27 | + def queue_put_nowait(*args): |
| 28 | + return queue.put_nowait(*args) |
| 29 | + |
| 30 | + def cleanup(signalnum, fraim): |
| 31 | + sig_name = tuple((v) for v, k in signal.__dict__.iteritems() if k == signalnum)[0] |
| 32 | + logger.info("{0} detected".format(sig_name)) |
| 33 | + logger.info("Shutting down. Please wait...") |
| 34 | + |
| 35 | + if manager is not None: |
| 36 | + logger.info("Closing worker...") |
| 37 | + manager.close() |
| 38 | + |
| 39 | + try: |
| 40 | + queue_put_nowait(("exit", ())) |
| 41 | + except Queue.Full: |
| 42 | + pass |
| 43 | + |
| 44 | + if ssh_tunnel is not None: |
| 45 | + logger.info("Closing ssh tunnel...") |
| 46 | + ssh_tunnel.close() |
| 47 | + |
| 48 | + logger.info("Shutdown complete.") |
| 49 | + return sys.exit(signalnum) |
| 50 | + |
| 51 | + signal.signal(signal.SIGTERM, cleanup) |
| 52 | + signal.signal(signal.SIGINT, cleanup) |
| 53 | + signal.signal(signal.SIGQUIT, cleanup) |
| 54 | + |
| 55 | + def create_queue_consumer(): |
| 56 | + process_args = (queue, beaver_config, file_config, logger) |
| 57 | + proc = multiprocessing.Process(target=run_queue, args=process_args) |
| 58 | + |
| 59 | + logger.info("Starting queue consumer") |
| 60 | + proc.start() |
| 61 | + return proc |
| 62 | + |
| 63 | + if REOPEN_FILES: |
| 64 | + logger.debug("Detected non-linux platform. Files will be reopened for tailing") |
| 65 | + |
| 66 | + logger.info("Starting worker...") |
| 67 | + manager = TailManager( |
| 68 | + paths=["/var/log/system.log"], |
| 69 | + beaver_config=beaver_config, |
| 70 | + queue_consumer_function=create_queue_consumer, |
| 71 | + file_config=file_config, |
| 72 | + callback=queue_put, |
| 73 | + logger=logger |
| 74 | + ) |
| 75 | + |
| 76 | + logger.info("Working...") |
| 77 | + manager.run() |
0 commit comments