# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Utilities to create scripts and command-line tools."""
import codecs
import os.path
import warnings
from base64 import urlsafe_b64encode
from pathlib import Path
from uuid import uuid4
import yaml
from gammapy.utils.check import add_checksum, verify_checksum

__all__ = [
    "get_images_paths",
    "make_path",
    "read_yaml",
    "recursive_merge_dicts",
    "write_yaml",
]

# "docs" folder located two levels above the directory containing this file.
PATH_DOCS = Path(__file__).resolve().parent / ".." / ".." / "docs"

# Path fragments; any path containing one of them is ignored by `get_images_paths`.
SKIP = ["_static", "_build", "_checkpoints", "docs/user-guide/model-gallery/"]
def get_images_paths(folder=PATH_DOCS):
    """Generator yields a Path for each image used in notebook.

    Searches ``folder`` recursively and yields every entry found directly
    inside a directory named ``images``, except entries whose path contains
    one of the fragments listed in ``SKIP``.

    Parameters
    ----------
    folder : str or `~pathlib.Path`
        Folder where to search. Default is ``PATH_DOCS``.

    Yields
    ------
    path : `~pathlib.Path`
        Resolved path of each entry that is not skipped.
    """
    for candidate in Path(folder).rglob("images/*"):
        # SKIP fragments are written with forward slashes, so match against
        # the POSIX form of the path; ``str(candidate)`` uses backslashes on
        # Windows and would never match the multi-component fragments.
        location = candidate.as_posix()
        if not any(fragment in location for fragment in SKIP):
            yield candidate.resolve()
def read_yaml(filename, logger=None, checksum=False):
    """Read YAML file.

    Parameters
    ----------
    filename : `~pathlib.Path`
        Filename. Environment variables are expanded via `make_path`.
    logger : `~logging.Logger`
        Logger. If given, the path being read is logged at INFO level.
    checksum : bool
        Whether to perform checksum verification. Default is False.
        A failed verification only emits a `UserWarning`; the data is
        returned regardless.

    Returns
    -------
    data : dict
        YAML file content as a dictionary, with any top-level ``"checksum"``
        entry removed. If the file does not contain a mapping (for example
        an empty file), the parsed content is returned unchanged.
    """
    path = make_path(filename)
    if logger is not None:
        logger.info(f"Reading {path}")

    text = path.read_text()
    data = yaml.safe_load(text)

    # ``safe_load`` returns None for an empty document and may return a list
    # or a scalar; only a mapping can carry a "checksum" entry, so do not
    # call ``pop`` on anything else.
    checksum_str = data.pop("checksum", None) if isinstance(data, dict) else None

    if checksum:
        # The text preceding the first occurrence of "checksum" is what gets
        # verified against the stored value.
        # NOTE(review): ``find`` returns -1 when the word is absent, and it
        # matches the first occurrence anywhere in the text - confirm this is
        # compatible with where ``add_checksum`` places the entry.
        index = text.find("checksum")
        if not verify_checksum(text[:index], checksum_str):
            warnings.warn(f"Checksum verification failed for {filename}.", UserWarning)

    return data
def write_yaml(dictionary, filename, logger=None, sort_keys=True, checksum=False):
    """Write YAML file.

    Parameters
    ----------
    dictionary : dict
        Python dictionary.
    filename : `~pathlib.Path`
        Filename. Environment variables are expanded via `make_path`;
        missing parent directories are created.
    logger : `~logging.Logger`, optional
        Logger. If given, the path being written is logged at INFO level.
        Default is None.
    sort_keys : bool, optional
        Whether to sort keys. Default is True.
    checksum : bool, optional
        Whether to add checksum keyword. Default is False.
    """
    text = yaml.safe_dump(dictionary, default_flow_style=False, sort_keys=sort_keys)

    if checksum:
        text = add_checksum(text, sort_keys=sort_keys)

    path = make_path(filename)
    # ``parents=True`` so that a target several levels below an existing
    # directory does not fail with FileNotFoundError.
    path.parent.mkdir(parents=True, exist_ok=True)
    if logger is not None:
        logger.info(f"Writing {path}")
    path.write_text(text)
def make_name(name=None):
    """Make a dataset name.

    Parameters
    ----------
    name : str, optional
        Name to use. If None, a random 8-character name that does not start
        with an underscore is generated. Default is None.

    Returns
    -------
    name : str
        The given name, or the generated one.

    Raises
    ------
    ValueError
        If ``name`` is neither None nor a string.
    """
    if name is None:
        # Draw random tokens until one does not begin with an underscore.
        while True:
            raw_bytes = codecs.decode(uuid4().hex, "hex")
            token = urlsafe_b64encode(raw_bytes).decode()[:8]
            if token[0] != "_":
                return token

    if isinstance(name, str):
        return name

    raise ValueError(
        "Name argument must be a string, "
        f"got '{name}', which is of type '{type(name)}'"
    )
def make_path(path):
    """Expand environment variables on `~pathlib.Path` construction.

    Parameters
    ----------
    path : str, `pathlib.Path`
        Path to expand.

    Returns
    -------
    path : `~pathlib.Path` or None
        Path with environment variables expanded, or None if ``path`` is None.
    """
    # TODO: raise error or warning if environment variables that don't resolve are used
    # e.g. "spam/$DAMN/ham" where `$DAMN` is not defined
    # Otherwise this can result in cryptic errors later on
    if path is not None:
        expanded = os.path.expandvars(path)
        return Path(expanded)
    return None
def recursive_merge_dicts(a, b):
    """Recursively merge two dictionaries.

    Entries in 'b' override entries in 'a'. The built-in update function cannot be used
    for hierarchical dicts, see:
    http://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth/3233356#3233356

    Values are merged recursively only when both 'a' and 'b' hold a dict under
    the same key; in every other case the value from 'b' replaces the one
    from 'a'. Neither input is modified; values that are not merged are
    shared with the inputs, not copied.

    Parameters
    ----------
    a : dict
        Dictionary to be merged.
    b : dict
        Dictionary to be merged.

    Returns
    -------
    c : dict
        Merged dictionary.

    Examples
    --------
    >>> from gammapy.utils.scripts import recursive_merge_dicts
    >>> a = dict(a=42, b=dict(c=43, e=44))
    >>> b = dict(d=99, b=dict(c=50, g=98))
    >>> c = recursive_merge_dicts(a, b)
    >>> print(c)
    {'a': 42, 'b': {'c': 50, 'e': 44, 'g': 98}, 'd': 99}
    """
    c = a.copy()
    for k, v in b.items():
        # Recurse only when both sides are dicts; a non-dict value in 'b'
        # must simply override a dict in 'a' instead of being recursed into.
        if k in c and isinstance(c[k], dict) and isinstance(v, dict):
            c[k] = recursive_merge_dicts(c[k], v)
        else:
            c[k] = v
    return c