

getint ( 'core', "DAGBAG_IMPORT_TIMEOUT" )): try : m = imp. hexdigest () + '_' + org_mod_name ) if mod_name in sys. split ( filepath )) mod_name = ( 'unusual_prefix_' + hashlib. debug ( "Importing %s ", filepath ) org_mod_name, _ = os. Skipping.", filepath ) return found_dags self. info ( "File %s assumed to contain no DAGs. file_last_changed = file_last_changed_on_disk # Don't want to spam user with skip messages if not self. is_zipfile ( filepath ) if not is_zipfile : if safe_mode : with open ( filepath, 'rb' ) as f : content = f.
#Airflow dag bag mods#
exception ( e ) return found_dags mods = is_zipfile = zipfile. file_last_changed : return found_dags except Exception as e : self. file_last_changed \Īnd file_last_changed_on_disk = self. getmtime ( filepath )) if only_if_updated \Īnd filepath in self. isfile ( filepath ): return found_dags try : # This failed before in what may have been a git sync # race condition file_last_changed_on_disk = datetime. """ from import DAG # Avoid circular import found_dags = # if the source file no longer exists in the DB or in the filesystem, # return an empty list # todo: raise exception? if filepath is None or not os.
#Airflow dag bag zip#
def process_file ( self, filepath, only_if_updated = True, safe_mode = True ): """ Given a path to a python module or zip file, this method imports the module and look for dag objects within it. fileloc ), only_if_updated = False ) # If the source file no longer exports `dag_id`, delete it from self.dags if found_dags and dag_id in : return self. process_file ( filepath = correct_maybe_zipped ( orm_dag. last_expired ) ): # Reprocess source file found_dags = self. get_current ( root_dag_id ) if orm_dag and ( root_dag_id not in self. dag_id # If the dag corresponding to root_dag_id is absent or expired orm_dag = DagModel. def get_dag ( self, dag_id ): """ Gets the DAG out of the dictionary, and refreshes it if expired """ from import DagModel # Avoid circular import # If asking for a known subdag, we want to refresh the parent root_dag_id = dag_id if dag_id in self. """ # static class variables to detetct dag cycle Therefore only once per DagBag is a file logged being skipped. This is to prevent overloading the user with logging messages about skipped files. :param dag_folder: the folder to scan to find DAGs :type dag_folder: unicode :param executor: the executor to use when executing task instances in this DagBag :param include_examples: whether to include the examples that ship with airflow or not :type include_examples: bool :param has_logged: an instance boolean that gets flipped from False to True after a file has been skipped.

What would have been system level settings are now dagbag level so that one system can run multiple, independent settings sets.

This makes it easier to run distinct environments for say production and development, tests, or for different teams or security profiles. class DagBag ( BaseDagBag, LoggingMixin ): """ A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks. from _future_ import division from _future_ import unicode_literals import hashlib import imp import importlib import os import sys import textwrap import zipfile from collections import namedtuple from datetime import datetime, timedelta import six from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError from sqlalchemy import or_ from airflow import configuration, settings from _dag import BaseDagBag from airflow.exceptions import AirflowDagCycleException from airflow.executors import get_default_executor from ttings import Stats from airflow.utils import timezone from _processing import list_py_file_paths, correct_maybe_zipped from import provide_session from import pprinttable from _mixin import LoggingMixin from import State from import timeout

#Airflow dag bag license#
See the License for the # specific language governing permissions and limitations # under the License.
#Airflow dag bag software#
You may obtain a copy of the License at # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License") you may not use this file except in compliance # with the License. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements.
