From 673b448c83f3640b9119a324c91c16479371db11 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Fri, 29 Jul 2022 06:31:59 +0000 Subject: [PATCH 1/3] Airflow on Kubernetes minus Foreaches. - Support for all Metaflow constructs except foreaches and sensors Squashed commit of the following: commit ef8b1e3768695bc4d3375a947ab1da9c6520bcf1 Author: Valay Dave Date: Fri Jul 29 01:06:26 2022 +0000 Removed sensors and banned foreaches commit 8d517c4fecc6568777ad03eca81aaacfa3e91156 Author: Valay Dave Date: Fri Jul 29 00:59:01 2022 +0000 committing k8s related file from master. commit a7e1ecdbf7b8b8d1cc21321cc8e196053f8305e4 Author: Valay Dave Date: Fri Jul 29 00:54:45 2022 +0000 Uncommented code for foreach support with k8s KubernetesPodOperator version 4.2.0 renamed `resources` to `container_resources` - Check: (https://github.com/apache/airflow/pull/24673) / (https://github.com/apache/airflow/commit/45f4290712f5f779e57034f81dbaab5d77d5de85) This was done because `KubernetesPodOperator` didn't play nice with dynamic task mapping, so the `resources` argument had to be deprecated. Hence the code path below checks the version of `KubernetesPodOperator` and sets the argument accordingly: if the version is < 4.2.0 we set the argument as `resources`; if it is >= 4.2.0 we set it as `container_resources` (see the sketch at the end of this log). The `resources` argument of `KubernetesPodOperator` is going to be deprecated, so we will only use it for `KubernetesPodOperator` versions < 4.2.0. The `resources` argument will also not work for foreaches. commit 2719f5d792ada91e3ae0af6f1a9a0c7d90f74660 Author: Valay Dave Date: Mon Jul 18 18:31:58 2022 +0000 nit fixes: - fixed comments. - refactored some variable/function names. commit 2079293fbba0d3d862476a7d67b36af8a3389342 Author: Valay Dave Date: Mon Jul 18 18:14:53 2022 +0000 change `token` to `production_token` commit 14aad5ff717418e4183a88fa84b2f5e5bb13927a Author: Valay Dave Date: Mon Jul 18 18:11:56 2022 +0000 Refactored Airflow sensor imports. commit b1472d5f7a629024ca45e8b83700400d02a4d455 Author: Valay Dave Date: Mon Jul 18 18:08:41 2022 +0000 new comment on `startup_timeout_seconds` env var. commit 6d81b758e8f06911258d26f790029f557488a0d7 Author: Valay Dave Date: Mon Jul 18 18:06:09 2022 +0000 Removing traces of `@airflow_schedule_interval` commit 0673db7475b22f3ce17c2680fc0a7c4271b5c946 Author: Valay Dave Date: Thu Jul 14 12:43:08 2022 -0700 Foreach polish (valayDave/metaflow#62) * Removing unused imports * Added validation logic for airflow version numbers with foreaches * Removed `airflow_schedule_interval` decorator. * Added production/deployment token related changes - Uses s3 as a backend to store the production token - Token used for avoiding name clashes - token stored via `FlowDatastore` * Graph type validation for airflow foreaches - Airflow foreaches only support single-node fanout. - validation rejects graphs with nested foreaches * Added configuration about startup_timeout. * Added final todo on the `resources` argument of the k8s op - added a commented code block - it needs to be uncommented when airflow releases the patch for the op - Code seems feature-complete aside from the airflow patch commit 4b2dd1211fe2daeb76e29e4084f21e96b10cdae9 Author: Valay Dave Date: Thu Jul 7 19:33:07 2022 +0000 Removed retries from user-defaults.
commit 0e87a97fea15ba3aaa6d4228b141bd796b767c43 Author: Valay Dave Date: Wed Jul 6 16:29:33 2022 +0000 updated pod startup time commit fce2bd263f368dbb78a34ac71f64e13c89277222 Author: Valay Dave Date: Wed Jun 29 18:44:11 2022 +0000 Adding a default of 1 retry for any airflow worker. commit 5ef6bbcde51b1f4923a192291ed0e07d07ec7321 Author: Valay Dave Date: Mon Jun 27 01:22:42 2022 +0000 Airflow Foreach Integration - Simple one-node foreach-join support as guaranteed by airflow - Fixed env variable setting issue - introduced MetaflowKubernetesOperator - Created a new operator to allow smoothness in plumbing xcom values - Some todos commit d319fa915c558d82f1d127736ce34d3ae0da521d Author: Valay Dave Date: Fri Jun 24 21:12:09 2022 +0000 simplifying run-id macro. commit 0ffc813b1c4e6ba0103be51520f42d191371741a Author: Valay Dave Date: Fri Jun 24 11:51:42 2022 -0700 Refactored parameter macro settings. (valayDave/metaflow#60) commit a3a495077f34183d706c0edbe56d6213766bf5f6 Author: Valay Dave Date: Fri Jun 24 02:05:57 2022 +0000 added comment on need for `start_date` commit a3147bee08a260aa78ab2fb14c6232bfab2c2dec Author: Valay Dave Date: Tue Jun 21 06:03:56 2022 +0000 Refactored an `id_creator` method. commit 04d7f207ef2dae0ce2da2ec37163ac871f4517bc Author: Valay Dave Date: Tue Jun 21 05:52:05 2022 +0000 refactor: - `RUN_ID_LEN` to `RUN_HASH_ID_LEN` - `TASK_ID_LEN` to `TASK_ID_HASH_LEN` commit cde4605cd57ad9214f5a6afd7f58fe4c377e09e2 Author: Valay Dave Date: Tue Jun 21 05:48:55 2022 +0000 refactored an error string commit 11458188b6c59d044fca0dd2d1f5024ec84f6488 Author: Valay Dave Date: Mon Jun 20 22:42:36 2022 -0700 addressing Savin's comments. (#59) - Added many ad hoc changes based on comments. - Integrated secrets and `KUBERNETES_SECRETS` - cleaned up parameter setting - cleaned up setting of scheduling interval - renamed `AIRFLOW_TASK_ID_TEMPLATE_VALUE` to `AIRFLOW_TASK_ID` - renamed `AirflowSensorDecorator.compile` to `AirflowSensorDecorator.validate` - Checking if dag file and flow file are the same. - fixing variable names. - checking out `kubernetes_decorator.py` from master (6441ed5) - fixing a secret-setting bug in airflow. - simplified parameter type parsing logic - refactoring airflow argument parsing code. commit 83b20a7c6a13b3aedb7e603e139f07f0ef2fb646 Author: Valay Dave Date: Mon Jun 13 14:02:57 2022 -0700 Addressing final comments. (#57) - Added dag-run timeout. - airflow related scheduling checks in decorator. - Auto-naming sensors if no name is provided - Annotations to k8s operators - fix: argument serialization for `DAG` arguments (method names refactored, e.g. `to_dict` became `serialize`) - annotation bug fix - setting `workflow-timeout` for only scheduled dags commit 4931f9c84e6a1d20fc3ecb41cf138b72e5dee629 Author: Valay Dave Date: Mon Jun 6 04:50:49 2022 +0000 k8s bug fix commit 200ae8ed4a00028f094281f73a939e7a4dcdf83a Author: Valay Dave Date: Mon Jun 6 04:39:50 2022 +0000 removed unused function commit 70e285e9a7cfbec71fc293508a62c96f33562a01 Author: Valay Dave Date: Mon Jun 6 04:38:37 2022 +0000 Removed unused `sanitize_label` function commit 84fc622d8b11e718a849b2e2d91ceb3ea69917e6 Author: Valay Dave Date: Mon Jun 6 04:37:34 2022 +0000 GPU support added + container naming same as argo commit c92280d8796ec12b4ff17fa2ff3c736c7244f39c Author: Valay Dave Date: Mon Jun 6 04:25:17 2022 +0000 Refactored sensors to different files + bug fix - bug caused by `util.compress_list`. - The function doesn't play nice with strings containing a variety of characters.
- Ensured that exceptions are handled appropriately. - Made a new file for each sensor under the `airflow.sensors` module. commit b72a1dcf0dbbbcb814581d92738fd27ec31ef673 Author: Valay Dave Date: Sat Jun 4 01:41:49 2022 +0000 ran black. commit 558c82f65b383ed0d61ded6bc80326471e284550 Author: Valay Dave Date: Fri Jun 3 18:32:48 2022 -0700 Moving information from airflow_utils to compiler (#56) - commenting todos to organize unfinished changes. - some environment variables set via `V1EnvVar` - `client.V1ObjectFieldSelector` mapped env vars were not working in json form - Moving k8s operator import into its own function. - env vars moved. commit 9bb5f638792a671164ec95891e97f599e9a3385f Author: Valay Dave Date: Fri Jun 3 18:06:03 2022 +0000 added missing Run-id prefixes to variables. - merged `hash` and `dash_connect` filters. commit 37b5e6a9d8ca93cc91244c8d77c7d4f61280ba59 Author: Valay Dave Date: Fri Jun 3 18:00:22 2022 +0000 nit fix: variable name change. commit 660756f952ebd92ba1e26d7f908b81036c31ff10 Author: Valay Dave Date: Fri Jun 3 17:58:34 2022 +0000 nit fixes to dag.py's templating variables. commit 1202f5bc92f76df52b5957f11c8574cadfa62196 Author: Valay Dave Date: Fri Jun 3 17:56:53 2022 +0000 Fixed defaults passing - Addressed comments for airflow.py commit b9387dd428c1a37f9a3bfe2c72cab475da708c02 Author: Valay Dave Date: Fri Jun 3 17:52:24 2022 +0000 Following changes: - Refactored setting of the scheduling interval - refactored the dag file creation function - refactored is_active to is_paused_upon_creation - removed catchup commit 054e3f389febc6c447494a1dedb01228f5f5650f Author: Valay Dave Date: Fri Jun 3 17:33:25 2022 +0000 Multiple changes based on comments: 1. refactored `create_k8s_args` into `_to_job` 2. Addressed comments for snake casing 3. refactored `attrs` for simplicity. 4. refactored `metaflow_parameters` to `parameters`. 5. Refactored setting of `input_paths` commit d481b2fca7914b6b657a69af407cfe1a894a46dc Author: Valay Dave Date: Fri Jun 3 16:42:24 2022 +0000 Removed sensor metadata extraction. commit d8e6ec044ef8c285d7fbe1b83c10c07d51c063e3 Author: Valay Dave Date: Fri Jun 3 16:30:34 2022 +0000 porting Savin's comments - next change: addressing comments. commit 3f2353a647e53bc240e28792769c42a71ea8f8c9 Merge: d370ffb c1ff469 Author: Valay Dave Date: Thu Jul 28 23:52:16 2022 +0000 Merge branch 'master' into airflow commit d370ffb248411ad4675f9d55de709dbd75d3806e Merge: a82f144 e4eb751 Author: Valay Dave Date: Thu Jul 14 19:38:48 2022 +0000 Merge branch 'master' into airflow commit a82f1447b414171fc5611758cb6c12fc692f55f9 Merge: bdb1f0d 6f097e3 Author: Valay Dave Date: Wed Jul 13 00:35:49 2022 +0000 Merge branch 'master' into airflow commit bdb1f0dd248d01318d4a493c75b6f54248c7be64 Merge: 8511215 f9a4968 Author: Valay Dave Date: Wed Jun 29 18:44:51 2022 +0000 Merge branch 'master' into airflow commit 85112158cd352cb7de95a2262c011c6f43d98283 Author: Valay Dave Date: Tue Jun 21 02:53:11 2022 +0000 Bug fix from master merge. commit 90c06f12bb14eda51c6a641766c5f67d6763abaa Merge: 0fb73af 6441ed5 Author: Valay Dave Date: Mon Jun 20 21:20:20 2022 +0000 Merge branch 'master' into airflow commit 0fb73af8af9fca2875261e3bdd305a0daab1b229 Author: Valay Dave Date: Sat Jun 4 00:53:10 2022 +0000 squashing bugs after changes from master.
commit 09c6ba779f6b1b6ef1d7ed5b1bb2be70ec76575d Merge: 7bdf662 ffff49b Author: Valay Dave Date: Sat Jun 4 00:20:38 2022 +0000 Merge branch 'master' into af-mmr commit 7bdf662e14966b929b8369c65d5bd3bbe5741937 Author: Valay Dave Date: Mon May 16 17:42:38 2022 -0700 Airflow sensor api (#3) * Fixed run-id setting - Change guarantees that multiple dags triggered at the same moment have unique run-ids * added allow multiple in `Decorator` class * Airflow sensor integration. >> support added for: - ExternalTaskSensor - S3KeySensor - SqlSensor >> sensors allow multiple decorators >> sensors accept the arguments supported by airflow * Added `@airflow_schedule_interval` decorator * Fixing a run-id related bug in env variable setting. commit 2604a29452e794354cf4c612f48bae7cf45856ee Author: Valay Dave Date: Thu Apr 21 18:26:59 2022 +0000 Addressed comments. commit 584e88b679fed7d6eec8ce564bf3707359170568 Author: Valay Dave Date: Wed Apr 20 03:33:55 2022 +0000 fixed printing bug commit 169ac1535e5567149d94749ddaf70264e882d62c Author: Valay Dave Date: Wed Apr 20 03:30:59 2022 +0000 Option help bug fix. commit 6f8489bcc3bd715b65d8a8554a0f3932dc78c6f5 Author: Valay Dave Date: Wed Apr 20 03:25:54 2022 +0000 variable rename: `metaflow_specific_args` commit 0c779abcd1d9574878da6de8183461b53e0da366 Merge: d299b13 5a61508 Author: Valay Dave Date: Wed Apr 20 03:23:10 2022 +0000 Merge branch 'airflow-tests' into airflow commit 5a61508e61583b567ef8d3fea04e049d74a6d973 Author: Valay Dave Date: Wed Apr 20 03:22:54 2022 +0000 Removing unused code / resolved todos. commit d030830f2543f489a1c4ebd17da1b47942f041d6 Author: Valay Dave Date: Wed Apr 20 03:06:03 2022 +0000 ran black. commit 2d1fc06e41cbe45ccfd46e03bc87b09c7a78da45 Merge: f2cb319 7921d13 Author: Valay Dave Date: Wed Apr 20 03:04:19 2022 +0000 Merge branch 'master' into airflow-tests commit d299b13ce38d027ab27ce23c9bbcc0f43b222cfa Merge: f2cb319 7921d13 Author: Valay Dave Date: Wed Apr 20 03:02:37 2022 +0000 Merge branch 'master' into airflow commit f2cb3197725f11520da0d49cbeef8de215c243eb Author: Valay Dave Date: Wed Apr 20 02:54:03 2022 +0000 reverting change. commit 05b9db9cf0fe8b40873b2b74e203b4fc82e7fea4 Author: Valay Dave Date: Wed Apr 20 02:47:41 2022 +0000 3 changes: - Removing s3 dep - removed useless import - added `deployed_on` in dag file template commit c6afba95f5ec05acf7f33fd3228cffd784556e3b Author: Valay Dave Date: Fri Apr 15 22:50:52 2022 +0000 Fixed passing secrets with kubernetes. commit c3ce7e9faa5f7a23d309e2f66f778dbca85df22a Author: Valay Dave Date: Fri Apr 15 22:04:22 2022 +0000 Refactored code. - removed compute/k8s.py - Moved k8s code to airflow_compiler.py - ran isort on airflow_compiler.py commit d1c343dbbffbddbebd2aeda26d6846e595144e0b Author: Valay Dave Date: Fri Apr 15 18:02:25 2022 +0000 Added validations about: - unsupported decorators - foreach Changed where validations are done so the package is not saved. commit 7b19f8e66e278c75d836daf6a1c7ed2c607417ce Author: Valay Dave Date: Fri Apr 15 03:34:26 2022 +0000 Fixing mf log related bug - No double logging in metaflow. commit 4d1f6bf9bb32868c949d8c103c8fe44ea41b3f13 Author: Valay Dave Date: Thu Apr 14 03:10:51 2022 +0000 Removed useless code WRT project decorator. commit 5ad9a3949e351b0ac13f11df13446953932e8ffc Author: Valay Dave Date: Thu Apr 14 03:03:19 2022 +0000 Remove readme. commit 60cb6a79404efe2bcf9bf9a118a68f0b98c7d771 Author: Valay Dave Date: Thu Apr 14 03:02:38 2022 +0000 Made file path a required argument.
commit 9f0dc1b2e01ee04b05620630f3a0ec04fe873a31 Author: Valay Dave Date: Thu Apr 14 03:01:07 2022 +0000 changed `--is-active` -> `--is-paused-upon-creation` - dags are active by default. commit 5b98f937a62ee74de8aed8b0efde5045a28f068b Author: Valay Dave Date: Thu Apr 14 02:55:46 2022 +0000 shortened length of run-id and task-id hashes. commit e53426eaa4b156e8bd70ae7510c2e7c66745d101 Author: Valay Dave Date: Thu Apr 14 02:41:32 2022 +0000 Removing unused args. commit 72cbbfc7424f9be415c22d9144b16a0953f15295 Author: Valay Dave Date: Thu Apr 14 02:39:59 2022 +0000 Moved exceptions to airflow compiler commit b2970ddaa86c393c8abb7f203f6507c386ecbe00 Author: Valay Dave Date: Thu Apr 14 02:33:02 2022 +0000 Changes based on PR comments: - removed airflow xcom push file, moved to decorator code - removed prefix configuration - nit fixes. commit 9e622bac5a75eb9e7a6594d8fa0e47f076634b44 Author: Valay Dave Date: Mon Apr 11 20:39:00 2022 +0000 Removing unused code paths + code cleanup commit 7425f62cff2c9128eea785223ddeb40fa2d8f503 Author: Valay Dave Date: Mon Apr 11 19:45:04 2022 +0000 Fixing a bug in schedule. commit eb775cbadd1d2d2c90f160a95a0f42c8ff0d7f4c Author: Valay Dave Date: Sun Apr 10 02:52:59 2022 +0000 Bug fixes WRT Kubernetes secrets + k8s deployments. - Fixing some error messages. - Added some comments. commit 04c92b92c312a4789d3c1e156f61ef57b08dba9f Author: Valay Dave Date: Sun Apr 10 01:20:53 2022 +0000 Added secrets support. commit 4a0a85dff77327640233767e567aee2b379ac13e Author: Valay Dave Date: Sun Apr 10 00:11:46 2022 +0000 Bug fix. commit af91099c0a30c26b58d58696a3ef697ec49a8503 Author: Valay Dave Date: Sun Apr 10 00:03:34 2022 +0000 bug fix. commit c17f04a253dfe6118e2779db79da9669aa2fcef2 Author: Valay Dave Date: Sat Apr 9 23:55:41 2022 +0000 Bug fix in active defaults. commit 0d372361297857076df6af235d1de7005ac1544a Author: Valay Dave Date: Sat Apr 9 23:54:02 2022 +0000 @project, @schedule, default active dag support. - Added a flag to allow setting a dag as active on creation - Airflow compatible schedule interval - Project name fixes. commit 5c97b15cb11b5e8279befc5b14c239463750e9b7 Author: Valay Dave Date: Thu Apr 7 21:15:18 2022 +0000 Max workers and worker pool support. commit 9c973f2f44c3cb3a98e3e63f6e4dcef898bc8bf2 Author: Valay Dave Date: Thu Apr 7 19:34:33 2022 +0000 Adding exceptions for missing features. commit 2a946e2f083a34b4b6ed84c70aebf96b084ee8a2 Author: Valay Dave Date: Mon Mar 28 19:34:11 2022 +0000 2 changes: - removed hacky line - added support to directly drop dags in s3. commit e0772ec1bad473482c6fd19f8c5e8b9845303c0a Author: Valay Dave Date: Wed Mar 23 22:38:20 2022 +0000 fixing bugs in service account setting commit 874b94aeeabc664f12551864eff9d8fdc24dc37b Author: Valay Dave Date: Sun Mar 20 23:49:15 2022 +0000 Added support for branching with Airflow - removed the `next` function in `AirflowTask` - `AirflowTask`s have no knowledge of next tasks. - removed todos + added some todos - Graph construction on the airflow side using the graph_structure data structure. - graph_structure comes from `FlowGraph.output_steps()[1]` commit 8e9f649bd8c51171c38a1e5af70a44a85e7009ca Author: Valay Dave Date: Sun Mar 20 02:33:04 2022 +0000 Added hacky line commit fd5db04cf0a81b14efda5eaf40cd9227e2bac0d3 Author: Valay Dave Date: Sun Mar 20 02:06:38 2022 +0000 Removed hacky line. commit 5b23eb7d8446bef71246d853b11edafa93c6ef95 Author: Valay Dave Date: Sun Mar 20 01:44:57 2022 +0000 Added support for Parameters.
- Supporting int, str, bool, float, JSONType commit c9378e9b284657357ad2997f2b492bc2f4aaefac Author: Valay Dave Date: Sun Mar 20 00:14:10 2022 +0000 Removed todos + added some validation logic. commit 7250a44e1dea1da3464f6f71d0c5188bd314275a Author: Valay Dave Date: Sat Mar 19 23:45:15 2022 +0000 Fixing logs-related change from master. commit d125978619ab666dcf96db330acdca40f41b7114 Merge: 8cdac53 7e210a2 Author: Valay Dave Date: Sat Mar 19 23:42:48 2022 +0000 Merge branch 'master' into aft-mm commit 8cdac53dd32648455e36955badb8e0ef7b95a2b3 Author: Valay Dave Date: Sat Mar 19 23:36:47 2022 +0000 making changes in sync with master commit 5a93d9f5198c360b2a84ab13a86496986850953c Author: Valay Dave Date: Sat Mar 19 23:29:47 2022 +0000 Fixed bug when using catch + retry commit 62bc8dff68a6171b3b4222075a8e8ac109f65b4c Author: Valay Dave Date: Sat Mar 19 22:58:37 2022 +0000 Changed retry setting. commit 563a20036a2dfcc48101f680f29d4917d53aa247 Author: Valay Dave Date: Sat Mar 19 22:42:57 2022 +0000 Fixed setting `task_id`: - switched task-id from the airflow job id to a hash of "runid/stepname" - refactored xcom setting variables - added comments commit e2a1e502221dc603385263c82e2c068b9f055188 Author: Valay Dave Date: Sat Mar 19 17:51:59 2022 +0000 setting retry logic. commit a697b56052210c8f009b68772c902bbf77713202 Author: Valay Dave Date: Thu Mar 17 01:02:11 2022 +0000 Nit fix. commit 68f13beb17c7e73c0dddc142ef2418675a506439 Author: Valay Dave Date: Wed Mar 16 20:46:19 2022 +0000 Added @schedule support + readme commit 57bdde54f9ad2c8fe5513dbdb9fd02394664e234 Author: Valay Dave Date: Tue Mar 15 19:47:06 2022 +0000 Fixed setting run-id / task-id to labels in k8s - Fixed setting the run-id hash from the cli macro - added a hashing macro to ensure that the jinja template sets the correct run-id on k8s labels commit 3d6c31917297d0be5f9915b13680fc415ddb4421 Author: Valay Dave Date: Tue Mar 15 05:39:04 2022 +0000 Got linear workflows working on airflow. - Still not feature-complete as lots of args are still unfilled / lots of unknowns - minor tweak in eks to ensure airflow is k8s compatible. - passing state around via xcom-push - HACK: AWS keys are passed in a shady way. Revert this soon. commit db074b8012f76d9d85225a4ceddb2cde8fefa0f4 Author: Valay Dave Date: Fri Mar 11 12:34:33 2022 -0800 Tweaks commit a9f0468c4721a2017f1b26eb8edcdd80aaa57203 Author: Valay Dave Date: Tue Mar 1 17:14:47 2022 -0800 some changes based on Savin's comments. - Added changes to task datastore for a different reason: (todo) decouple these - Added comments to SFN for reference. - Airflow DAG is no longer dependent on metaflow commit f32d089cd3865927bc7510f24ba3418d859410b6 Author: Valay Dave Date: Wed Feb 23 00:54:17 2022 -0800 First version of dynamic dag compiler. - Not completely finished code - Creates a generic .py file and a JSON that is parsed to create the Airflow DAG. - Currently only boilerplate to make a linear dag; doesn't execute anything yet. - Unfinished code. commit d2def665a86d6a6622d6076882c1c2d54044e773 Author: Valay Dave Date: Sat Feb 19 14:01:47 2022 -0800 more tweaks. commit b176311f166788cc3dfc93354a0c5045a4e6a3d4 Author: Valay Dave Date: Thu Feb 17 09:04:29 2022 -0800 commit 0 - unfinished code.
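The version gate on the `resources` / `container_resources` argument described earlier in this log can be sketched roughly as below. This is an illustration only, not code from this patch; `_resources_arg_name` and the bare major/minor parsing are hypothetical stand-ins for the real provider-version check, and pre-release suffixes are ignored for simplicity:

    # Hypothetical sketch: choose the KubernetesPodOperator argument name
    # based on the installed provider version. Version 4.2.0 renamed
    # `resources` to `container_resources` (apache/airflow#24673).
    def _resources_arg_name(provider_version):
        major, minor = (int(p) for p in provider_version.split(".")[:2])
        # >= 4.2.0 -> use the new `container_resources` argument
        return "container_resources" if (major, minor) >= (4, 2) else "resources"

    # e.g. _resources_arg_name("4.3.0") == "container_resources"
    #      _resources_arg_name("3.9.1") == "resources"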
--- metaflow/decorators.py | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 8723367b79b..80024ffa6eb 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -108,6 +108,8 @@ class Decorator(object): name = "NONAME" defaults = {} + # `allow_multiple` allows setting many decorators of the same type on a step/flow. + allow_multiple = False def __init__(self, attributes=None, statically_defined=False): self.attributes = self.defaults.copy() @@ -255,9 +257,6 @@ class MyDecorator(StepDecorator): pass them around with every lifecycle call. """ - # `allow_multiple` allows setting many decorators of the same type to a step. - allow_multiple = False - def step_init( self, flow, graph, step_name, decorators, environment, flow_datastore, logger ): @@ -403,12 +402,17 @@ def _base_flow_decorator(decofunc, *args, **kwargs): if isinstance(cls, type) and issubclass(cls, FlowSpec): # flow decorators add attributes in the class dictionary, # _flow_decorators. - if decofunc.name in cls._flow_decorators: + if decofunc.name in cls._flow_decorators and not decofunc.allow_multiple: raise DuplicateFlowDecoratorException(decofunc.name) else: - cls._flow_decorators[decofunc.name] = decofunc( - attributes=kwargs, statically_defined=True - ) + deco_instance = decofunc(attributes=kwargs, statically_defined=True) + if decofunc.allow_multiple: + if decofunc.name not in cls._flow_decorators: + cls._flow_decorators[decofunc.name] = [deco_instance] + else: + cls._flow_decorators[decofunc.name].append(deco_instance) + else: + cls._flow_decorators[decofunc.name] = deco_instance else: raise BadFlowDecoratorException(decofunc.name) return cls @@ -503,11 +507,26 @@ def _attach_decorators_to_step(step, decospecs): def _init_flow_decorators( flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options ): + # Certain decorators can be specified multiple times and exist as lists in the _flow_decorators dictionary. for deco in flow._flow_decorators.values(): - opts = {option: deco_options[option] for option in deco.options} - deco.flow_init( - flow, graph, environment, flow_datastore, metadata, logger, echo, opts - ) + if isinstance(deco, list): + for rd in deco: + opts = {option: deco_options[option] for option in rd.options} + rd.flow_init( + flow, + graph, + environment, + flow_datastore, + metadata, + logger, + echo, + opts, + ) + else: + opts = {option: deco_options[option] for option in deco.options} + deco.flow_init( + flow, graph, environment, flow_datastore, metadata, logger, echo, opts + ) def _init_step_decorators(flow, graph, environment, flow_datastore, logger): From 09ab1ed2cc9c28b611cc0a36734cf84635e52a2d Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Fri, 29 Jul 2022 06:46:54 +0000 Subject: [PATCH 2/3] Sensors and foreach support --- metaflow/plugins/__init__.py | 4 +- metaflow/plugins/airflow/airflow.py | 18 ++++ metaflow/plugins/airflow/airflow_cli.py | 8 +- metaflow/plugins/airflow/airflow_utils.py | 63 +++++++++++++ metaflow/plugins/airflow/sensors/__init__.py | 9 ++ .../plugins/airflow/sensors/base_sensor.py | 74 +++++++++++++++ .../airflow/sensors/external_task_sensor.py | 94 +++++++++++++++++++ metaflow/plugins/airflow/sensors/s3_sensor.py | 26 +++++ .../plugins/airflow/sensors/sql_sensor.py | 31 ++++++ 9 files changed, 319 insertions(+), 8 deletions(-) create mode 100644 metaflow/plugins/airflow/sensors/__init__.py create mode 100644
metaflow/plugins/airflow/sensors/base_sensor.py create mode 100644 metaflow/plugins/airflow/sensors/external_task_sensor.py create mode 100644 metaflow/plugins/airflow/sensors/s3_sensor.py create mode 100644 metaflow/plugins/airflow/sensors/sql_sensor.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index bb5c52ea746..460298a4fe7 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -174,11 +174,13 @@ def get_plugin_cli(): from .project_decorator import ProjectDecorator +from .airflow.sensors import SUPPORTED_SENSORS + FLOW_DECORATORS = [ CondaFlowDecorator, ScheduleDecorator, ProjectDecorator, -] +] + SUPPORTED_SENSORS _merge_lists(FLOW_DECORATORS, _ext_plugins["FLOW_DECORATORS"], "name") # Cards diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 5480a79c59e..88b8b3a338c 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -35,6 +35,7 @@ from . import airflow_utils from .exception import AirflowException +from .sensors import SUPPORTED_SENSORS from .airflow_utils import ( TASK_ID_XCOM_KEY, AirflowTask, @@ -88,6 +89,7 @@ def __init__( self.username = username self.max_workers = max_workers self.description = description + self._depends_on_upstream_sensors = False self._file_path = file_path _, self.graph_structure = self.graph.output_steps() self.worker_pool = worker_pool @@ -584,6 +586,17 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): cmds.append(" ".join(entrypoint + top_level + step)) return cmds + def _collect_flow_sensors(self): + decos_lists = [ + self.flow._flow_decorators.get(s.name) + for s in SUPPORTED_SENSORS + if self.flow._flow_decorators.get(s.name) is not None + ] + af_tasks = [deco.create_task() for decos in decos_lists for deco in decos] + if len(af_tasks) > 0: + self._depends_on_upstream_sensors = True + return af_tasks + def _contains_foreach(self): for node in self.graph: if node.type == "foreach": @@ -638,6 +651,7 @@ def _visit(node, workflow, exit_node=None): if self.workflow_timeout is not None and self.schedule is not None: airflow_dag_args["dagrun_timeout"] = dict(seconds=self.workflow_timeout) + appending_sensors = self._collect_flow_sensors() workflow = Workflow( dag_id=self.name, default_args=self._create_defaults(), @@ -658,6 +672,10 @@ def _visit(node, workflow, exit_node=None): workflow = _visit(self.graph["start"], workflow) workflow.set_parameters(self.parameters) + if len(appending_sensors) > 0: + for s in appending_sensors: + workflow.add_state(s) + workflow.graph_structure.insert(0, [[s.name] for s in appending_sensors]) return self._to_airflow_dag_file(workflow.to_dict()) def _to_airflow_dag_file(self, json_dag): diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 5ac676978c2..7d524d4f33e 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -322,7 +322,6 @@ def make_flow( def _validate_foreach_constraints(graph): - # Todo :Invoke this function when we integrate `foreach`s def traverse_graph(node, state): if node.type == "foreach" and node.is_inside_foreach: raise NotSupportedException( @@ -378,18 +377,13 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): "A default value is required for parameters when deploying flows on Airflow." ) # check for other compute related decorators. 
+ _validate_foreach_constraints(graph) for node in graph: if node.parallel_foreach: raise AirflowException( "Deploying flows with @parallel decorator(s) " "to Airflow is not supported currently." ) - - if node.type == "foreach": - raise NotSupportedException( - "Step *%s* is a foreach step and Foreach steps are not currently supported with Airflow." - % node.name - ) if any([d.name == "batch" for d in node.decorators]): raise NotSupportedException( "Step *%s* is marked for execution on AWS Batch with Airflow which isn't currently supported." diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index 26e544e8d61..c94b6734818 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -44,6 +44,10 @@ class IncompatibleKubernetesProviderVersionException(Exception): ) % (sys.executable, KUBERNETES_PROVIDER_FOREACH_VERSION) +class AirflowSensorNotFound(Exception): + headline = "Sensor package not found" + + def create_absolute_version_number(version): abs_version = None # For all digits @@ -189,6 +193,16 @@ def pathspec(cls, flowname, is_foreach=False): ) +class SensorNames: + EXTERNAL_TASK_SENSOR = "ExternalTaskSensor" + S3_SENSOR = "S3KeySensor" + SQL_SENSOR = "SQLSensor" + + @classmethod + def get_supported_sensors(cls): + # Return only the sensor-name constants; `cls.__dict__.values()` would also pick up dunder attributes. + return [cls.EXTERNAL_TASK_SENSOR, cls.S3_SENSOR, cls.SQL_SENSOR] + + def run_id_creator(val): # join `[dag-id,run-id]` of airflow dag. return hashlib.md5("-".join([str(x) for x in val]).encode("utf-8")).hexdigest()[ @@ -375,6 +389,46 @@ def _kubernetes_pod_operator_args(operator_args): return args +def _parse_sensor_args(name, kwargs): + if name == SensorNames.EXTERNAL_TASK_SENSOR: + if "execution_delta" in kwargs: + if type(kwargs["execution_delta"]) == dict: + kwargs["execution_delta"] = timedelta(**kwargs["execution_delta"]) + else: + del kwargs["execution_delta"] + return kwargs + + +def _get_sensor(name): + if name == SensorNames.EXTERNAL_TASK_SENSOR: + # ExternalTaskSensor uses the execution_date of a dag run to + # determine the matching run of the external DAG. + # This is set to the exact date the current dag gets executed on. + # For example, if "DagA" (upstream DAG) got scheduled at + # 12 Jan 4:00 PM PDT then "DagB" (current DAG)'s task sensor will + # look for a "DagA" run that executed at 12 Jan 4:00 PM PDT **exactly**. + # They also support an `execution_timeout` argument. + from airflow.sensors.external_task_sensor import ExternalTaskSensor + + return ExternalTaskSensor + elif name == SensorNames.S3_SENSOR: + try: + from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor + except ImportError: + raise AirflowSensorNotFound( + "This DAG requires an `S3KeySensor`. 
" + "Install the Airflow AWS provider using : " + "`pip install apache-airflow-providers-amazon`" + ) + return S3KeySensor + elif name == SensorNames.SQL_SENSOR: + from airflow.sensors.sql import SqlSensor + + return SqlSensor + + def get_metaflow_kubernetes_operator(): try: from airflow.contrib.operators.kubernetes_pod_operator import ( @@ -493,6 +547,13 @@ def set_operator_args(self, **kwargs): self._operator_args = kwargs return self + def _make_sensor(self): + TaskSensor = _get_sensor(self._operator_type) + return TaskSensor( + task_id=self.name, + **_parse_sensor_args(self._operator_type, self._operator_args) + ) + def to_dict(self): return { "name": self.name, @@ -541,6 +602,8 @@ def to_task(self): return self._kubernetes_task() else: return self._kubernetes_mapper_task() + elif self._operator_type in SensorNames.get_supported_sensors(): + return self._make_sensor() class Workflow(object): diff --git a/metaflow/plugins/airflow/sensors/__init__.py b/metaflow/plugins/airflow/sensors/__init__.py new file mode 100644 index 00000000000..02952d0c9a4 --- /dev/null +++ b/metaflow/plugins/airflow/sensors/__init__.py @@ -0,0 +1,9 @@ +from .external_task_sensor import ExternalTaskSensorDecorator +from .s3_sensor import S3KeySensorDecorator +from .sql_sensor import SQLSensorDecorator + +SUPPORTED_SENSORS = [ + ExternalTaskSensorDecorator, + S3KeySensorDecorator, + SQLSensorDecorator, +] diff --git a/metaflow/plugins/airflow/sensors/base_sensor.py b/metaflow/plugins/airflow/sensors/base_sensor.py new file mode 100644 index 00000000000..9412072cd23 --- /dev/null +++ b/metaflow/plugins/airflow/sensors/base_sensor.py @@ -0,0 +1,74 @@ +import uuid +from metaflow.decorators import FlowDecorator +from ..exception import AirflowException +from ..airflow_utils import AirflowTask, id_creator, TASK_ID_HASH_LEN + + +class AirflowSensorDecorator(FlowDecorator): + """ + Base class for all Airflow sensor decorators. + """ + + allow_multiple = True + + defaults = dict( + timeout=3600, + poke_interval=60, + mode="reschedule", + exponential_backoff=True, + pool=None, + soft_fail=False, + name=None, + description=None, + ) + + operator_type = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._airflow_task_name = None + self._id = str(uuid.uuid4()) + + def serialize_operator_args(self): + """ + Subclasses will parse the decorator arguments to + Airflow task serializable arguments. + """ + task_args = dict(**self.attributes) + del task_args["name"] + if task_args["description"] is not None: + task_args["doc"] = task_args["description"] + del task_args["description"] + task_args["do_xcom_push"] = True + return task_args + + def create_task(self): + task_args = self.serialize_operator_args() + return AirflowTask( + self._airflow_task_name, + operator_type=self.operator_type, + ).set_operator_args(**{k: v for k, v in task_args.items() if v is not None}) + + def validate(self): + """ + Validate if the arguments for the sensor are correct. + """ + # If there is no name set then auto-generate the name. This is done because there can be more than + # one `AirflowSensorDecorator` of the same type. 
+ if self.attributes["name"] is None: + deco_index = [ + d._id + for d in self._flow_decorators + if issubclass(d.__class__, AirflowSensorDecorator) + ].index(self._id) + self._airflow_task_name = "%s-%s" % ( + self.operator_type, + id_creator([self.operator_type, str(deco_index)], TASK_ID_HASH_LEN), + ) + else: + self._airflow_task_name = self.attributes["name"] + + def flow_init( + self, flow, graph, environment, flow_datastore, metadata, logger, echo, options + ): + self.validate() diff --git a/metaflow/plugins/airflow/sensors/external_task_sensor.py b/metaflow/plugins/airflow/sensors/external_task_sensor.py new file mode 100644 index 00000000000..649edba706c --- /dev/null +++ b/metaflow/plugins/airflow/sensors/external_task_sensor.py @@ -0,0 +1,94 @@ +from .base_sensor import AirflowSensorDecorator +from ..airflow_utils import SensorNames +from ..exception import AirflowException +from datetime import timedelta + + +AIRFLOW_STATES = dict( + QUEUED="queued", + RUNNING="running", + SUCCESS="success", + SHUTDOWN="shutdown", # External request to shut down. + FAILED="failed", + UP_FOR_RETRY="up_for_retry", + UP_FOR_RESCHEDULE="up_for_reschedule", + UPSTREAM_FAILED="upstream_failed", + SKIPPED="skipped", +) + + +class ExternalTaskSensorDecorator(AirflowSensorDecorator): + operator_type = SensorNames.EXTERNAL_TASK_SENSOR + # Docs: + # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor + name = "airflow_external_task_sensor" + defaults = dict( + **AirflowSensorDecorator.defaults, + external_dag_id=None, + external_task_ids=None, + allowed_states=[AIRFLOW_STATES["SUCCESS"]], + failed_states=None, + execution_delta=None, + check_existence=True, + # We cannot add `execution_date_fn` as it requires a python callable. + # Passing around a python callable is non-trivial since we are passing a + # callable from metaflow code to the airflow python script. Since we cannot + # transfer the dependencies of the callable, we cannot guarantee that the + # callable behaves exactly as the user expects. + ) + + def serialize_operator_args(self): + task_args = super().serialize_operator_args() + if task_args["execution_delta"] is not None: + task_args["execution_delta"] = dict( + seconds=task_args["execution_delta"].total_seconds() + ) + return task_args + + def validate(self): + if self.attributes["external_dag_id"] is None: + raise AirflowException( + "`%s` argument of `@%s` cannot be `None`." + % ("external_dag_id", self.name) + ) + + if type(self.attributes["allowed_states"]) == str: + if self.attributes["allowed_states"] not in list(AIRFLOW_STATES.values()): + raise AirflowException( + "`%s` is an invalid input of `%s` for `@%s`. Accepted values are %s" + % ( + str(self.attributes["allowed_states"]), + "allowed_states", + self.name, + ", ".join(list(AIRFLOW_STATES.values())), + ) + ) + elif type(self.attributes["allowed_states"]) == list: + enum_not_matched = [ + x + for x in self.attributes["allowed_states"] + if x not in list(AIRFLOW_STATES.values()) + ] + if len(enum_not_matched) > 0: + raise AirflowException( + "`%s` is an invalid input of `%s` for `@%s`. 
Accepted values are %s" + % ( + str(" OR ".join(["'%s'" % i for i in enum_not_matched])), + "allowed_states", + self.name, + ", ".join(list(AIRFLOW_STATES.values())), + ) + ) + else: + self.attributes["allowed_states"] = [AIRFLOW_STATES["SUCCESS"]] + + if self.attributes["execution_delta"] is not None: + if not isinstance(self.attributes["execution_delta"], timedelta): + raise AirflowException( + "`%s` is an invalid input type of `execution_delta` for `@%s`. Accepted type is `datetime.timedelta`" + % ( + str(type(self.attributes["execution_delta"])), + self.name, + ) + ) + super().validate() diff --git a/metaflow/plugins/airflow/sensors/s3_sensor.py b/metaflow/plugins/airflow/sensors/s3_sensor.py new file mode 100644 index 00000000000..b4f7ae5b6de --- /dev/null +++ b/metaflow/plugins/airflow/sensors/s3_sensor.py @@ -0,0 +1,26 @@ +from .base_sensor import AirflowSensorDecorator +from ..airflow_utils import SensorNames +from ..exception import AirflowException + + +class S3KeySensorDecorator(AirflowSensorDecorator): + name = "airflow_s3_key_sensor" + operator_type = SensorNames.S3_SENSOR + # Arg specification can be found here: + # https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3/index.html#airflow.providers.amazon.aws.sensors.s3.S3KeySensor + defaults = dict( + **AirflowSensorDecorator.defaults, + bucket_key=None, # Required + bucket_name=None, + wildcard_match=False, + aws_conn_id=None, + verify=None, # `verify (Optional[Union[str, bool]])` Whether or not to verify SSL certificates for the S3 connection. + # `verify` is an airflow argument. + ) + + def validate(self): + if self.attributes["bucket_key"] is None: + raise AirflowException( + "`bucket_key` for `@%s` cannot be empty." % (self.name) + ) + super().validate() diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py new file mode 100644 index 00000000000..c97c41b283e --- /dev/null +++ b/metaflow/plugins/airflow/sensors/sql_sensor.py @@ -0,0 +1,31 @@ +from .base_sensor import AirflowSensorDecorator +from ..airflow_utils import SensorNames +from ..exception import AirflowException + + +class SQLSensorDecorator(AirflowSensorDecorator): + name = "airflow_sql_sensor" + operator_type = SensorNames.SQL_SENSOR + # Arg specification can be found here: + # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor + defaults = dict( + **AirflowSensorDecorator.defaults, + conn_id=None, + sql=None, + # success = None, # success/failure require callables. Won't be supported initially since callables are not serialization-friendly. + # failure = None, + parameters=None, + fail_on_empty=True, + ) + + def validate(self): + if self.attributes["conn_id"] is None: + raise AirflowException( + "`%s` argument of `@%s` cannot be `None`." % ("conn_id", self.name) + ) + if self.attributes["sql"] is None: + raise AirflowException( + "`%s` argument of `@%s` cannot be `None`."
% ("sql", self.name) + ) + super().validate() From 25c0928fe61d725f7b686d0e74dc62d4db6b24eb Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Fri, 29 Jul 2022 21:51:19 +0000 Subject: [PATCH 3/3] finsihed cli for foreachstack --- metaflow/plugins/airflow/airflow_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 7d524d4f33e..1f48a1fa481 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -337,7 +337,7 @@ def traverse_graph(node, state): if node.type == "linear" and node.is_inside_foreach: state["foreach_stack"].append(node.name) - if len(state["foreach_stack"]) > 2: + if "foreach_stack" in state and len(state["foreach_stack"]) > 2: raise NotSupportedException( "The foreach step *%s* created by step *%s* needs to have an immediate join step. " "Step *%s* is invalid since it is a linear step with a foreach. "