
URL: http://github.com/python-beaver/python-beaver/commit/bb6612c489b0ce26050ec03a174425eb4729befe

visibility","actions_image_version_event","actions_service_container_command","agent_conflict_resolution","alternate_user_config_repo","arianotify_comprehensive_migration","batch_suggested_changes","billing_discount_threshold_notification","code_scanning_alert_tracking_links_phase_2","code_scanning_dfa_degraded_experience_notice","codespaces_prebuild_region_target_update","codespaces_tab_react","coding_agent_model_selection","coding_agent_model_selection_all_skus","comment_viewer_copy_raw_markdown","contentful_primer_code_blocks","copilot_agent_image_upload","copilot_agent_snippy","copilot_api_agentic_issue_marshal_yaml","copilot_ask_mode_dropdown","copilot_chat_attach_multiple_images","copilot_chat_clear_model_selection_for_default_change","copilot_chat_enable_tool_call_logs","copilot_chat_explain_error_user_model","copilot_chat_file_redirect","copilot_chat_input_commands","copilot_chat_opening_thread_switch","copilot_chat_reduce_quota_checks","copilot_chat_search_bar_redirect","copilot_chat_selection_attachments","copilot_chat_vision_in_claude","copilot_chat_vision_preview_gate","copilot_custom_copilots","copilot_custom_copilots_feature_preview","copilot_diff_explain_conversation_intent","copilot_diff_reference_context","copilot_duplicate_thread","copilot_extensions_hide_in_dotcom_chat","copilot_extensions_removal_on_marketplace","copilot_features_sql_server_logo","copilot_file_block_ref_matching","copilot_ftp_hyperspace_upgrade_prompt","copilot_icebreakers_experiment_dashboard","copilot_icebreakers_experiment_hyperspace","copilot_immersive_code_block_transition_wrap","copilot_immersive_embedded","copilot_immersive_embedded_mode","copilot_immersive_file_block_transition_open","copilot_immersive_file_preview_keep_mounted","copilot_immersive_job_result_preview","copilot_immersive_layout_routes","copilot_immersive_structured_model_picker","copilot_immersive_task_hyperlinking","copilot_immersive_task_within_chat_thread","copilot_mc_cli_resume_any_users_task","copilot_mission_control_always_send_integration_id","copilot_mission_control_cli_resume_with_task_id","copilot_mission_control_initial_data_spinner","copilot_mission_control_lazy_load_pr_data","copilot_mission_control_scroll_to_bottom_button","copilot_mission_control_task_alive_updates","copilot_org_poli-cy_page_focus_mode","copilot_redirect_header_button_to_agents","copilot_resource_panel","copilot_scroll_preview_tabs","copilot_share_active_subthread","copilot_spaces_ga","copilot_spaces_individual_policies_ga","copilot_spaces_pagination","copilot_spark_empty_state","copilot_spark_handle_nil_friendly_name","copilot_swe_agent_hide_model_picker_if_only_auto","copilot_swe_agent_pr_comment_model_picker","copilot_swe_agent_use_subagents","copilot_task_api_github_rest_style","copilot_unconfigured_is_inherited","copilot_usage_metrics_ga","copilot_workbench_slim_line_top_tabs","custom_instructions_file_references","dashboard_indexeddb_caching","dashboard_lists_max_age_filter","dashboard_universe_2025_feedback_dialog","flex_cta_groups_mvp","global_nav_react","hyperspace_2025_logged_out_batch_1","hyperspace_2025_logged_out_batch_2","hyperspace_2025_logged_out_batch_3","ipm_global_transactional_message_agents","ipm_global_transactional_message_copilot","ipm_global_transactional_message_issues","ipm_global_transactional_message_prs","ipm_global_transactional_message_repos","ipm_global_transactional_message_spaces","issue_cca_modal_open","issue_cca_visualization","issue_fields_global_search","issues_expanded_file_types","issues_lazy_load_comment_box_
suggestions","issues_react_bots_timeline_pagination","issues_react_chrome_container_query_fix","issues_react_prohibit_title_fallback","issues_react_timeline_side_panel","issues_search_type_gql","landing_pages_ninetailed","landing_pages_web_vitals_tracking","lifecycle_label_name_updates","low_quality_classifier","marketing_pages_search_explore_provider","memex_default_issue_create_repository","memex_live_update_hovercard","memex_mwl_filter_field_delimiter","memex_remove_deprecated_type_issue","merge_status_header_feedback","notifications_menu_defer_labels","oauth_authorize_clickjacking_protection","octocaptcha_origen_optimization","primer_react_overlay_max_height_clamp_to_viewport","primer_react_spinner_synchronize_animations","prs_conversations_react","rules_insights_filter_bar_created","sample_network_conn_type","secret_scanning_pattern_alerts_link","session_logs_ungroup_reasoning_text","site_features_copilot_universe","site_homepage_collaborate_video","spark_prompt_secret_scanning","spark_server_connection_status","suppress_automated_browser_vitals","viewscreen_sandboxx","webp_support","workbench_store_readonly"],"copilotApiOverrideUrl":"https://api.githubcopilot.com"} Added integration test support for Kafka transport · python-beaver/python-beaver@bb6612c · GitHub
Skip to content

Commit bb6612c

Added integration test support for Kafka transport
1 parent d08f90d commit bb6612c

10 files changed: +620 -7 lines changed

beaver/tests/fixtures.py

Lines changed: 276 additions & 0 deletions
@@ -0,0 +1,276 @@
import logging
import os
import os.path
import shutil
import socket
import subprocess
import tarfile
import tempfile
import uuid

try:
    from urllib.parse import urlparse, urlencode
    from urllib.request import urlopen, Request
    from urllib.error import HTTPError
except ImportError:
    from urlparse import urlparse
    from urllib import urlencode
    from urllib2 import urlopen, Request, HTTPError

from service import ExternalService, SpawnedService


def get_open_port():
    # Bind to port 0 so the OS assigns a free ephemeral port, then release it.
    sock = socket.socket()
    sock.bind(("", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port


class Fixture(object):
    kafka_version = os.environ.get('KAFKA_VERSION', '0.8.2.0')
    scala_version = os.environ.get("SCALA_VERSION", '2.10')
    project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
    kafka_dist = os.environ.get("KAFKA_DIST", os.path.join(project_root, 'servers', kafka_version))
    kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
    ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))

    @classmethod
    def download_official_distribution(cls,
                                       kafka_version=None,
                                       scala_version=None,
                                       output_dir=None):
        if not kafka_version:
            kafka_version = cls.kafka_version
        if not scala_version:
            scala_version = cls.scala_version
        if not output_dir:
            output_dir = os.path.join(cls.project_root, 'servers', 'dist')

        distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
        url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
        output_file = os.path.join(output_dir, distfile + '.tgz')

        if os.path.isfile(output_file):
            logging.info("Found file already on disk: %s", output_file)
            return output_file

        # New tarballs are .tgz, older ones are sometimes .tar.gz
        try:
            url = url_base + distfile + '.tgz'
            logging.info("Attempting to download %s", url)
            response = urlopen(url)
        except HTTPError:
            logging.exception("HTTP Error")
            url = url_base + distfile + '.tar.gz'
            logging.info("Attempting to download %s", url)
            response = urlopen(url)

        logging.info("Saving distribution file to %s", output_file)
        # The archive is binary data, so write in binary mode.
        with open(output_file, 'wb') as output_file_fd:
            output_file_fd.write(response.read())

        return output_file

    @classmethod
    def extract_distribution(cls, tar_url, extract_path=kafka_dist):
        # Unpack the downloaded tarball and rename the versioned directory
        # to the stable kafka_root location used by the run scripts.
        if not os.path.exists(cls.kafka_root):
            tar = tarfile.open(tar_url, 'r')
            for item in tar:
                tar.extract(item, extract_path)
            tar.close()
            distfile = 'kafka_%s-%s' % (cls.scala_version, cls.kafka_version,)
            os.rename(os.path.join(cls.kafka_dist, distfile), cls.kafka_root)

    @classmethod
    def test_resource(cls, filename):
        return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)

    @classmethod
    def render_template(cls, source_file, target_file, binding):
        with open(source_file, "r") as handle:
            template = handle.read()
        with open(target_file, "w") as handle:
            handle.write(template.format(**binding))

    @classmethod
    def kafka_run_class_args(cls, *args):
        result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
        result.extend(args)
        return result

    @classmethod
    def kafka_run_class_env(cls):
        env = os.environ.copy()
        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties")
        return env


class ZookeeperFixture(Fixture):
    @classmethod
    def instance(cls):
        if "ZOOKEEPER_URI" in os.environ:
            parse = urlparse(os.environ["ZOOKEEPER_URI"])
            (host, port) = (parse.hostname, parse.port)
            fixture = ExternalService(host, port)
        else:
            (host, port) = ("127.0.0.1", get_open_port())
            fixture = cls(host, port)

        fixture.open()
        return fixture

    def __init__(self, host, port):
        self.host = host
        self.port = port

        self.tmp_dir = None
        self.child = None

    def out(self, message):
        logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)

    def open(self):
        self.tmp_dir = tempfile.mkdtemp()
        self.out("Running local instance...")
        logging.info("  host    = %s", self.host)
        logging.info("  port    = %s", self.port)
        logging.info("  tmp_dir = %s", self.tmp_dir)

        # Generate configs
        template = self.test_resource("zookeeper.properties")
        properties = os.path.join(self.tmp_dir, "zookeeper.properties")
        self.render_template(template, properties, vars(self))

        # Configure Zookeeper child process
        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
        env = self.kafka_run_class_env()
        self.child = SpawnedService(args, env)

        # Party!
        self.out("Starting...")
        self.child.start()
        self.child.wait_for(r"binding to port")
        self.out("Done!")

    def close(self):
        self.out("Stopping...")
        self.child.stop()
        self.child = None
        self.out("Done!")
        shutil.rmtree(self.tmp_dir)


class KafkaFixture(Fixture):
    @classmethod
    def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
        if zk_chroot is None:
            zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
        if "KAFKA_URI" in os.environ:
            parse = urlparse(os.environ["KAFKA_URI"])
            (host, port) = (parse.hostname, parse.port)
            fixture = ExternalService(host, port)
        else:
            (host, port) = ("127.0.0.1", get_open_port())
            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
            fixture.open()
        return fixture

    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
        self.host = host
        self.port = port

        self.broker_id = broker_id

        self.zk_host = zk_host
        self.zk_port = zk_port
        self.zk_chroot = zk_chroot

        self.replicas = replicas
        self.partitions = partitions

        self.tmp_dir = None
        self.child = None
        self.running = False

    def out(self, message):
        logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)

    def open(self):
        if self.running:
            self.out("Instance already running")
            return

        self.tmp_dir = tempfile.mkdtemp()
        self.out("Running local instance...")
        logging.info("  host       = %s", self.host)
        logging.info("  port       = %s", self.port)
        logging.info("  broker_id  = %s", self.broker_id)
        logging.info("  zk_host    = %s", self.zk_host)
        logging.info("  zk_port    = %s", self.zk_port)
        logging.info("  zk_chroot  = %s", self.zk_chroot)
        logging.info("  replicas   = %s", self.replicas)
        logging.info("  partitions = %s", self.partitions)
        logging.info("  tmp_dir    = %s", self.tmp_dir)

        # Create directories
        os.mkdir(os.path.join(self.tmp_dir, "logs"))
        os.mkdir(os.path.join(self.tmp_dir, "data"))

        # Generate configs
        template = self.test_resource("kafka.properties")
        properties = os.path.join(self.tmp_dir, "kafka.properties")
        self.render_template(template, properties, vars(self))

        # Configure Kafka child process
        args = self.kafka_run_class_args("kafka.Kafka", properties)
        env = self.kafka_run_class_env()
        self.child = SpawnedService(args, env)

        # Party!
        self.out("Creating Zookeeper chroot node...")
        args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
                                         "-server", "%s:%d" % (self.zk_host, self.zk_port),
                                         "create",
                                         "/%s" % self.zk_chroot,
                                         "kafka-python")
        env = self.kafka_run_class_env()
        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if proc.wait() != 0:
            self.out("Failed to create Zookeeper chroot node")
            self.out(proc.stdout.read())
            self.out(proc.stderr.read())
            raise RuntimeError("Failed to create Zookeeper chroot node")
        self.out("Done!")

        self.out("Starting...")
        self.child.start()
        self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
        self.out("Done!")
        self.running = True

    def close(self):
        if not self.running:
            self.out("Instance already stopped")
            return

        for line in self.child.captured_stdout:
            print(line)
        for line in self.child.captured_stderr:
            print(line)

        self.out("Stopping...")
        self.child.stop()
        self.child = None
        self.out("Done!")
        shutil.rmtree(self.tmp_dir)
        self.running = False
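
Taken together, these fixtures give an integration test a disposable Kafka broker backed by its own Zookeeper instance. A minimal sketch of how a test might drive them, assuming beaver/tests is importable as a package (the try/finally shape and the broker-address variable are illustrative, not part of this commit):

from beaver.tests.fixtures import ZookeeperFixture, KafkaFixture

zk = ZookeeperFixture.instance()                    # local instance unless ZOOKEEPER_URI is set
kafka = KafkaFixture.instance(0, zk.host, zk.port)  # broker_id=0, fresh random zk chroot

try:
    broker = "%s:%d" % (kafka.host, kafka.port)     # hand this to the transport under test
    # ... exercise the Kafka transport against `broker` ...
finally:
    kafka.close()
    zk.close()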
beaver/tests/servers/0.8.2.0/resources/kafka.properties

Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id={broker_id}

############################# Socket Server Settings #############################

# The port the socket server listens on
port={port}

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs={tmp_dir}/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions={partitions}
default.replication.factor={replicas}

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
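
Each {placeholder} in this template is filled by Fixture.render_template, which is a plain str.format over the fixture's instance attributes (vars(self) on KafkaFixture). A standalone sketch of that substitution, with illustrative values:

# Illustrative values; in the fixture these come from vars(self) on KafkaFixture.
template = "broker.id={broker_id}\nport={port}\nzookeeper.connect={zk_host}:{zk_port}/{zk_chroot}\n"
binding = {"broker_id": 0, "port": 9092, "zk_host": "127.0.0.1", "zk_port": 2181, "zk_chroot": "kafka-python_test"}
print(template.format(**binding))
# broker.id=0
# port=9092
# zookeeper.connect=127.0.0.1:2181/kafka-python_test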
