pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/python-beaver/python-beaver/commit/eeb7459648f104586d229f30dbbdbbee1b63e6e0

Improved kafka test · python-beaver/python-beaver@eeb7459 · GitHub
Skip to content

Commit eeb7459

Browse files
committed
Improved kafka test
1 parent dd2786f commit eeb7459

1 file changed

Lines changed: 20 additions & 3 deletions

File tree

beaver/tests/test_kafka_transport.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import tempfile
55
import logging
66

7+
from kafka import KafkaClient, MultiProcessConsumer
78

89
import beaver
910
from beaver.config import BeaverConfig
@@ -49,8 +50,14 @@ def test_builtin_kafka(cls):
4950
cls.beaver_config.set('kafka_hosts', cls.server.host + ":" + str(cls.server.port))
5051

5152
transport = create_transport(cls.beaver_config, logger=cls.logger)
53+
54+
cls.assertIsInstance(transport, beaver.transports.kafka_transport.KafkaTransport)
55+
5256
data = {}
53-
lines = ['log1\n', 'log2\n']
57+
lines = []
58+
n=100
59+
for i in range(n):
60+
lines.append('log' + str(i) + '\n')
5461
new_lines = []
5562
for line in lines:
5663
message = unicode_dammit(line)
@@ -60,8 +67,18 @@ def test_builtin_kafka(cls):
6067
data['lines'] = new_lines
6168
data['fields'] = []
6269
transport.callback("test.log", **data)
63-
transport.interrupt()
64-
cls.assertIsInstance(transport, beaver.transports.kafka_transport.KafkaTransport)
6570

71+
messages = cls._consume_messages(cls.server.host, cls.server.port)
72+
cls.assertEqual(n, messages.__len__())
73+
for message in messages:
74+
cls.assertIn('"file": "test.log", "message": "log', message.message.value);
75+
print(message)
76+
print('\n')
77+
78+
transport.interrupt()
6679

80+
def _consume_messages(cls, host, port):
81+
kafka = KafkaClient(cls.server.host + ":" + str(cls.server.port))
82+
consumer = MultiProcessConsumer(kafka, None, cls.beaver_config.get('kafka_topic'), num_procs=5)
83+
return consumer.get_messages(count=100, block=True, timeout=5)
6784

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy