0%

如何带参触发airflow并在函数使用这些参数

如何带参触发airflow并在函数使用这些参数

首先我们在改触发页面传入参数, 以json的格式

然后写一个可变参数, 例如像

def test(**kwargs):
    print("kwargs值")
    print(kwargs)
    print("kwargs类型")
    print(type(kwargs))

这样打印出来就是

kwargs值
{'conf': <airflow.configuration.AirflowConfigParser object at 0x7fd4ea4ce510>, 'dag': <DAG: big_sku_send_plan_push_to_kd>, 'dag_run': <DagRun big_sku_send_plan_push_to_kd @ 2022-04-28 02:52:22.507201+00:00: manual__2022-04-28T02:52:22.507201+00:00, externally triggered: True>, 'ds': '2022-04-28', 'ds_nodash': '20220428', 'execution_date': DateTime(2022, 4, 28, 2, 52, 22, 507201, tzinfo=Timezone('+00:00')), 'inlets': [], 'macros': <module 'airflow.macros' from '/usr/local/airflow/lib/python3.7/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-04-28', 'next_ds_nodash': '20220428', 'next_execution_date': DateTime(2022, 4, 28, 2, 52, 22, 507201, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {'psw': 'ybreal', 'sn': ['a', 'b']}, 'prev_ds': '2022-04-28', 'prev_ds_nodash': '20220428', 'prev_execution_date': DateTime(2022, 4, 28, 2, 52, 22, 507201, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7fd47bb2cd70 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fd47bb4de60>>, 'prev_start_date_success': <Proxy at 0x7fd47bb18d20 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fd47bb4d7a0>>, 'run_id': 'manual__2022-04-28T02:52:22.507201+00:00', 'task': <Task(PythonOperator): push_to_kd>, 'task_instance': <TaskInstance: big_sku_send_plan_push_to_kd.push_to_kd 2022-04-28T02:52:22.507201+00:00 [running]>, 'task_instance_key_str': 'big_sku_send_plan_push_to_kd__push_to_kd__20220428', 'test_mode': False, 'ti': <TaskInstance: big_sku_send_plan_push_to_kd.push_to_kd 2022-04-28T02:52:22.507201+00:00 [running]>, 'tomorrow_ds': '2022-04-29', 'tomorrow_ds_nodash': '20220429', 'ts': '2022-04-28T02:52:22.507201+00:00', 'ts_nodash': '20220428T025222', 'ts_nodash_with_tz': '20220428T025222.507201+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2022-04-27', 'yesterday_ds_nodash': '20220427', 'templates_dict': None}

kwargs类型
<class 'dict'>

可以看出kwargs是一个字典类型, 而我们在触发前传入的json参数存放在kwargs的params中, 我们可以通过kwargs.get("params")获取我们传入的参数

原文博主: 热衷开源的宝藏Boy
原文链接: http://www.fangzengye.com/article/ad03876db3e19d7ef77f764bc3e665c1
版权声明: 自由转载-非商用-禁止演绎-保持署名| CC BY-NC-ND 3.0

微信扫码加入我的星球联系我

评论区