-
Notifications
You must be signed in to change notification settings - Fork 165
Expand file tree
/
Copy pathrun_queue.py
More file actions
109 lines (94 loc) · 4.21 KB
/
run_queue.py
File metadata and controls
109 lines (94 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# -*- coding: utf-8 -*-
import Queue
import signal
import sys
import time
from beaver.transports import create_transport
from beaver.transports.exception import TransportException
from unicode_dammit import unicode_dammit
def run_queue(queue, beaver_config, logger=None):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)
last_update_time = int(time.time())
queue_timeout = beaver_config.get('queue_timeout')
wait_timeout = beaver_config.get('wait_timeout')
count = 0
transport = None
try:
logger.debug('Logging using the {0} transport'.format(beaver_config.get('transport')))
transport = create_transport(beaver_config, logger=logger)
failure_count = 0
while True:
if not transport.valid():
logger.info('Transport connection issues, stopping queue')
break
command = None
try:
if queue.full():
logger.error("Queue is full")
else:
if count == 1000:
logger.debug("Main consumer queue Size is: " + str(queue.qsize()))
count = 0
command, data = queue.get(block=True, timeout=wait_timeout)
if command == "callback":
last_update_time = int(time.time())
logger.debug('Last update time now {0}'.format(last_update_time))
except Queue.Empty:
if not queue.empty():
logger.error('Recieved timeout from main consumer queue - stopping queue')
break
else:
logger.debug('No data')
if int(time.time()) - last_update_time > queue_timeout:
logger.info('Queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout))
break
if command == 'callback':
if data.get('ignore_empty', False):
logger.debug('removing empty lines')
lines = data['lines']
new_lines = []
for line in lines:
message = unicode_dammit(line)
if len(message) == 0:
continue
new_lines.append(message)
data['lines'] = new_lines
if len(data['lines']) == 0:
logger.debug('0 active lines sent from worker')
continue
while True:
try:
transport.callback(**data)
count += 1
logger.debug("Number of transports: " + str(count))
break
except TransportException as e:
failure_count = failure_count + 1
if failure_count > beaver_config.get('max_failure'):
failure_count = beaver_config.get('max_failure')
sleep_time = beaver_config.get('respawn_delay') ** failure_count
logger.info('Caught transport exception: %s', e)
logger.info('Reconnecting in %d seconds' % sleep_time)
try:
transport.invalidate()
time.sleep(sleep_time)
transport.reconnect()
if transport.valid():
failure_count = 0
logger.info('Reconnected successfully')
except KeyboardInterrupt:
logger.info('User cancelled respawn.')
transport.interrupt()
sys.exit(0)
elif command == 'addglob':
beaver_config.addglob(*data)
transport.addglob(*data)
elif command == 'exit':
break
except KeyboardInterrupt:
logger.debug('Queue Interruped')
if transport is not None:
transport.interrupt()
logger.debug('Queue Shutdown')