44import tempfile
55import logging
66
7+ from kafka import KafkaClient , MultiProcessConsumer
78
89import beaver
910from beaver .config import BeaverConfig
@@ -49,8 +50,14 @@ def test_builtin_kafka(cls):
4950 cls .beaver_config .set ('kafka_hosts' , cls .server .host + ":" + str (cls .server .port ))
5051
5152 transport = create_transport (cls .beaver_config , logger = cls .logger )
53+
54+ cls .assertIsInstance (transport , beaver .transports .kafka_transport .KafkaTransport )
55+
5256 data = {}
53- lines = ['log1\n ' , 'log2\n ' ]
57+ lines = []
58+ n = 100
59+ for i in range (n ):
60+ lines .append ('log' + str (i ) + '\n ' )
5461 new_lines = []
5562 for line in lines :
5663 message = unicode_dammit (line )
@@ -60,8 +67,18 @@ def test_builtin_kafka(cls):
6067 data ['lines' ] = new_lines
6168 data ['fields' ] = []
6269 transport .callback ("test.log" , ** data )
63- transport .interrupt ()
64- cls .assertIsInstance (transport , beaver .transports .kafka_transport .KafkaTransport )
6570
71+ messages = cls ._consume_messages (cls .server .host , cls .server .port )
72+ cls .assertEqual (n , messages .__len__ ())
73+ for message in messages :
74+ cls .assertIn ('"file": "test.log", "message": "log' , message .message .value );
75+ print (message )
76+ print ('\n ' )
77+
78+ transport .interrupt ()
6679
80+ def _consume_messages (cls , host , port ):
81+ kafka = KafkaClient (cls .server .host + ":" + str (cls .server .port ))
82+ consumer = MultiProcessConsumer (kafka , None , cls .beaver_config .get ('kafka_topic' ), num_procs = 5 )
83+ return consumer .get_messages (count = 100 , block = True , timeout = 5 )
6784
0 commit comments