
Modules

Top-level package for bgcflow_wrapper.

bgcflow

Main module.

cloner(**kwargs)

Clone the BGCFlow repository to a specified destination.

Parameters:

    **kwargs (dict): Keyword arguments for the cloning. Default: {}

Returns:

    None

Source code in bgcflow/bgcflow.py
def cloner(**kwargs):
    """
    Clone the BGCFlow repository to a specified destination.

    Args:
        **kwargs (dict): Keyword arguments for the cloning.

    Returns:
        None
    """
    destination_dir = Path(kwargs["destination"])
    click.echo(f"Cloning BGCFlow to {destination_dir}...")
    destination_dir.mkdir(parents=True, exist_ok=True)
    try:
        Repo.clone_from(
            "https://github.com/NBChub/bgcflow.git",
            Path(kwargs["destination"]),
            branch=kwargs["branch"],
        )
    except GitCommandError:
        print(
            f"Oops, it seems {kwargs['destination']} already exists and is not an empty directory."
        )
    return
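
Example usage, as a minimal sketch (the destination path is illustrative; the repository is cloned from https://github.com/NBChub/bgcflow.git):

from bgcflow.bgcflow import cloner

# Clone the main branch of BGCFlow into a local folder.
cloner(destination="bgcflow_test", branch="main")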

deployer(**kwargs)

Deploy the BGCFlow repository to a specified destination using Snakedeploy.

Parameters:

    **kwargs (dict): Keyword arguments for the deployment. Default: {}

Returns:

    None

Source code in bgcflow/bgcflow.py
def deployer(**kwargs):
    """
    Deploy the BGCFlow repository to a specified destination using Snakedeploy.

    Args:
        **kwargs (dict): Keyword arguments for the deployment.

    Returns:
        None
    """
    dplyr(
        "https://github.com/NBChub/bgcflow.git",
        branch=kwargs["branch"],
        name="bgcflow",
        dest_path=Path(kwargs["destination"]),
        tag=kwargs["tag"],
    )
    return
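
Example usage, as a minimal sketch (the destination path is illustrative; tag=None deploys the branch head instead of a tagged release):

from bgcflow.bgcflow import deployer

# Deploy BGCFlow with Snakedeploy rather than a full git clone.
deployer(destination="bgcflow_deploy", branch="main", tag=None)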

get_all_rules(**kwargs)

Print information about available rules in the BGCFlow repository.

Parameters:

    **kwargs (dict): Keyword arguments for the function. Default: {}

Returns:

    None

Source code in bgcflow/bgcflow.py
def get_all_rules(**kwargs):
    """
    Print information about available rules in the BGCFlow repository.

    Args:
        **kwargs (dict): Keyword arguments for the function.

    Returns:
        None
    """
    path = Path(kwargs["bgcflow_dir"])
    rule_file = path / "workflow/rules.yaml"

    if rule_file.is_file():
        with open(rule_file, "r") as file:
            data = yaml.safe_load(file)
        try:
            if isinstance(kwargs["describe"], str):
                rule_name = kwargs["describe"]
                print(f"Description for {rule_name}:")
                print(f" - {data[rule_name]['description']}")

            if isinstance(kwargs["cite"], str):
                rule_name = kwargs["cite"]
                print(f"Citations for {rule_name}:")
                for reference in data[rule_name]["references"]:
                    print("-", reference)

            if not isinstance(kwargs["describe"], str) and not isinstance(
                kwargs["cite"], str
            ):
                print("Printing available rules:")
                for item in data.keys():
                    print(f" - {item}")

        except KeyError:
            rule_name = [
                r for r in [kwargs["describe"], kwargs["cite"]] if isinstance(r, str)
            ]
            print(
                f"ERROR: Cannot find rule {rule_name} in dictionary. Find available rules with `bgcflow rules`."
            )

    else:
        print(
            "ERROR: Cannot find BGCFlow directory.\nPoint to the right directory using `--bgcflow_dir <destination>` or clone BGCFlow using `bgcflow clone <destination>`."
        )

    return
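
Example usage, as a minimal sketch (the clone location and rule name are illustrative; rule names are read from workflow/rules.yaml):

from bgcflow.bgcflow import get_all_rules

# List every available rule, then describe and cite a specific one.
get_all_rules(bgcflow_dir="bgcflow", describe=None, cite=None)
get_all_rules(bgcflow_dir="bgcflow", describe="seqfu", cite="seqfu")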

snakemake_wrapper(**kwargs)

Wrapper function for running Snakemake with BGCFlow.

Parameters:

    **kwargs (dict): Keyword arguments for Snakemake and BGCFlow. Default: {}

Returns:

    None

Source code in bgcflow/bgcflow.py
def snakemake_wrapper(**kwargs):
    """
    Wrapper function for running Snakemake with BGCFlow.

    Args:
        **kwargs (dict): Keyword arguments for Snakemake and BGCFlow.

    Returns:
        None
    """
    p = "Empty process catcher"

    dryrun = ""
    touch = ""
    unlock = ""
    until = ""
    profile = ""
    antismash_mode = kwargs["antismash_mode"]
    os.environ["BGCFLOW_ANTISMASH_MODE"] = antismash_mode

    if kwargs["dryrun"]:
        dryrun = "--dryrun"
    if kwargs["touch"]:
        touch = "--touch"
    if kwargs["unlock"]:
        unlock = "--unlock"
    if kwargs["until"] is not None:
        until = f"--until {kwargs['until']}"
    if kwargs["profile"] is not None:
        profile = f"--profile {kwargs['profile']}"

    if kwargs["monitor_off"]:
        pass
    else:
        click.echo("Monitoring BGCFlow jobs with Panoptes...")
        # Run Panoptes if not yet run
        port = int(kwargs["wms_monitor"].split(":")[-1])

        try:
            item = requests.get(f"{kwargs['wms_monitor']}/api/service-info")
            status = item.json()["status"]
            assert status == "running"
            click.echo(f"Panoptes already {status} on {kwargs['wms_monitor']}")
        except requests.exceptions.RequestException:
            click.echo(
                f"Running Panoptes to monitor BGCFlow jobs at {kwargs['wms_monitor']}"
            )
            p = subprocess.Popen(
                ["panoptes", "--port", str(port)], stderr=subprocess.DEVNULL
            )
            click.echo(f"Panoptes job id: {p.pid}")

        # Connect to Panoptes
        click.echo("Connecting to Panoptes...")
        ctr = 1
        for tries in range(10):
            try:
                item = requests.get(f"{kwargs['wms_monitor']}/api/service-info")
                status = item.json()["status"]
                if status == "running":
                    click.echo(f"Panoptes status: {status}")
                    break
            except requests.exceptions.RequestException:
                click.echo(f"Retrying to connect: {ctr}x")
                ctr = ctr + 1
                time.sleep(1)
            else:
                time.sleep(1)

    # Check Snakefile
    valid_workflows = {
        "Snakefile": "Main BGCFlow snakefile for genome mining",
        "BGC": "Subworkflow for comparative analysis of BGCs",
        "Report": "Build a static html report of a BGCFlow run",
        "Database": "Build a DuckDB database for a BGCFlow run",
        "Metabase": "Run a metabase server for visual exploration of the DuckDB database",
        "lsabgc": "Run population genetic and evolutionary analysis with lsaBGC-Easy.py using BiG-SCAPE output",
        "ppanggolin": "Build pangenome graph and detect region of genome plasticity with PPanGGOLiN",
    }

    bgcflow_dir = Path(kwargs["bgcflow_dir"])
    if kwargs["workflow"] in [
        "workflow/Snakefile",
        "workflow/BGC",
        "workflow/Report",
        "workflow/Database",
        "workflow/Metabase",
        "workflow/lsabgc",
        "workflow/ppanggolin",
    ]:
        snakefile = bgcflow_dir / kwargs["workflow"]
    elif kwargs["workflow"] in [
        "Snakefile",
        "BGC",
        "Report",
        "Database",
        "Metabase",
        "lsabgc",
        "ppanggolin",
    ]:
        snakefile = bgcflow_dir / f'workflow/{kwargs["workflow"]}'
    else:
        snakefile = bgcflow_dir / kwargs["workflow"]

    assert (
        snakefile.is_file()
    ), f"Snakefile {snakefile} does not exist. Available workflows are:\n" + "\n".join(
        [f" - {k}: {v}" for k, v in valid_workflows.items()]
    )

    # Run Snakemake
    if kwargs["cores"] > multiprocessing.cpu_count():
        click.echo(
            f"\nWARNING: Number of cores inputted ({kwargs['cores']}) is higher than the number of available cores ({multiprocessing.cpu_count()})."
        )
        click.echo(
            f"DEBUG: Setting number of cores to available cores: {multiprocessing.cpu_count()}\n"
        )
        kwargs["cores"] = multiprocessing.cpu_count()
    else:
        click.echo(
            f"\nDEBUG: Using {kwargs['cores']} out of {multiprocessing.cpu_count()} available cores\n"
        )
    snakemake_command = f"cd {kwargs['bgcflow_dir']} && snakemake --snakefile {snakefile} --use-conda --keep-going --rerun-incomplete --rerun-triggers mtime -c {kwargs['cores']} {dryrun} {touch} {until} {unlock} {profile} --wms-monitor {kwargs['wms_monitor']}"
    click.echo(f"Running Snakemake with command:\n{snakemake_command}")
    subprocess.call(snakemake_command, shell=True)

    # Kill Panoptes
    try:
        if not isinstance(p, str):
            click.echo(f"Stopping panoptes server: PID {p.pid}")
            p.kill()
    except UnboundLocalError as e:
        click.echo(e)
    return
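
Example usage, as a minimal sketch of a dry run (every key below is read by the wrapper; paths and the antiSMASH mode are illustrative):

from bgcflow.bgcflow import snakemake_wrapper

snakemake_wrapper(
    bgcflow_dir="bgcflow",          # local BGCFlow clone
    workflow="Snakefile",           # main genome-mining workflow
    cores=4,
    dryrun=True,
    touch=False,
    unlock=False,
    until=None,
    profile=None,
    monitor_off=True,               # skip starting Panoptes
    wms_monitor="http://127.0.0.1:5000",
    antismash_mode="minimal",
)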

cli

Console script for bgcflow.

metabase

sync_dbt_models_to_metabase(dbt_dir, dbt_database, metabase_host, metabase_database, metabase_user, metabase_password, dbt_schema='main', metabase_http=True, dbt_excludes=None)

Synchronizes dbt models to Metabase using the dbt-metabase package.

Parameters:

    dbt_dir (str): The path to the dbt project directory. Required.
    dbt_database (str): The name of the dbt database to use. Required.
    metabase_host (str): The URL of the Metabase server. Required.
    metabase_user (str): The username of the Metabase account to use. Required.
    metabase_password (str): The password of the Metabase account to use. Required.
    metabase_database (str): The name of the Metabase database to use. Required.
    dbt_schema (str): The name of the dbt schema to use. Default: 'main'
    metabase_http (bool): Whether to use HTTP instead of HTTPS for the Metabase connection. Default: True
    dbt_excludes (list): dbt models to exclude from the sync. Default: None

Returns:

    str: The output of the dbt-metabase command as a string.

Source code in bgcflow/metabase.py
def sync_dbt_models_to_metabase(
    dbt_dir: str,
    dbt_database: str,
    metabase_host: str,
    metabase_database: str,
    metabase_user: str,
    metabase_password: str,
    dbt_schema: str = "main",
    metabase_http: bool = True,
    dbt_excludes: list = None,
) -> str:
    """
    Synchronizes dbt models to Metabase using the dbt-metabase package.

    Args:
        dbt_dir (str): The path to the dbt project directory.
        dbt_database (str): The name of the dbt database to use.
        metabase_host (str): The URL of the Metabase server.
        metabase_user (str): The username of the Metabase account to use.
        metabase_password (str): The password of the Metabase account to use.
        metabase_database (str): The name of the Metabase database to use.
        dbt_schema (str, optional): The name of the dbt schema to use. Defaults to "main".
        metabase_http (bool, optional): Whether to use HTTP instead of HTTPS for the Metabase connection. Defaults to True.

    Returns:
        str: The output of the dbt-metabase command as a string.
    """
    click.echo(" - Synchronizing dbt models schema to Metabase...")
    if metabase_http:
        click.echo(" - Connecting with HTTP method...")
        metabase_http = "--metabase_http"
    else:
        click.echo(" - Connecting with HTTPS method...")
        metabase_http = "--metabase_https"
    command = [
        "dbt-metabase",
        "models",
        "--dbt_path",
        str(dbt_dir),
        "--dbt_database",
        dbt_database,
        "--metabase_host",
        metabase_host.split("://")[-1],
        "--metabase_user",
        metabase_user,
        "--metabase_password",
        metabase_password,
        "--metabase_database",
        metabase_database,
        "--dbt_schema",
        dbt_schema,
        metabase_http,
    ]
    if dbt_excludes and len(dbt_excludes) > 0:
        command += ["--dbt_excludes", *dbt_excludes]

    # Run the command and capture the output
    result = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
    )

    # Print the captured output
    click.echo(result.stdout)
    click.echo(result.stderr)
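
Example usage, as a minimal sketch (host, credentials, and the dbt directory are placeholders):

from bgcflow.metabase import sync_dbt_models_to_metabase

sync_dbt_models_to_metabase(
    dbt_dir="data/processed/my_project/dbt/antiSMASH_7.0.0",
    dbt_database="dbt_bgcflow",
    metabase_host="http://localhost:3000",
    metabase_database="my_project",
    metabase_user="admin@example.com",
    metabase_password="secret",
)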

upload_and_sync_to_metabase(project_name, bgcflow_dir, dbt_dir, metabase_host, mb_username, mb_password, dbt_schema='main', dbt_database='dbt_bgcflow', metabase_http=True, metabase_database=None, dbt_excludes=None)

Uploads a DuckDB database file generated by dbt to Metabase and syncs the dbt models.

Parameters:

    project_name (str): The name of the project to upload to Metabase. Required.
    bgcflow_dir (str): The root directory of the BGCFlow project. Required.
    dbt_dir (str): The directory containing the dbt project to upload. If None, the directory is inferred from the BGCFlow project directory. Required.
    metabase_host (str): The URL of the Metabase server. Required.
    mb_username (str): The Metabase username. If None, the user will be prompted to enter their username. Required.
    mb_password (str): The Metabase password. If None, the user will be prompted to enter their password. Required.
    dbt_schema (str): The name of the dbt schema to use. Default: 'main'
    dbt_database (str): The name of the dbt database to use. Default: 'dbt_bgcflow'
    metabase_http (bool): Whether to use HTTP instead of HTTPS to connect to Metabase. Default: True
    metabase_database (str): The name of the Metabase database to use. If None, the project name is used. Default: None
    dbt_excludes (list): dbt models to exclude from the sync. Default: None

Returns:

    str: The output of the dbt-metabase command as a string.

Exceptions:

    AssertionError: If the dbt_dir or bgcflow_dir do not exist or are not directories.
    subprocess.CalledProcessError: If the dbt-metabase command fails.

Source code in bgcflow/metabase.py
def upload_and_sync_to_metabase(
    project_name: str,
    bgcflow_dir: str,
    dbt_dir: str,
    metabase_host: str,
    mb_username: str,
    mb_password: str,
    dbt_schema: str = "main",
    dbt_database: str = "dbt_bgcflow",
    metabase_http: bool = True,
    metabase_database: str = None,
    dbt_excludes: list = None,
) -> str:
    """
    Uploads a DuckDB database file generated by dbt to Metabase and syncs the dbt models.

    Args:
        project_name (str): The name of the project to upload to Metabase.
        bgcflow_dir (str): The root directory of the BGCFlow project.
        dbt_dir (str): The directory containing the dbt project to upload. If None, the directory is inferred from the BGCFlow project directory.
        metabase_host (str): The URL of the Metabase server.
        mb_username (str): The Metabase username. If None, the user will be prompted to enter their username.
        mb_password (str): The Metabase password. If None, the user will be prompted to enter their password.
        dbt_schema (str): The name of the dbt schema to use.
        dbt_database (str): The name of the dbt database to use.
        metabase_http (bool): Whether to use HTTP instead of HTTPS to connect to Metabase.
        metabase_database (str): The name of the Metabase database to use. If None, the project name is used.

    Returns:
        str: The output of the dbt-metabase command as a string.

    Raises:
        AssertionError: If the dbt_dir or bgcflow_dir do not exist or are not directories.
        subprocess.CalledProcessError: If the dbt-metabase command fails.
    """
    # available dbt models in bgcflow_dbt-duckdb v0.2.1
    dbt_model_dict = {
        "query-bigslice": ["bigfam_hits", "bigfam_network"],
        "bigscape": ["bigscape_cluster", "bigscape_network", "mibig_hits"],
        "checkm": ["checkm"],
        "seqfu": ["seqfu"],
        "antismash": ["genomes"],
    }

    if dbt_excludes is None:
        dbt_excludes = []
    else:
        dbt_excludes = list(dbt_excludes)

    if dbt_dir is None:
        report_dir = Path(bgcflow_dir) / f"data/processed/{project_name}"
        click.echo(f" - Accessing BGCFlow report directory in: {report_dir}")
        with open(report_dir / "metadata/dependency_versions.json", "r") as f:
            dependency_version = json.load(f)
        antismash_version = dependency_version["antismash"]
        click.echo(f" - AntiSMASH version: {antismash_version}")

        project_metadata_json = report_dir / "metadata/project_metadata.json"
        click.echo(f" - Reading project metadata from: {project_metadata_json}")
        with open(project_metadata_json, "r") as f:
            project_metadata = json.load(f)
        used_pipelines = list(project_metadata[project_name]["rule_used"].keys())
        click.echo(f" - Used pipelines: {', '.join(used_pipelines)}")
        for pipeline in dbt_model_dict.keys():
            if pipeline not in used_pipelines:
                dbt_excludes += dbt_model_dict[pipeline]
        click.echo(f" - Excluding models for sync: {', '.join(dbt_excludes)}")
        dbt_dir = report_dir / f"dbt/antiSMASH_{antismash_version}"

    elif isinstance(dbt_dir, str):
        click.echo(f" - Accessing dbt project directory in: {dbt_dir}")
        click.echo(
            " - Using all models for sync: "
            + ", ".join([m for models in dbt_model_dict.values() for m in models])
        )
        dbt_dir = Path(dbt_dir)

    # Get Metabase session token
    if mb_username is None:
        mb_username = click.prompt("Enter your Metabase username")
    if mb_password is None:
        mb_password = click.prompt("Enter your Metabase password", hide_input=True)

    response, session_token = upload_dbt_to_metabase(
        project_name, bgcflow_dir, dbt_dir, metabase_host, mb_username, mb_password
    )
    if response == 200:
        if metabase_database is None:
            metabase_database = project_name
        sync_dbt_models_to_metabase(
            dbt_dir,
            dbt_database,
            metabase_host,
            metabase_database,
            mb_username,
            mb_password,
            dbt_schema,
            metabase_http,
            dbt_excludes,
        )
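
Example usage, as a minimal sketch (host and credentials are placeholders; dbt_dir=None lets the function infer the dbt directory from the project's report metadata):

from bgcflow.metabase import upload_and_sync_to_metabase

upload_and_sync_to_metabase(
    project_name="my_project",
    bgcflow_dir="bgcflow",
    dbt_dir=None,
    metabase_host="http://localhost:3000",
    mb_username="admin@example.com",
    mb_password="secret",
)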

upload_dbt_to_metabase(project_name, bgcflow_dir, dbt_dir, metabase_host, mb_username, mb_password)

Uploads a DuckDB database file generated by dbt to Metabase.

Parameters:

    project_name (str): The name of the project to upload to Metabase. Required.
    bgcflow_dir (str): The path to the BGCFlow directory. Required.
    dbt_dir (str): The path to the dbt directory containing the DuckDB database file. Required.
    metabase_host (str): The URL of the Metabase server. Required.
    mb_username (str): The username to use for authentication with Metabase. Required.
    mb_password (str): The password to use for authentication with Metabase. Required.

Returns:

    tuple: The HTTP status code of the response and the Metabase session token.

Exceptions:

    AssertionError: If the DuckDB database file does not exist or is not a regular file.

Source code in bgcflow/metabase.py
def upload_dbt_to_metabase(
    project_name: str,
    bgcflow_dir: str,
    dbt_dir: str,
    metabase_host: str,
    mb_username: str,
    mb_password: str,
) -> tuple:
    """
    Uploads a DuckDB database file generated by dbt to Metabase.

    Args:
        project_name (str): The name of the project to upload to Metabase.
        bgcflow_dir (str): The path to the BGCFlow directory.
        dbt_dir (str): The path to the dbt directory containing the DuckDB database file.
        metabase_host (str): The URL of the Metabase server.
        mb_username (str): The username to use for authentication with Metabase.
        mb_password (str): The password to use for authentication with Metabase.

    Returns:
        tuple: The HTTP status code of the response and the Metabase session token.

    Raises:
        AssertionError: If the DuckDB database file does not exist or is not a regular file.

    """
    duckdb_path = Path(dbt_dir) / "dbt_bgcflow.duckdb"
    assert (
        duckdb_path.is_file()
    ), f"Error: {duckdb_path} does not exist or is not a regular file"

    session_response = requests.post(
        f"{metabase_host}/api/session",
        json={"username": mb_username, "password": mb_password},
    )
    session_token = session_response.json()["id"]

    # Check if database already exists
    database_response = requests.get(
        f"{metabase_host}/api/database", headers={"X-Metabase-Session": session_token}
    )
    databases = database_response.json()
    database_id = None
    for k, v in databases.items():
        if k == "data":
            for db in v:
                if db["name"] == project_name:
                    database_id = db["id"]
                    break

    # Prompt user to continue or cancel upload
    if database_id is not None:
        user_input = input(
            f" - WARNING: A database with the name '{project_name}' already exists in Metabase. Do you want to continue with the upload? (y/n) "
        )
        if user_input.lower() != "y":
            click.echo(" - Database upload cancelled by user")
            return

    # Upload or update database in Metabase
    if database_id is None:
        database_response = requests.post(
            f"{metabase_host}/api/database",
            headers={"X-Metabase-Session": session_token},
            json={
                "engine": "duckdb",
                "name": project_name,
                "details": {"database_file": str(duckdb_path.resolve())},
            },
        )
        if database_response.status_code == 200:
            click.echo(f" - Database '{project_name}' uploaded successfully")
        else:
            click.echo(
                f" - Error uploading database '{project_name}': {database_response.text}"
            )

    else:
        database_response = requests.put(
            f"{metabase_host}/api/database/{database_id}",
            headers={"X-Metabase-Session": session_token},
            json={
                "engine": "duckdb",
                "name": project_name,
                "details": {"database_file": str(duckdb_path.resolve())},
            },
        )
        if database_response.status_code == 200:
            click.echo(f" - Database '{project_name}' updated successfully")
        else:
            click.echo(
                f" - Error updating database '{project_name}': {database_response.text}"
            )

    return database_response.status_code, session_token
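
Example usage, as a minimal sketch (all values are placeholders; the dbt directory must already contain dbt_bgcflow.duckdb):

from pathlib import Path

from bgcflow.metabase import upload_dbt_to_metabase

status_code, session_token = upload_dbt_to_metabase(
    project_name="my_project",
    bgcflow_dir="bgcflow",
    dbt_dir=Path("data/processed/my_project/dbt/antiSMASH_7.0.0"),
    metabase_host="http://localhost:3000",
    mb_username="admin@example.com",
    mb_password="secret",
)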

mkdocs

Dict2Class

A class that converts a dictionary to an object with attributes.

Parameters:

    my_dict (dict): The dictionary to convert to an object. Required.

Methods:

    print_references(): Returns a formatted string of the references attribute of the object.

Source code in bgcflow/mkdocs.py
class Dict2Class(object):
    """
    A class that converts a dictionary to an object with attributes.

    Args:
        my_dict (dict): The dictionary to convert to an object.

    Attributes:
        All keys in the dictionary are converted to attributes of the object.

    Methods:
        print_references():
            Returns a formatted string of the `references` attribute of the object.
    """

    def __init__(self, my_dict):
        """
        Initializes the object with attributes from the dictionary.

        Args:
            my_dict (dict): The dictionary to convert to an object.
        """
        for key in my_dict:
            setattr(self, key, my_dict[key])

    def print_references(self):
        """
        Returns a formatted string of the `references` attribute of the object.

        Returns:
            str: A formatted string of the `references` attribute of the object.
        """
        text = ""
        for r in self.references:
            text = "\n".join([text, f"- {r}"])
        return text
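
Example usage, as a minimal sketch (the dictionary contents are illustrative):

from bgcflow.mkdocs import Dict2Class

meta = Dict2Class({"name": "example", "references": ["Doe et al. 2020"]})
print(meta.name)                # attribute access instead of key lookup
print(meta.print_references())  # each reference rendered as a "- " bullet line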

__init__(self, my_dict) special

Initializes the object with attributes from the dictionary.

Parameters:

    my_dict (dict): The dictionary to convert to an object. Required.

Source code in bgcflow/mkdocs.py
def __init__(self, my_dict):
    """
    Initializes the object with attributes from the dictionary.

    Args:
        my_dict (dict): The dictionary to convert to an object.
    """
    for key in my_dict:
        setattr(self, key, my_dict[key])

print_references(self)

Returns a formatted string of the references attribute of the object.

Returns:

    str: A formatted string of the references attribute of the object.

Source code in bgcflow/mkdocs.py
def print_references(self):
    """
    Returns a formatted string of the `references` attribute of the object.

    Returns:
        str: A formatted string of the `references` attribute of the object.
    """
    text = ""
    for r in self.references:
        text = "\n".join([text, f"- {r}"])
    return text

generate_mkdocs_report(bgcflow_dir, project_name, port=8001, fileserver='http://localhost:8002', ipynb=True)

Generates an MkDocs report for a BGCFlow project.

Parameters:

    bgcflow_dir (str): The path to the BGCFlow project directory. Required.
    project_name (str): The name of the BGCFlow project. Required.
    port (int): The port number to use for the MkDocs server. Default: 8001
    fileserver (str): The URL of the file server to use. Default: 'http://localhost:8002'
    ipynb (bool): Whether to use IPython notebooks for the reports. Default: True

Source code in bgcflow/mkdocs.py
def generate_mkdocs_report(
    bgcflow_dir: str,
    project_name: str,
    port: int = 8001,
    fileserver: str = "http://localhost:8002",
    ipynb: bool = True,
) -> None:
    """
    Generates an MkDocs report for a BGCFlow project.

    Args:
        bgcflow_dir (str): The path to the BGCFlow project directory.
        project_name (str): The name of the BGCFlow project.
        port (int, optional): The port number to use for the MkDocs server, by default 8001.
        fileserver (str, optional): The URL of the file server to use, by default "http://localhost:8002".
        ipynb (bool, optional): Whether to use IPython notebooks for the reports, by default True.
    """
    logging.info("Checking input folder..")

    # is it a bgcflow data directory or just a result directory?
    input_dir = Path(bgcflow_dir)
    if (input_dir / "metadata/project_metadata.json").is_file():
        report_dir = input_dir
    else:
        report_dir = input_dir / f"data/processed/{project_name}"
        assert (
            report_dir / "metadata/project_metadata.json"
        ).is_file(), "Unable to find BGCFlow results"
    logging.debug(f"Found project_metadata. Using [{report_dir}] as report directory.")

    # Get project metadata
    p = load_project_metadata(report_dir / "metadata/project_metadata.json")
    assert (
        p.name == project_name
    ), "Project metadata does not match with user provided input!"
    logging.debug(
        f"Project [{p.name}] was analysed using BGCFlow version {p.bgcflow_version}"
    )

    # available reports, check all output files
    logging.debug(f"Available reports: {list(p.rule_used.keys())}")
    df_results = pd.DataFrame.from_dict(p.rule_used).T

    # check available reports
    logging.info("Preparing mkdocs config...")
    if ipynb:
        extension = "ipynb"
    else:
        extension = "md"

    report_category_containers = {}
    for r, v in p.rule_used.items():
        jupyter_template = report_dir / f"docs/{r}.{extension}"
        # logging.debug(jupyter_template.is_file()) # TO DO ASSERT IPYNB FILES, THEY SHOULD BE IN THE DOCS
        logging.debug(f"Adding report [{r} : {jupyter_template.name}]")
        report_category = v["category"]
        if report_category not in report_category_containers.keys():
            report_category_containers[report_category] = []
        report_category_containers[report_category].append({r: jupyter_template.name})

    for k, v in report_category_containers.items():
        mkdocs_template["nav"].append({k: v})

    # write mkdocs template
    mkdocs_yml = report_dir / "mkdocs.yml"
    logging.info(f"Generating mkdocs config at: {mkdocs_yml}")
    write_mkdocs_file(mkdocs_template, mkdocs_yml, "yaml")

    # Generate index.md
    docs_dir = report_dir / "docs"
    docs_dir.mkdir(exist_ok=True, parents=True)
    mkdocs_index = docs_dir / "index.md"
    logging.info(f"Generating homepage at: {mkdocs_index}")
    df_results.loc[:, "BGCFlow_rules"] = df_results.index
    df_results = df_results.loc[:, ["BGCFlow_rules", "description"]].reset_index(
        drop=True
    )
    df_results.loc[:, "BGCFlow_rules"] = [
        f"[{i}]({i}/)" + "{.md-button}" for i in df_results.loc[:, "BGCFlow_rules"]
    ]
    data = {
        "p_name": p.name,
        "p_description": p.description,
        "p_sample_size": p.sample_size,
        "p_references": p.references,
        "rule_table": df_results.to_markdown(index=False),
    }
    j2_template = Template(index_template)

    write_mkdocs_file(j2_template.render(data), mkdocs_index, "write")

    # generate main.py macros
    mkdocs_py = report_dir / "main.py"
    logging.info(f"Generating python macros at: {mkdocs_py}")
    j2_template = Template(macros_template)
    write_mkdocs_file(
        j2_template.render({"file_server": fileserver}), mkdocs_py, "write"
    )

    # generate custom javascripts
    # script_dir = docs_dir / "scripts"
    # script_dir.mkdir(parents=True, exist_ok=True)
    # logging.info(f"Generating custom site javascripts at: {script_dir / 'site.js'}")
    # with open(script_dir / 'site.js', "w") as f:
    #    f.write(script_js)

    # extend main html
    override_dir = report_dir / "overrides"
    override_dir.mkdir(exist_ok=True, parents=True)
    logging.info(f"Extends main html: {override_dir / 'main.html'}")
    with open(override_dir / "main.html", "w") as f:
        f.write(main_html)

    # generate assets
    asset_path = docs_dir / "assets/bgcflow"
    asset_path.mkdir(exist_ok=True, parents=True)
    logging.info("Generating assets...")
    logo_path = asset_path / "BGCFlow_logo.svg"
    shutil.copy(Path(__file__).parent / "outputs/svg/BGCFlow_logo.svg", logo_path)

    # generate symlink
    # for r in ['antismash', 'bigscape']:
    #    target_path_raw = report_dir / r
    #    for target_path in target_path_raw.glob("*"):
    #        if any(target_path.name.startswith(keywords) for keywords in ['result', '6']):
    #            if target_path.is_dir():
    #                symlink_path = asset_path / r
    #                if symlink_path.is_symlink():
    #                    symlink_path.unlink()
    #                symlink_path.symlink_to(target_path.resolve())

    # Running fileserver
    if fileserver == "http://localhost:8002":
        fs = subprocess.Popen(
            [
                "python",
                "-m",
                "http.server",
                "--directory",
                report_dir,
                fileserver.split(":")[-1],
            ],
            stderr=subprocess.DEVNULL,
        )
        fs_run_by_bgcflow = True
        logging.info(f"Running http file-server. Job id: {fs.pid}")
    else:
        fs_run_by_bgcflow = False
    # dumping file server location
    with open("bgcflow_wrapper.log", "w") as f:
        log_port = {"report_server": port, "file_server": fileserver}
        if fs_run_by_bgcflow:
            log_port["pid"] = fs.pid
        json.dump(log_port, f, indent=2)

    try:
        signal.signal(signal.SIGINT, signal_handler)
        subprocess.call(
            f"(cd {str(report_dir)} && mkdocs serve -a localhost:{port})", shell=True
        )
        if fs_run_by_bgcflow:
            fs.kill()
        # asset_path.rmdir()
    except subprocess.CalledProcessError:
        if fs_run_by_bgcflow:
            fs.kill()
        # asset_path.rmdir()
    return
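
Example usage, as a minimal sketch (the clone location and project name are illustrative; the call blocks while serving the report until interrupted):

from bgcflow.mkdocs import generate_mkdocs_report

generate_mkdocs_report("bgcflow", "my_project", port=8001)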

load_project_metadata(path_to_metadata)

Loads project metadata from a JSON file and returns it as an object.

Parameters:

    path_to_metadata (str or Path): The path to the JSON file containing the project metadata. Required.

Returns:

    Dict2Class: An object representing the project metadata.

Source code in bgcflow/mkdocs.py
def load_project_metadata(path_to_metadata):
    """
    Loads project metadata from a JSON file and returns it as an object.

    Args:
        path_to_metadata (str or Path): The path to the JSON file containing the project metadata.

    Returns:
        Dict2Class: An object representing the project metadata.
    """
    with open(path_to_metadata, "r") as f:
        project_metadata = json.load(f)
        p = list(project_metadata.values())[0]
        p["name"] = list(project_metadata.keys())[0]
        p = Dict2Class(p)
    return p
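
Example usage, as a minimal sketch (the metadata path is illustrative):

from bgcflow.mkdocs import load_project_metadata

p = load_project_metadata("data/processed/my_project/metadata/project_metadata.json")
print(p.name, p.sample_size)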

signal_handler(signal, frame)

A signal handler function that prints a message and exits the program.

Parameters:

    signal (int): The signal number. Required.
    frame (FrameType): The current stack frame. Required.

Source code in bgcflow/mkdocs.py
def signal_handler(signal, frame):
    """
    A signal handler function that prints a message and exits the program.

    Args:
        signal (int): The signal number.
        frame (FrameType): The current stack frame.
    """
    print("\nThank you for using BGCFlow Report!")
    # with open('bgcflow_wrapper.log', "r") as f:
    #    log_port = json.load(f)
    #    os.kill(log_port['pid'], signal.signal.SIGKILL)
    sys.exit(0)

write_mkdocs_file(data_input, output_file, action)

Writes data to a file in either YAML or plain text format.

Parameters:

    data_input (dict or str): The data to write to the file. Required.
    output_file (str or Path): The path to the file to write. Required.
    action (str): The action to perform. Either "yaml" to write the data in YAML format, or "write" to write the data as plain text. Required.

Source code in bgcflow/mkdocs.py
def write_mkdocs_file(data_input, output_file, action):
    """
    Writes data to a file in either YAML or plain text format.

    Args:
        data_input (dict or str): The data to write to the file.
        output_file (str or Path): The path to the file to write.
        action (str): The action to perform. Either "yaml" to write the data in YAML format, or "write" to write the data as plain text.
    """
    if output_file.exists():
        overwrite = input(
            f"WARNING: {output_file} already exists. Do you want to overwrite it? (y/n) "
        )
        if overwrite.lower() != "y":
            print("Skipping file write.")
            return

    # continue with writing the file
    with open(output_file, "w", encoding="utf-8") as f:
        if action == "yaml":
            yaml.dump(data_input, f)
        elif action == "write":
            f.write(data_input)
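
Example usage, as a minimal sketch (file name and contents are illustrative; the function prompts before overwriting an existing file):

from pathlib import Path

from bgcflow.mkdocs import write_mkdocs_file

write_mkdocs_file({"site_name": "demo"}, Path("mkdocs.yml"), "yaml")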

projects_util

bgcflow_init(bgcflow_dir, global_config)

Initialize BGCFlow configuration and display available projects.

Initializes BGCFlow configuration based on the provided directory and global configuration path. If the global configuration file exists, it lists the available projects. If not, generates a global configuration file from the template and provides instructions for a test run.

Parameters:

    bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located. Required.
    global_config (str or pathlib.PosixPath): The path to the global configuration file. Required.

Source code in bgcflow/projects_util.py
def bgcflow_init(bgcflow_dir, global_config):
    """
    Initialize BGCFlow configuration and display available projects.

    Initializes BGCFlow configuration based on the provided directory and global configuration path.
    If the global configuration file exists, it lists the available projects.
    If not, generates a global configuration file from the template and provides instructions for a test run.

    Args:
        bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located.
        global_config (str or pathlib.PosixPath): The path to the global configuration file.
    """
    # check if global config available
    if global_config.is_file():
        # grab available projects
        logging.debug(f"Found config file at: {global_config}")
        with open(global_config, "r") as file:
            config_yaml = yaml.safe_load(file)
            project_names = [p for p in config_yaml["projects"]]
            list_of_projects = {}
            for p in project_names:
                if "pep" in p.keys():
                    p["name"] = p.pop("pep")
                if p["name"].endswith(".yaml"):
                    pep = peppy.Project(
                        str(bgcflow_dir / p["name"]), sample_table_index="genome_id"
                    )
                    name = pep.name
                    file_path = pep.config["sample_table"]
                else:
                    name = p["name"]
                    file_path = p["samples"]
                list_of_projects[name] = file_path

            print("Available projects:")
            for name, file_path in list_of_projects.items():
                print(f" - {name} : {file_path}")
    else:
        generate_global_config(bgcflow_dir, global_config)

    print("\nDo a test run by: `bgcflow run -n`")
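
Example usage, as a minimal sketch (the clone location is illustrative):

from pathlib import Path

from bgcflow.projects_util import bgcflow_init

bgcflow_dir = Path("bgcflow")
bgcflow_init(bgcflow_dir, bgcflow_dir / "config/config.yaml")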

copy_final_output(**kwargs)

Copy final project output files to a specified destination.

This function facilitates the copying of processed project output files to a designated destination. It can also preserve symbolic links during the copy process if specified.

Parameters:

    **kwargs (dict): Keyword arguments for the function. Default: {}

Keyword arguments:

    bgcflow_dir (str): The directory where the BGCFlow configuration is located.
    project (str): The name of the project whose output should be copied.
    resolve_symlinks (str): Whether to resolve symbolic links when copying (rsync -L). Defaults to False.
    destination (str): The destination directory where the output should be copied.

Source code in bgcflow/projects_util.py
def copy_final_output(**kwargs):
    """
    Copy final project output files to a specified destination.

    This function facilitates the copying of processed project output files to a designated destination. It can
    also preserve symbolic links during the copy process if specified.

    Args:
        **kwargs (dict): Keyword argument for the function.

    Keyword arguments:
        bgcflow_dir (str): The directory where the BGCFlow configuration is located.
        project (str): The name of the project whose output should be copied.
        resolve_symlinks (str, optional): Indicate whether to preserve symbolic links. Defaults to False.
        destination (str): The destination directory where the output should be copied.
    """
    bgcflow_dir = Path(kwargs["bgcflow_dir"]).resolve()
    project_output = bgcflow_dir / f"data/processed/{kwargs['project']}"
    assert (
        project_output.is_dir()
    ), f"ERROR: Cannot find project [{kwargs['project']}] results. Run `bgcflow init` to find available projects."
    if "resolve_symlinks" in kwargs.keys():
        assert kwargs["resolve_symlinks"] in [
            "True",
            "False",
        ], f'Invalid argument {kwargs["resolve_symlinks"]} in --resolve-symlinks. Choose between "True" or "False"'
        if kwargs["resolve_symlinks"] == "True":
            resolve_symlinks = "-L"
    else:
        resolve_symlinks = ""
    exclude_copy = f"{str(project_output.stem)}/bigscape/*/cache"
    command = [
        "rsync",
        "-avPhr",
        resolve_symlinks,
        "--exclude",
        exclude_copy,
        str(project_output),
        kwargs["destination"],
    ]
    logging.debug(f'Running command: {" ".join(command)}')
    subprocess.call(command)
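
Example usage, as a minimal sketch (paths are illustrative; rsync must be available on the PATH):

from bgcflow.projects_util import copy_final_output

copy_final_output(
    bgcflow_dir="bgcflow",
    project="my_project",
    resolve_symlinks="True",
    destination="backup/",
)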

generate_global_config(bgcflow_dir, global_config)

Generate a BGCFlow global configuration file from a template.

Copies the template configuration file to the specified global configuration path.

Parameters:

    bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located. Required.
    global_config (str or pathlib.PosixPath): The path to the global configuration file to be generated. Required.

Source code in bgcflow/projects_util.py
def generate_global_config(bgcflow_dir, global_config):
    """
    Generate a BGCFlow global configuration file from a template.

    Copies the template configuration file to the specified global configuration path.

    Args:
        bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located.
        global_config (str or pathlib.PosixPath): The path to the global configuration file to be generated.
    """
    logging.info(f"Generating config file from template at: {global_config}")
    template_config = bgcflow_dir / ".examples/_config_example.yaml"
    assert (
        template_config.is_file()
    ), "Cannot find template file. Are you using BGCFlow version >= 0.4.1?"

    shutil.copy(template_config, global_config)

    # scan for example projects
    def copy_project_example(project_type):
        """
        Scan global config for example projects and (sub projects) and copy them to the config directory.
        """
        with open(global_config, "r") as file:
            config_yaml = yaml.safe_load(file)
        example_projects = [
            Path(p["pep"])
            for p in config_yaml[project_type]
            if "pep" in p.keys() and p["pep"].endswith(".yaml")
        ]

        for example_project in example_projects:
            example_project_dir = (
                bgcflow_dir / ".examples" / example_project.parent.name
            )
            target_dir = bgcflow_dir / "config" / example_project_dir.name
            if str(example_project).startswith(".examples"):
                logging.warning(
                    f"\n - WARNING: You are using BGCFlow version <= 0.7.1. In the global config file (`{global_config}`), please change the location of your `{example_project}` to `config/{example_project.parent.name}/{example_project.name}`."
                )
            shutil.copytree(example_project_dir, target_dir)

    for project_type in ["projects", "bgc_projects"]:
        copy_project_example(project_type)

generate_project(bgcflow_dir, project_name, pep_version='2.1.0', use_project_rules=False, samples_csv=False, prokka_db=False, gtdb_tax=False, description=False)

Generate a PEP project configuration in BGCFlow.

This function creates a configuration file for a Project Enhanced Pipelines (PEP) project within the BGCFlow framework. It allows you to define various aspects of the project, such as its name, version, description, sample data, custom annotations, and more.

Parameters:

    bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located. Required.
    project_name (str): The name of the project. Required.
    pep_version (str): The version of the PEP specification. Default: '2.1.0'
    use_project_rules (bool): Flag indicating whether to use project-specific rules. Default: False
    samples_csv (pd.core.frame.DataFrame or str): Sample data in a Pandas DataFrame or path to a CSV file. Default: False
    prokka_db (str): Path to a custom Prokka annotation file. Default: False
    gtdb_tax (str): Path to a custom GTDB taxonomy file. Default: False
    description (str): Description for the project. Default: False

Source code in bgcflow/projects_util.py
def generate_project(
    bgcflow_dir,
    project_name,
    pep_version="2.1.0",
    use_project_rules=False,
    samples_csv=False,
    prokka_db=False,
    gtdb_tax=False,
    description=False,
):
    """
    Generate a PEP project configuration in BGCFlow.

    This function creates a configuration file for a Project Enhanced Pipelines (PEP)
    project within the BGCFlow framework. It allows you to define various aspects of
    the project, such as its name, version, description, sample data, custom annotations,
    and more.

    Args:
        bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located.
        project_name (str): The name of the project.
        pep_version (str, optional): The version of the PEP specification. Defaults to "2.1.0".
        use_project_rules (bool, optional): Flag indicating whether to use project-specific rules. Defaults to False.
        samples_csv (pd.core.frame.DataFrame or str, optional): Sample data in Pandas DataFrame or path to a CSV file. Defaults to False.
        prokka_db (str, optional): Path to a custom Prokka annotation file. Defaults to False.
        gtdb_tax (str, optional): Path to a custom GTDB taxonomy file. Defaults to False.
        description (str, optional): Description for the project. Defaults to False.
    """

    # Ensure bgcflow_dir is a pathlib.PosixPath
    if not isinstance(bgcflow_dir, Path):
        bgcflow_dir = Path(bgcflow_dir)

    # Define paths and template dictionary
    global_config = bgcflow_dir / "config/config.yaml"
    template_dict = {
        "name": project_name,
        "pep_version": pep_version,
        "description": "<TO DO: give a description to your project>",
        "sample_table": "samples.csv",
        "prokka-db": "OPTIONAL: relative path to your `prokka-db.csv`",
        "gtdb-tax": "OPTIONAL: relative path to your `gtdbtk.bac120.summary.tsv`",
    }

    # Update template_dict with project rules if enabled
    if use_project_rules:
        with open(bgcflow_dir / "workflow/rules.yaml", "r") as file:
            available_rules = yaml.safe_load(file)
            available_rules = {rule: "FALSE" for rule in available_rules.keys()}
            template_dict["rules"] = available_rules

    # Create project directory
    project_dir = bgcflow_dir / f"config/{project_name}"
    project_dir.mkdir(parents=True, exist_ok=True)

    # Handle samples_csv input
    if isinstance(samples_csv, pd.core.frame.DataFrame):
        logging.debug("Generating samples file from Pandas DataFrame")
        assert samples_csv.index.name == "genome_id"
        assert (
            samples_csv.columns
            == [
                "source",
                "organism",
                "genus",
                "species",
                "strain",
                "closest_placement_reference",
            ]
        ).all()
        samples_csv.to_csv(project_dir / "samples.csv")
    elif isinstance(samples_csv, str):
        logging.debug(f"Copying samples file from {samples_csv}")
        samples_csv = Path(samples_csv)
        assert samples_csv.is_file()
        shutil.copy(samples_csv, project_dir / "samples.csv")

    # Handle prokka_db input
    if isinstance(prokka_db, str):
        logging.debug(f"Copying custom annotation file from {prokka_db}")
        prokka_db = Path(prokka_db)
        assert prokka_db.is_file()
        shutil.copy(prokka_db, project_dir / "prokka-db.csv")
        template_dict["prokka-db"] = "prokka-db.csv"

    # Handle gtdb_tax input
    if isinstance(gtdb_tax, str):
        logging.debug(f"Copying custom taxonomy from {gtdb_tax}")
        gtdb_tax = Path(gtdb_tax)
        assert gtdb_tax.is_file()
        shutil.copy(gtdb_tax, project_dir / "gtdbtk.bac120.summary.tsv")
        template_dict["gtdb-tax"] = "gtdbtk.bac120.summary.tsv"

    # Update template_dict with project description
    if isinstance(description, str):
        logging.debug("Writing project description...")
        template_dict["description"] = description

    # Generate project configuration file
    logging.info(f"Project config file generated in: {project_dir}")
    with open(project_dir / "project_config.yaml", "w") as file:
        yaml.dump(template_dict, file, sort_keys=False)

    # Initialize global config if not present
    if not global_config.is_file():
        bgcflow_init(bgcflow_dir, global_config)

    # Update global config.yaml with project information
    with open(bgcflow_dir / "config/config.yaml", "r") as file:
        logging.debug("Updating global config.yaml")
        main_config = yaml.safe_load(file)

        # Rename 'pep' to 'name' for consistency
        for item in main_config["projects"]:
            if "pep" in item:
                item["name"] = item.pop("pep")

        # Rename 'pipelines' to 'rules'
        if "pipelines" in main_config.keys():
            main_config["rules"] = main_config.pop("pipelines")

        project_names = [p["name"] for p in main_config["projects"]]
        assert (
            project_name not in project_names
        ), f"Project name: '{project_name}' already exists!\nUse a different name or edit the files in: {project_dir}"
        assert (
            str(project_dir / "project_config.yaml") not in project_names
        ), f"Project name: '{project_name}' already exists!\nUse a different name or edit the files in: {project_dir}"
        main_config["projects"].append(
            {"name": str(project_dir / "project_config.yaml")}
        )

    # Update and save global config
    with open(bgcflow_dir / "config/config.yaml", "w") as file:
        yaml.dump(main_config, file, sort_keys=False)
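
Example usage, as a minimal sketch (the clone location, project name, and samples file are illustrative):

from bgcflow.projects_util import generate_project

generate_project(
    "bgcflow",
    "my_project",
    samples_csv="samples.csv",
    description="Demo BGCFlow project",
)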

projects_util(**kwargs)

Utility function for managing BGCFlow projects.

Parameters:

    **kwargs (dict): Keyword arguments for the function. Default: {}

Keyword arguments:

    bgcflow_dir (str): Path to the BGCFlow directory.
    project (str): Name of the BGCFlow project to generate.
    use_project_pipeline (bool): Whether to use the project-specific pipeline rules.
    prokka_db (str): Path to the Prokka database.
    gtdb_tax (str): Path to the GTDB taxonomy file.
    samples_csv (str): Path to the samples CSV file.

Source code in bgcflow/projects_util.py
def projects_util(**kwargs):
    """
    Utility function for managing BGCFlow projects.

    Args:
        **kwargs (dict): Keyword arguments for the function.

    Keyword Arguments:
        bgcflow_dir (str): Path to the BGCFlow directory.
        project (str): Name of the BGCFlow project to generate.
        use_project_pipeline (bool): Whether to use the project-specific pipeline rules.
        prokka_db (str): Path to the Prokka database.
        gtdb_tax (str): Path to the GTDB taxonomy file.
        samples_csv (str): Path to the samples CSV file.
    """

    # pep_version = "2.1.0"
    bgcflow_dir = Path(kwargs["bgcflow_dir"]).resolve()
    config_dir = bgcflow_dir / "config"
    config_dir.mkdir(parents=True, exist_ok=True)
    global_config = config_dir / "config.yaml"

    if type(kwargs["project"]) == str:
        # project_name = kwargs["project"]

        generate_project(
            bgcflow_dir,
            kwargs["project"],
            use_project_rules=kwargs["use_project_pipeline"],
            prokka_db=kwargs["prokka_db"],
            gtdb_tax=kwargs["gtdb_tax"],
            samples_csv=kwargs["samples_csv"],
        )
    else:
        bgcflow_init(bgcflow_dir, global_config)
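
Example usage, as a minimal sketch mirroring the CLI options (the clone location and project name are illustrative):

from bgcflow.projects_util import projects_util

projects_util(
    bgcflow_dir="bgcflow",
    project="my_project",
    use_project_pipeline=False,
    prokka_db=False,
    gtdb_tax=False,
    samples_csv=False,
)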