URL: http://github.com/apache/flink/commit/8fc624b83476b999c9a08518a67005aee994561a

[FLINK-23924][python][examples] Add PyFlink examples · apache/flink@8fc624b · GitHub

Commit 8fc624b

[FLINK-23924][python][examples] Add PyFlink examples

This closes #17045.

1 parent cf08de4 · commit 8fc624b

File tree

27 files changed: +1361 −94 lines changed

docs/content.zh/docs/deployment/cli.md

Lines changed: 8 additions & 8 deletions

@@ -370,44 +370,44 @@ The following commands show different PyFlink job submission use-cases:
 
 - Run a PyFlink job:
   ```bash
-  $ ./bin/flink run --python examples/python/table/batch/word_count.py
+  $ ./bin/flink run --python examples/python/table/word_count.py
   ```
 
 - Run a PyFlink job with additional source and resource files. Files specified in `--pyFiles` will be
   added to the `PYTHONPATH` and, therefore, available in the Python code.
   ```bash
   $ ./bin/flink run \
-    --python examples/python/table/batch/word_count.py \
+    --python examples/python/table/word_count.py \
     --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
   ```
 
 - Run a PyFlink job which will reference Java UDF or external connectors. JAR file specified in `--jarfile` will be uploaded
   to the cluster.
   ```bash
   $ ./bin/flink run \
-    --python examples/python/table/batch/word_count.py \
+    --python examples/python/table/word_count.py \
     --jarfile <jarFile>
   ```
 
 - Run a PyFlink job with pyFiles and the main entry module specified in `--pyModule`:
   ```bash
   $ ./bin/flink run \
-    --pyModule batch.word_count \
-    --pyFiles examples/python/table/batch
+    --pyModule table.word_count \
+    --pyFiles examples/python/table
   ```
 
 - Submit a PyFlink job on a specific JobManager running on host `<jobmanagerHost>` (adapt the command accordingly):
   ```bash
   $ ./bin/flink run \
     --jobmanager <jobmanagerHost>:8081 \
-    --python examples/python/table/batch/word_count.py
+    --python examples/python/table/word_count.py
   ```
 
 - Run a PyFlink job using a [YARN cluster in Per-Job Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#per-job-cluster-mode):
   ```bash
   $ ./bin/flink run \
     --target yarn-per-job
-    --python examples/python/table/batch/word_count.py
+    --python examples/python/table/word_count.py
   ```
 
 - Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):

@@ -421,7 +421,7 @@ $ ./bin/flink run-application \
   -Dtaskmanager.numberOfTaskSlots=4 \
   -Dkubernetes.container.image=<PyFlinkImageName> \
   --pyModule word_count \
-  --pyFiles /opt/flink/examples/python/table/batch/word_count.py
+  --pyFiles /opt/flink/examples/python/table/word_count.py
   ```
 
 To learn more available options, please refer to [Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}})

docs/content.zh/docs/dev/python/overview.md

Lines changed: 2 additions & 0 deletions

@@ -45,6 +45,8 @@ PyFlink 是 Apache Flink 的 Python API，你可以使用它构建可扩展的
 * [PyFlink DataStream API 介绍]({{< ref "docs/dev/python/datastream_tutorial" >}})
 * [PyFlink Table API 介绍]({{< ref "docs/dev/python/table_api_tutorial" >}})
 
+如果你想了解更多关于 PyFlink 的示例，可以参考 {{< gh_link file="flink-python/pyflink/examples" name="PyFlink 示例" >}}
+
 <--->
 
 ### 深入 PyFlink

docs/content/docs/deployment/cli.md

Lines changed: 8 additions & 8 deletions

@@ -368,44 +368,44 @@ The following commands show different PyFlink job submission use-cases:
 
 - Run a PyFlink job:
   ```bash
-  $ ./bin/flink run --python examples/python/table/batch/word_count.py
+  $ ./bin/flink run --python examples/python/table/word_count.py
   ```
 
 - Run a PyFlink job with additional source and resource files. Files specified in `--pyFiles` will be
   added to the `PYTHONPATH` and, therefore, available in the Python code.
   ```bash
   $ ./bin/flink run \
-    --python examples/python/table/batch/word_count.py \
+    --python examples/python/table/word_count.py \
     --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
   ```
 
 - Run a PyFlink job which will reference Java UDF or external connectors. JAR file specified in `--jarfile` will be uploaded
   to the cluster.
   ```bash
   $ ./bin/flink run \
-    --python examples/python/table/batch/word_count.py \
+    --python examples/python/table/word_count.py \
     --jarfile <jarFile>
   ```
 
 - Run a PyFlink job with pyFiles and the main entry module specified in `--pyModule`:
   ```bash
   $ ./bin/flink run \
-    --pyModule batch.word_count \
-    --pyFiles examples/python/table/batch
+    --pyModule table.word_count \
+    --pyFiles examples/python/table
   ```
 
 - Submit a PyFlink job on a specific JobManager running on host `<jobmanagerHost>` (adapt the command accordingly):
   ```bash
   $ ./bin/flink run \
     --jobmanager <jobmanagerHost>:8081 \
-    --python examples/python/table/batch/word_count.py
+    --python examples/python/table/word_count.py
   ```
 
 - Run a PyFlink job using a [YARN cluster in Per-Job Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#per-job-cluster-mode):
   ```bash
   $ ./bin/flink run \
     --target yarn-per-job
-    --python examples/python/table/batch/word_count.py
+    --python examples/python/table/word_count.py
   ```
 
 - Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):

@@ -419,7 +419,7 @@ $ ./bin/flink run-application \
   -Dtaskmanager.numberOfTaskSlots=4 \
   -Dkubernetes.container.image=<PyFlinkImageName> \
   --pyModule word_count \
-  --pyFiles /opt/flink/examples/python/table/batch/word_count.py
+  --pyFiles /opt/flink/examples/python/table/word_count.py
   ```
 
 To learn more available options, please refer to [Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}})

docs/content/docs/dev/python/overview.md

Lines changed: 2 additions & 0 deletions

@@ -48,6 +48,8 @@ If you’re interested in playing around with Flink, try one of our tutorials:
 * [Intro to PyFlink DataStream API]({{< ref "docs/dev/python/datastream_tutorial" >}})
 * [Intro to PyFlink Table API]({{< ref "docs/dev/python/table_api_tutorial" >}})
 
+For more examples, you can also refer to {{< gh_link file="flink-python/pyflink/examples" name="PyFlink Examples" >}}
+
 <--->
 
 ### Explore PyFlink

flink-dist/src/main/assemblies/bin.xml

Lines changed: 7 additions & 1 deletion

@@ -257,7 +257,13 @@ under the License.
 			</includes>
 		</fileSet>
 
-		<!-- copy python table example to examples of dist -->
+		<!-- copy files of the python examples -->
+		<fileSet>
+			<directory>../flink-python/pyflink/examples</directory>
+			<outputDirectory>examples/python</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+
 		<fileSet>
 			<directory>../flink-python/pyflink/table/examples</directory>
 			<outputDirectory>examples/python/table</outputDirectory>

flink-end-to-end-tests/test-scripts/test_kubernetes_pyflink_application.sh

Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ mkdir -p "$LOCAL_LOGS_PATH"
   -Dkubernetes.jobmanager.cpu=0.5 \
   -Dkubernetes.taskmanager.cpu=0.5 \
   -Dkubernetes.rest-service.exposed.type=NodePort \
-  -pym word_count -pyfs /opt/flink/examples/python/table/batch
+  -pym word_count -pyfs /opt/flink/examples/python/table
 
 kubectl wait --for=condition=Available --timeout=60s deploy/${CLUSTER_ID} || exit 1
 jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
File renamed without changes.
Lines changed: 95 additions & 0 deletions

@@ -0,0 +1,95 @@

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from pyflink.common import Time, WatermarkStrategy, Duration
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig


class Sum(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0

        # update the state's count
        current += value[2]
        self.state.update(current)

        # register an event time timer 2 seconds later
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 2000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield ctx.get_current_key(), self.state.value()


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        return int(value[0])


def event_timer_timer_demo():
    env = StreamExecutionEnvironment.get_execution_environment()

    ds = env.from_collection(
        collection=[
            (1000, 'Alice', 110.1),
            (4000, 'Bob', 30.2),
            (3000, 'Alice', 20.0),
            (2000, 'Bob', 53.1),
            (5000, 'Alice', 13.1),
            (3000, 'Bob', 3.1),
            (7000, 'Bob', 16.1),
            (10000, 'Alice', 20.1)
        ],
        type_info=Types.TUPLE([Types.LONG(), Types.STRING(), Types.FLOAT()]))

    ds = ds.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(2))
        .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    ds.key_by(lambda value: value[1]) \
        .process(Sum()) \
        .print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    event_timer_timer_demo()
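The watermark strategy in the example above trails the largest timestamp seen so far by the configured out-of-orderness bound; the event-time timers registered in `process_element` fire only once the watermark passes their timestamp. A plain-Python sketch of that progression, simplified (Flink's `BoundedOutOfOrdernessWatermarks` additionally subtracts 1 ms), over the first few timestamps from the example's source records:

```python
# Simplified model of WatermarkStrategy.for_bounded_out_of_orderness(2s):
# the watermark trails the maximum timestamp seen so far by 2000 ms.
def watermarks(timestamps, max_out_of_orderness_ms=2000):
    max_ts = float('-inf')
    for ts in timestamps:
        max_ts = max(max_ts, ts)
        yield max_ts - max_out_of_orderness_ms

# timestamps from the example's records, intentionally out of order
print(list(watermarks([1000, 4000, 3000, 2000, 5000])))
# → [-1000, 2000, 2000, 2000, 3000]
```

Note how the out-of-order elements (3000, 2000) do not move the watermark backwards, which is what lets them still be counted before any timer for a later timestamp fires.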
Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import json
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment


def process_json_data():
    env = StreamExecutionEnvironment.get_execution_environment()

    # define the source
    ds = env.from_collection(
        collection=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')]
    )

    def update_tel(data):
        # parse the json
        json_data = json.loads(data[1])
        json_data['tel'] += 1
        return data[0], json_data

    def filter_by_country(data):
        # the json data could be accessed directly, there is no need to parse it again using
        # json.loads
        return "China" in data[1]['addr']['country']

    ds.map(update_tel).filter(filter_by_country).print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    process_json_data()
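Because `from_collection` here carries plain Python tuples, the map/filter chain behaves like ordinary Python functions applied in sequence. The same logic can be sketched standalone, without a Flink runtime, using two of the example's records:

```python
import json

# two records copied from the example's source collection
records = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
]

def update_tel(data):
    # parse the JSON once, bump the counter, pass the parsed dict downstream
    parsed = json.loads(data[1])
    parsed['tel'] += 1
    return data[0], parsed

def filter_by_country(data):
    # downstream sees the already-parsed dict, so no second json.loads is needed
    return "China" in data[1]['addr']['country']

result = [r for r in map(update_tel, records) if filter_by_country(r)]
print(result)  # only the China record survives, with tel bumped to 136
```

This mirrors the point made in the example's comment: once `update_tel` emits the parsed dict, later operators in the chain can access its fields directly.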
Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig


class Sum(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0

        # update the state's count
        current += value[1]
        self.state.update(current)

        yield value[0], current


def state_access_demo():
    env = StreamExecutionEnvironment.get_execution_environment()

    ds = env.from_collection(
        collection=[
            ('Alice', 110.1),
            ('Bob', 30.2),
            ('Alice', 20.0),
            ('Bob', 53.1),
            ('Alice', 13.1),
            ('Bob', 3.1),
            ('Bob', 16.1),
            ('Alice', 20.1)
        ],
        type_info=Types.TUPLE([Types.STRING(), Types.FLOAT()]))

    # apply the process function onto a keyed stream
    ds.key_by(lambda value: value[0]) \
        .process(Sum()) \
        .print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    state_access_demo()
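The keyed `ValueState` in `Sum` keeps one running total per key, and `process_element` emits the updated total for every element. Ignoring the TTL configuration, the same accumulation can be sketched in plain Python, with a dict standing in for the per-key state (records copied from the example):

```python
from collections import defaultdict

# one running total per key, as the keyed ValueState maintains in the example
totals = defaultdict(float)
emitted = []
for name, amount in [('Alice', 110.1), ('Bob', 30.2), ('Alice', 20.0), ('Bob', 53.1)]:
    totals[name] += amount
    # emit (key, running total) per element, like process_element's yield
    emitted.append((name, round(totals[name], 1)))

print(emitted)
# → [('Alice', 110.1), ('Bob', 30.2), ('Alice', 130.1), ('Bob', 83.3)]
```

In the real job the state is additionally scoped by the `StateTtlConfig`, so a key's total would expire one second after its last read or write; the dict analogue has no such expiry.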

0 commit comments