Modules
Top-level package for bgcflow_wrapper.
bgcflow
Main module.
cloner(**kwargs)
Clone the BGCFlow repository to a specified destination.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
**kwargs | dict | Keyword arguments for the cloning. | {} |

Returns:

Type | Description |
---|---|
None | |
Source code in bgcflow/bgcflow.py
def cloner(**kwargs):
"""
Clone the BGCFlow repository to a specified destination.
Args:
**kwargs (dict): Keyword arguments for the cloning.
Returns:
None
"""
destination_dir = Path(kwargs["destination"])
click.echo(f"Cloning BGCFlow to {destination_dir}...")
destination_dir.mkdir(parents=True, exist_ok=True)
try:
Repo.clone_from(
"https://github.com/NBChub/bgcflow.git",
Path(kwargs["destination"]),
branch=kwargs["branch"],
)
except GitCommandError:
print(
f"Oops, it seems {kwargs['destination']} already exists and is not an empty directory."
)
return
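A minimal usage sketch (not part of the source); the destination directory and branch name are placeholder values, and the import path assumes the bgcflow/bgcflow.py layout shown above.

```python
from bgcflow.bgcflow import cloner

# Clone the "main" branch of BGCFlow into ./bgcflow (both values are placeholders).
cloner(destination="bgcflow", branch="main")
```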
deployer(**kwargs)
Deploy the BGCFlow repository to a specified destination using Snakedeploy.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
**kwargs | dict | Keyword arguments for the deployment. | {} |

Returns:

Type | Description |
---|---|
None | |
Source code in bgcflow/bgcflow.py
def deployer(**kwargs):
"""
Deploy the BGCFlow repository to a specified destination using Snakedeploy.
Args:
**kwargs (dict): Keyword arguments for the deployment.
Returns:
None
"""
dplyr(
"https://github.com/NBChub/bgcflow.git",
branch=kwargs["branch"],
name="bgcflow",
dest_path=Path(kwargs["destination"]),
tag=kwargs["tag"],
)
return
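A hedged usage sketch mirroring the keyword arguments read by the function above; the destination and branch are placeholders, and `tag=None` assumes Snakedeploy accepts a missing tag when a branch is given.

```python
from bgcflow.bgcflow import deployer

# Deploy the BGCFlow workflow with Snakedeploy into ./bgcflow (placeholder values).
deployer(destination="bgcflow", branch="main", tag=None)
```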
get_all_rules(**kwargs)
Print information about available rules in the BGCFlow repository.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
**kwargs | dict | Keyword arguments for the function. | {} |

Returns:

Type | Description |
---|---|
None | |
Source code in bgcflow/bgcflow.py
def get_all_rules(**kwargs):
"""
Print information about available rules in the BGCFlow repository.
Args:
**kwargs (dict): Keyword arguments for the function.
Returns:
None
"""
path = Path(kwargs["bgcflow_dir"])
rule_file = path / "workflow/rules.yaml"
if rule_file.is_file():
with open(rule_file, "r") as file:
data = yaml.safe_load(file)
try:
if type(kwargs["describe"]) is str:
rule_name = kwargs["describe"]
print(f"Description for {rule_name}:")
print(f" - {data[rule_name]['description']}")
if type(kwargs["cite"]) is str:
rule_name = kwargs["cite"]
print(f"Citations for {rule_name}:")
[print("-", c) for c in data[rule_name]["references"]]
if (not type(kwargs["describe"]) is str) and (
not type(kwargs["cite"]) is str
):
print("Printing available rules:")
for item in data.keys():
print(f" - {item}")
except KeyError:
rule_name = [
r for r in [kwargs["describe"], kwargs["cite"]] if type(r) is str
]
print(
f"ERROR: Cannot find rule {rule_name} in dictionary. Find available rules with `bgcflow rules`."
)
else:
print(
"ERROR: Cannot find BGCFlow directory.\nPoint to the right directory using `--bgcflow_dir <destination>` or clone BGCFlow using `bgcflow clone <destination>`."
)
return
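A usage sketch based on the keyword arguments the function reads; the directory is a placeholder and `"antismash"` is only an example rule name.

```python
from bgcflow.bgcflow import get_all_rules

# List every rule defined in workflow/rules.yaml of a cloned BGCFlow directory.
get_all_rules(bgcflow_dir="bgcflow", describe=None, cite=None)

# Show the description and citations for one rule ("antismash" is an example name).
get_all_rules(bgcflow_dir="bgcflow", describe="antismash", cite="antismash")
```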
snakemake_wrapper(**kwargs)
Wrapper function for running Snakemake with BGCFlow.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
**kwargs | dict | Keyword arguments for Snakemake and BGCFlow. | {} |

Returns:

Type | Description |
---|---|
None | |
Source code in bgcflow/bgcflow.py
def snakemake_wrapper(**kwargs):
"""
Wrapper function for running Snakemake with BGCFlow.
Args:
**kwargs (dict): Keyword arguments for Snakemake and BGCFlow.
Returns:
None
"""
p = "Empty process catcher"
dryrun = ""
touch = ""
unlock = ""
until = ""
profile = ""
antismash_mode = kwargs["antismash_mode"]
os.environ["BGCFLOW_ANTISMASH_MODE"] = antismash_mode
if kwargs["dryrun"]:
dryrun = "--dryrun"
if kwargs["touch"]:
touch = "--touch"
if kwargs["unlock"]:
unlock = "--unlock"
if kwargs["until"] is not None:
until = f"--until {kwargs['until']}"
if kwargs["profile"] is not None:
profile = f"--profile {kwargs['profile']}"
if kwargs["monitor_off"]:
pass
else:
click.echo("Monitoring BGCFlow jobs with Panoptes...")
# Run Panoptes if not yet run
port = int(kwargs["wms_monitor"].split(":")[-1])
try:
item = requests.get(f"{kwargs['wms_monitor']}/api/service-info")
status = item.json()["status"]
assert status == "running"
click.echo(f"Panoptes already {status} on {kwargs['wms_monitor']}")
except requests.exceptions.RequestException: # This is the correct syntax
click.echo(
f"Running Panoptes to monitor BGCFlow jobs at {kwargs['wms_monitor']}"
)
p = subprocess.Popen(
["panoptes", "--port", str(port)], stderr=subprocess.DEVNULL
)
click.echo(f"Panoptes job id: {p.pid}")
# Connect to Panoptes
click.echo("Connecting to Panoptes...")
ctr = 1
for tries in range(10):
try:
item = requests.get(f"{kwargs['wms_monitor']}/api/service-info")
status = item.json()["status"]
if status == "running":
click.echo(f"Panoptes status: {status}")
break
except requests.exceptions.RequestException: # This is the correct syntax
click.echo(f"Retrying to connect: {ctr}x")
ctr = ctr + 1
time.sleep(1)
pass
else:
time.sleep(1)
# Check Snakefile
valid_workflows = {
"Snakefile": "Main BGCFlow snakefile for genome mining",
"BGC": "Subworkflow for comparative analysis of BGCs",
"Report": "Build a static html report of a BGCFlow run",
"Database": "Build a DuckDB database for a BGCFlow run",
"Metabase": "Run a metabase server for visual exploration of the DuckDB database",
"lsabgc": "Run population genetic and evolutionary analysis with lsaBGC-Easy.py using BiG-SCAPE output",
"ppanggolin": "Build pangenome graph and detect region of genome plasticity with PPanGGOLiN",
}
bgcflow_dir = Path(kwargs["bgcflow_dir"])
if kwargs["workflow"] in [
"workflow/Snakefile",
"workflow/BGC",
"workflow/Report",
"workflow/Database",
"workflow/Metabase",
"workflow/lsabgc",
"workflow/ppanggolin",
]:
snakefile = bgcflow_dir / kwargs["workflow"]
elif kwargs["workflow"] in [
"Snakefile",
"BGC",
"Report",
"Database",
"Metabase",
"lsabgc",
"ppanggolin",
]:
snakefile = bgcflow_dir / f'workflow/{kwargs["workflow"]}'
else:
snakefile = bgcflow_dir / kwargs["workflow"]
assert (
snakefile.is_file()
), f"Snakefile {snakefile} does not exist. Available workflows are:\n" + "\n".join(
[f" - {k}: {v}" for k, v in valid_workflows.items()]
)
# Run Snakemake
if kwargs["cores"] > multiprocessing.cpu_count():
click.echo(
f"\nWARNING: Number of cores inputted ({kwargs['cores']}) is higher than the number of available cores ({multiprocessing.cpu_count()})."
)
click.echo(
f"DEBUG: Setting number of cores to available cores: {multiprocessing.cpu_count()}\n"
)
kwargs["cores"] = multiprocessing.cpu_count()
else:
click.echo(
f"\nDEBUG: Using {kwargs['cores']} out of {multiprocessing.cpu_count()} available cores\n"
)
snakemake_command = f"cd {kwargs['bgcflow_dir']} && snakemake --snakefile {snakefile} --use-conda --keep-going --rerun-incomplete --rerun-triggers mtime -c {kwargs['cores']} {dryrun} {touch} {until} {unlock} {profile} --wms-monitor {kwargs['wms_monitor']}"
click.echo(f"Running Snakemake with command:\n{snakemake_command}")
subprocess.call(snakemake_command, shell=True)
# Kill Panoptes
try:
if not type(p) == str:
click.echo(f"Stopping panoptes server: PID {p.pid}")
p.kill()
except UnboundLocalError as e:
click.echo(e)
return
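A dry-run sketch showing the keyword arguments the wrapper expects; every key it reads must be present, and all values below (directory, workflow, monitor URL, antiSMASH mode) are illustrative rather than recommended settings.

```python
from bgcflow.bgcflow import snakemake_wrapper

snakemake_wrapper(
    bgcflow_dir="bgcflow",               # placeholder path to a cloned BGCFlow repo
    workflow="Snakefile",                # main genome-mining workflow
    cores=4,
    dryrun=True,                         # adds --dryrun to the Snakemake call
    touch=False,
    unlock=False,
    until=None,
    profile=None,
    monitor_off=True,                    # skip starting/checking Panoptes
    wms_monitor="http://127.0.0.1:5000",
    antismash_mode="minimal",            # illustrative value for BGCFLOW_ANTISMASH_MODE
)
```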
cli
Console script for bgcflow.
metabase
sync_dbt_models_to_metabase(dbt_dir, dbt_database, metabase_host, metabase_database, metabase_user, metabase_password, dbt_schema='main', metabase_http=True, dbt_excludes=None)
Synchronizes dbt models to Metabase using the dbt-metabase package.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dbt_dir | str | The path to the dbt project directory. | required |
dbt_database | str | The name of the dbt database to use. | required |
metabase_host | str | The URL of the Metabase server. | required |
metabase_user | str | The username of the Metabase account to use. | required |
metabase_password | str | The password of the Metabase account to use. | required |
metabase_database | str | The name of the Metabase database to use. | required |
dbt_schema | str | The name of the dbt schema to use. Defaults to "main". | 'main' |
metabase_http | bool | Whether to use HTTP instead of HTTPS for the Metabase connection. Defaults to True. | True |

Returns:

Type | Description |
---|---|
str | The output of the dbt-metabase command as a string. |
Source code in bgcflow/metabase.py
def sync_dbt_models_to_metabase(
dbt_dir: str,
dbt_database: str,
metabase_host: str,
metabase_database: str,
metabase_user: str,
metabase_password: str,
dbt_schema: str = "main",
metabase_http: bool = True,
dbt_excludes: list = None,
) -> str:
"""
Synchronizes dbt models to Metabase using the dbt-metabase package.
Args:
dbt_dir (str): The path to the dbt project directory.
dbt_database (str): The name of the dbt database to use.
metabase_host (str): The URL of the Metabase server.
metabase_user (str): The username of the Metabase account to use.
metabase_password (str): The password of the Metabase account to use.
metabase_database (str): The name of the Metabase database to use.
dbt_schema (str, optional): The name of the dbt schema to use. Defaults to "main".
metabase_http (bool, optional): Whether to use HTTP instead of HTTPS for the Metabase connection. Defaults to True.
Returns:
str: The output of the dbt-metabase command as a string.
"""
click.echo(" - Synchronizing dbt models schema to Metabase...")
if metabase_http:
click.echo(" - Connecting with HTTP method...")
metabase_http = "--metabase_http"
else:
click.echo(" - Connecting with HTTPS method...")
metabase_http = "--metabase_https"
command = [
"dbt-metabase",
"models",
"--dbt_path",
str(dbt_dir),
"--dbt_database",
dbt_database,
"--metabase_host",
metabase_host.split("://")[-1],
"--metabase_user",
metabase_user,
"--metabase_password",
metabase_password,
"--metabase_database",
metabase_database,
"--dbt_schema",
dbt_schema,
metabase_http,
]
if dbt_excludes and len(dbt_excludes) > 0:
command += ["--dbt_excludes", *dbt_excludes]
# Run the command and capture the output
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
)
# the output
click.echo(result.stdout)
click.echo(result.stderr)
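A usage sketch with placeholder host, credentials, and paths; it assumes a reachable Metabase instance and the dbt-metabase CLI on the PATH, since the function shells out to it.

```python
from bgcflow.metabase import sync_dbt_models_to_metabase

sync_dbt_models_to_metabase(
    dbt_dir="data/processed/my_project/dbt/antiSMASH_7.0.0",  # placeholder dbt project
    dbt_database="dbt_bgcflow",
    metabase_host="http://localhost:3000",
    metabase_database="my_project",
    metabase_user="admin@example.com",
    metabase_password="********",
    dbt_schema="main",
    metabase_http=True,
)
```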
upload_and_sync_to_metabase(project_name, bgcflow_dir, dbt_dir, metabase_host, mb_username, mb_password, dbt_schema='main', dbt_database='dbt_bgcflow', metabase_http=True, metabase_database=None, dbt_excludes=None)
Uploads a DuckDB database file generated by dbt to Metabase and syncs the dbt models.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
project_name | str | The name of the project to upload to Metabase. | required |
bgcflow_dir | str | The root directory of the BGCFlow project. | required |
dbt_dir | str | The directory containing the dbt project to upload. If None, the directory is inferred from the BGCFlow project directory. | required |
metabase_host | str | The URL of the Metabase server. | required |
mb_username | str | The Metabase username. If None, the user will be prompted to enter their username. | required |
mb_password | str | The Metabase password. If None, the user will be prompted to enter their password. | required |
dbt_schema | str | The name of the dbt schema to use. | 'main' |
dbt_database | str | The name of the dbt database to use. | 'dbt_bgcflow' |
metabase_http | bool | Whether to use HTTP instead of HTTPS to connect to Metabase. | True |
metabase_database | str | The name of the Metabase database to use. If None, the project name is used. | None |

Returns:

Type | Description |
---|---|
str | The output of the dbt-metabase command as a string. |

Exceptions:

Type | Description |
---|---|
AssertionError | If the dbt_dir or bgcflow_dir do not exist or are not directories. |
subprocess.CalledProcessError | If the dbt-metabase command fails. |
Source code in bgcflow/metabase.py
def upload_and_sync_to_metabase(
project_name: str,
bgcflow_dir: str,
dbt_dir: str,
metabase_host: str,
mb_username: str,
mb_password: str,
dbt_schema: str = "main",
dbt_database: str = "dbt_bgcflow",
metabase_http: bool = True,
metabase_database: str = None,
dbt_excludes: list = None,
) -> str:
"""
Uploads a DuckDB database file generated by dbt to Metabase and syncs the dbt models.
Args:
project_name (str): The name of the project to upload to Metabase.
bgcflow_dir (str): The root directory of the BGCFlow project.
dbt_dir (str): The directory containing the dbt project to upload. If None, the directory is inferred from the BGCFlow project directory.
metabase_host (str): The URL of the Metabase server.
mb_username (str): The Metabase username. If None, the user will be prompted to enter their username.
mb_password (str): The Metabase password. If None, the user will be prompted to enter their password.
dbt_schema (str): The name of the dbt schema to use.
dbt_database (str): The name of the dbt database to use.
metabase_http (bool): Whether to use HTTP instead of HTTPS to connect to Metabase.
metabase_database (str): The name of the Metabase database to use. If None, the project name is used.
Returns:
str: The output of the dbt-metabase command as a string.
Raises:
AssertionError: If the dbt_dir or bgcflow_dir do not exist or are not directories.
subprocess.CalledProcessError: If the dbt-metabase command fails.
"""
# available dbt models in bgcflow_dbt-duckdb v0.2.1
dbt_model_dict = {
"query-bigslice": ["bigfam_hits", "bigfam_network"],
"bigscape": ["bigscape_cluster", "bigscape_network", "mibig_hits"],
"checkm": ["checkm"],
"seqfu": ["seqfu"],
"antismash": ["genomes"],
}
if dbt_excludes is None:
dbt_excludes = []
else:
dbt_excludes = list(dbt_excludes)
if dbt_dir is None:
report_dir = Path(bgcflow_dir) / f"data/processed/{project_name}"
click.echo(f" - Accessing BGCFlow report directory in: {report_dir}")
with open(report_dir / "metadata/dependency_versions.json", "r") as f:
dependency_version = json.load(f)
antismash_version = dependency_version["antismash"]
click.echo(f" - AntiSMASH version: {antismash_version}")
project_metadata_json = report_dir / "metadata/project_metadata.json"
click.echo(f" - Reading project metadata from: {project_metadata_json}")
with open(project_metadata_json, "r") as f:
project_metadata = json.load(f)
used_pipelines = list(project_metadata[project_name]["rule_used"].keys())
click.echo(f" - Used pipelines: {', '.join(used_pipelines)}")
for pipeline in dbt_model_dict.keys():
if pipeline not in used_pipelines:
dbt_excludes += dbt_model_dict[pipeline]
click.echo(f" - Excluding models for sync: {', '.join(dbt_excludes)}")
dbt_dir = report_dir / f"dbt/antiSMASH_{antismash_version}"
elif isinstance(dbt_dir, str):
click.echo(f" - Accessing dbt project directory in: {dbt_dir}")
click.echo(
f" - Using all models for sync: {', '.join(list(dbt_model_dict.values()))}"
)
dbt_dir = Path(dbt_dir)
# Get Metabase session token
if mb_username is None:
mb_username = click.prompt("Enter your Metabase username")
if mb_password is None:
mb_password = click.prompt("Enter your Metabase password", hide_input=True)
response, session_token = upload_dbt_to_metabase(
project_name, bgcflow_dir, dbt_dir, metabase_host, mb_username, mb_password
)
if response == 200:
if metabase_database is None:
metabase_database = project_name
sync_dbt_models_to_metabase(
dbt_dir,
dbt_database,
metabase_host,
metabase_database,
mb_username,
mb_password,
dbt_schema,
metabase_http,
dbt_excludes,
)
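A sketch of a typical call with placeholder values; passing `dbt_dir=None` lets the function locate the dbt project from the BGCFlow results directory, which assumes the run's metadata files exist as described above.

```python
from bgcflow.metabase import upload_and_sync_to_metabase

upload_and_sync_to_metabase(
    project_name="my_project",             # placeholder project name
    bgcflow_dir="bgcflow",                 # placeholder BGCFlow directory
    dbt_dir=None,                          # infer dbt dir from data/processed/<project>
    metabase_host="http://localhost:3000",
    mb_username="admin@example.com",
    mb_password="********",
)
```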
upload_dbt_to_metabase(project_name, bgcflow_dir, dbt_dir, metabase_host, mb_username, mb_password)
Uploads a DuckDB database file generated by dbt to Metabase.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
project_name | str | The name of the project to upload to Metabase. | required |
bgcflow_dir | str | The path to the BGCflow directory. | required |
dbt_dir | str | The path to the dbt directory containing the DuckDB database file. | required |
metabase_host | str | The URL of the Metabase server. | required |
mb_username | str | The username to use for authentication with Metabase. | required |
mb_password | str | The password to use for authentication with Metabase. | required |

Returns:

Type | Description |
---|---|
tuple | The HTTP status code of the upload request and the Metabase session token. |

Exceptions:

Type | Description |
---|---|
AssertionError | If the DuckDB database file does not exist or is not a regular file. |
Source code in bgcflow/metabase.py
def upload_dbt_to_metabase(
project_name: str,
bgcflow_dir: str,
dbt_dir: str,
metabase_host: str,
mb_username: str,
mb_password: str,
) -> str:
"""
Uploads a DuckDB database file generated by dbt to Metabase.
Args:
project_name (str): The name of the project to upload to Metabase.
bgcflow_dir (str): The path to the BGCflow directory.
dbt_dir (str): The path to the dbt directory containing the DuckDB database file.
metabase_host (str): The URL of the Metabase server.
mb_username (str): The username to use for authentication with Metabase.
mb_password (str): The password to use for authentication with Metabase.
Returns:
str: The HTTP status code of the request.
Raises:
AssertionError: If the DuckDB database file does not exist or is not a regular file.
"""
duckdb_path = dbt_dir / "dbt_bgcflow.duckdb"
assert (
duckdb_path.is_file()
), f"Error: {duckdb_path} does not exist or is not a regular file"
session_response = requests.post(
f"{metabase_host}/api/session",
json={"username": mb_username, "password": mb_password},
)
session_token = session_response.json()["id"]
# Check if database already exists
database_response = requests.get(
f"{metabase_host}/api/database", headers={"X-Metabase-Session": session_token}
)
databases = database_response.json()
database_id = None
for k, v in databases.items():
if k == "data":
for db in v:
if db["name"] == project_name:
database_id = db["id"]
break
# Prompt user to continue or cancel upload
if database_id is not None:
user_input = input(
f" - WARNING: A database with the name '{project_name}' already exists in Metabase. Do you want to continue with the upload? (y/n) "
)
if user_input.lower() != "y":
click.echo(" - Database upload cancelled by user")
return
# Upload or update database in Metabase
if database_id is None:
database_response = requests.post(
f"{metabase_host}/api/database",
headers={"X-Metabase-Session": session_token},
json={
"engine": "duckdb",
"name": project_name,
"details": {"database_file": str(duckdb_path.resolve())},
},
)
if database_response.status_code == 200:
click.echo(f" - Database '{project_name}' uploaded successfully")
else:
click.echo(
f" - Error uploading database '{project_name}': {database_response.text}"
)
else:
database_response = requests.put(
f"{metabase_host}/api/database/{database_id}",
headers={"X-Metabase-Session": session_token},
json={
"engine": "duckdb",
"name": project_name,
"details": {"database_file": str(duckdb_path.resolve())},
},
)
if database_response.status_code == 200:
click.echo(f" - Database '{project_name}' updated successfully")
else:
click.echo(
f" - Error updating database '{project_name}': {database_response.text}"
)
return database_response.status_code, session_token
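A sketch with placeholder values; `dbt_dir` is joined with the database file name, so a `Path` is passed, and the function hands back both the HTTP status code and the session token, as in the source above.

```python
from pathlib import Path
from bgcflow.metabase import upload_dbt_to_metabase

status_code, session_token = upload_dbt_to_metabase(
    project_name="my_project",
    bgcflow_dir="bgcflow",
    dbt_dir=Path("data/processed/my_project/dbt/antiSMASH_7.0.0"),  # placeholder
    metabase_host="http://localhost:3000",
    mb_username="admin@example.com",
    mb_password="********",
)
```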
mkdocs
Dict2Class
A class that converts a dictionary to an object with attributes.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
my_dict | dict | The dictionary to convert to an object. | required |

Methods:

print_references(): Returns a formatted string of the `references` attribute of the object.
Source code in bgcflow/mkdocs.py
class Dict2Class(object):
"""
A class that converts a dictionary to an object with attributes.
Args:
my_dict (dict): The dictionary to convert to an object.
Attributes:
All keys in the dictionary are converted to attributes of the object.
Methods:
print_references():
Returns a formatted string of the `references` attribute of the object.
"""
def __init__(self, my_dict):
"""
Initializes the object with attributes from the dictionary.
Args:
my_dict (dict): The dictionary to convert to an object.
"""
for key in my_dict:
setattr(self, key, my_dict[key])
def print_references(self):
"""
Returns a formatted string of the `references` attribute of the object.
Returns:
str: A formatted string of the `references` attribute of the object.
"""
text = ""
for r in self.references:
text = "\n".join([text, f"- {r}"])
return text
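A small sketch with example data showing how dictionary keys become attributes.

```python
from bgcflow.mkdocs import Dict2Class

rule = Dict2Class({"name": "antismash", "references": ["Example reference 2023"]})
print(rule.name)                # "antismash"
print(rule.print_references())  # "\n- Example reference 2023"
```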
__init__(self, my_dict)
special
Initializes the object with attributes from the dictionary.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
my_dict | dict | The dictionary to convert to an object. | required |
Source code in bgcflow/mkdocs.py
def __init__(self, my_dict):
"""
Initializes the object with attributes from the dictionary.
Args:
my_dict (dict): The dictionary to convert to an object.
"""
for key in my_dict:
setattr(self, key, my_dict[key])
print_references(self)
Returns a formatted string of the `references` attribute of the object.

Returns:

Type | Description |
---|---|
str | A formatted string of the `references` attribute of the object. |
Source code in bgcflow/mkdocs.py
def print_references(self):
"""
Returns a formatted string of the `references` attribute of the object.
Returns:
str: A formatted string of the `references` attribute of the object.
"""
text = ""
for r in self.references:
text = "\n".join([text, f"- {r}"])
return text
generate_mkdocs_report(bgcflow_dir, project_name, port=8001, fileserver='http://localhost:8002', ipynb=True)
Generates an MkDocs report for a BGCFlow project.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
bgcflow_dir | str | The path to the BGCFlow project directory. | required |
project_name | str | The name of the BGCFlow project. | required |
port | int | The port number to use for the MkDocs server, by default 8001. | 8001 |
fileserver | str | The URL of the file server to use, by default "http://localhost:8002". | 'http://localhost:8002' |
ipynb | bool | Whether to use IPython notebooks for the reports, by default True. | True |
Source code in bgcflow/mkdocs.py
def generate_mkdocs_report(
bgcflow_dir: str,
project_name: str,
port: int = 8001,
fileserver: str = "http://localhost:8002",
ipynb: bool = True,
) -> None:
"""
Generates an MkDocs report for a BGCFlow project.
Args:
bgcflow_dir (str): The path to the BGCFlow project directory.
project_name (str): The name of the BGCFlow project.
port (int, optional): The port number to use for the MkDocs server, by default 8001.
fileserver (str, optional): The URL of the file server to use, by default "http://localhost:8002".
ipynb (bool, optional): Whether to use IPython notebooks for the reports, by default True.
"""
logging.info("Checking input folder..")
# is it a bgcflow data directory or just a result directory?
input_dir = Path(bgcflow_dir)
if (input_dir / "metadata/project_metadata.json").is_file():
report_dir = input_dir
else:
report_dir = input_dir / f"data/processed/{project_name}"
assert (
report_dir / "metadata/project_metadata.json"
).is_file(), "Unable to find BGCFlow results"
logging.debug(f"Found project_metadata. Using [{report_dir}] as report directory.")
# Get project metadata
p = load_project_metadata(report_dir / "metadata/project_metadata.json")
assert (
p.name == project_name
), "Project metadata does not match with user provided input!"
logging.debug(
f"Project [{p.name}] was analysed using BGCFlow version {p.bgcflow_version}"
)
# available reports, check all output files
logging.debug(f"Available reports: {list(p.rule_used.keys())}")
df_results = pd.DataFrame.from_dict(p.rule_used).T
# check available reports
logging.info("Preparing mkdocs config...")
if ipynb:
extension = "ipynb"
else:
extension = "md"
report_category_containers = {}
for r, v in p.rule_used.items():
jupyter_template = report_dir / f"docs/{r}.{extension}"
# logging.debug(jupyter_template.is_file()) # TO DO ASSERT IPYNB FILES, THEY SHOULD BE IN THE DOCS
logging.debug(f"Adding report [{r} : {jupyter_template.name}]")
report_category = v["category"]
if report_category not in report_category_containers.keys():
report_category_containers[report_category] = []
report_category_containers[report_category].append({r: jupyter_template.name})
for k, v in report_category_containers.items():
mkdocs_template["nav"].append({k: v})
# write mkdocs template
mkdocs_yml = report_dir / "mkdocs.yml"
logging.info(f"Generating mkdocs config at: {mkdocs_yml}")
write_mkdocs_file(mkdocs_template, mkdocs_yml, "yaml")
# Generate index.md
docs_dir = report_dir / "docs"
docs_dir.mkdir(exist_ok=True, parents=True)
mkdocs_index = docs_dir / "index.md"
logging.info(f"Generating homepage at: {mkdocs_index}")
df_results.loc[:, "BGCFlow_rules"] = df_results.index
df_results = df_results.loc[:, ["BGCFlow_rules", "description"]].reset_index(
drop=True
)
df_results.loc[:, "BGCFlow_rules"] = [
f"[{i}]({i}/)" + "{.md-button}" for i in df_results.loc[:, "BGCFlow_rules"]
]
data = {
"p_name": p.name,
"p_description": p.description,
"p_sample_size": p.sample_size,
"p_references": p.references,
"rule_table": df_results.to_markdown(index=False),
}
j2_template = Template(index_template)
write_mkdocs_file(j2_template.render(data), mkdocs_index, "write")
# generate main.py macros
mkdocs_py = report_dir / "main.py"
logging.info(f"Generating python macros at: {mkdocs_py}")
j2_template = Template(macros_template)
write_mkdocs_file(
j2_template.render({"file_server": fileserver}), mkdocs_py, "write"
)
# generate custom javascripts
# script_dir = docs_dir / "scripts"
# script_dir.mkdir(parents=True, exist_ok=True)
# logging.info(f"Generating custom site javascripts at: {script_dir / 'site.js'}")
# with open(script_dir / 'site.js', "w") as f:
# f.write(script_js)
# extend main html
override_dir = report_dir / "overrides"
override_dir.mkdir(exist_ok=True, parents=True)
logging.info(f"Extends main html: {override_dir / 'main.html'}")
with open(override_dir / "main.html", "w") as f:
f.write(main_html)
# generate assets
asset_path = docs_dir / "assets/bgcflow"
asset_path.mkdir(exist_ok=True, parents=True)
logging.info("Generating assets...")
logo_path = asset_path / "BGCFlow_logo.svg"
shutil.copy(Path(__file__).parent / "outputs/svg/BGCFlow_logo.svg", logo_path)
# generate symlink
# for r in ['antismash', 'bigscape']:
# target_path_raw = report_dir / r
# for target_path in target_path_raw.glob("*"):
# if any(target_path.name.startswith(keywords) for keywords in ['result', '6']):
# if target_path.is_dir():
# symlink_path = asset_path / r
# if symlink_path.is_symlink():
# symlink_path.unlink()
# symlink_path.symlink_to(target_path.resolve())
# Running fileserver
if fileserver == "http://localhost:8002":
fs = subprocess.Popen(
[
"python",
"-m",
"http.server",
"--directory",
report_dir,
fileserver.split(":")[-1],
],
stderr=subprocess.DEVNULL,
)
fs_run_by_bgcflow = True
logging.info(f"Running http file-server. Job id: {fs.pid}")
else:
fs_run_by_bgcflow = False
# dumping file server location
with open("bgcflow_wrapper.log", "w") as f:
log_port = {"report_server": port, "file_server": fileserver}
if fs_run_by_bgcflow:
log_port["pid"] = fs.pid
json.dump(log_port, f, indent=2)
try:
signal.signal(signal.SIGINT, signal_handler)
subprocess.call(
f"(cd {str(report_dir)} && mkdocs serve -a localhost:{port})", shell=True
)
if fs_run_by_bgcflow:
fs.kill()
# asset_path.rmdir()
except subprocess.CalledProcessError:
if fs_run_by_bgcflow:
fs.kill()
# asset_path.rmdir()
return
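A usage sketch with a placeholder directory and project name; it assumes a finished BGCFlow run whose results live under data/processed/<project_name>.

```python
from bgcflow.mkdocs import generate_mkdocs_report

generate_mkdocs_report(
    bgcflow_dir="bgcflow",        # placeholder BGCFlow directory
    project_name="my_project",    # placeholder project name
    port=8001,
    fileserver="http://localhost:8002",
    ipynb=True,
)
```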
load_project_metadata(path_to_metadata)
Loads project metadata from a JSON file and returns it as an object.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
path_to_metadata | str or Path | The path to the JSON file containing the project metadata. | required |

Returns:

Type | Description |
---|---|
Dict2Class | An object representing the project metadata. |
Source code in bgcflow/mkdocs.py
def load_project_metadata(path_to_metadata):
"""
Loads project metadata from a JSON file and returns it as an object.
Args:
path_to_metadata (str or Path): The path to the JSON file containing the project metadata.
Returns:
Dict2Class: An object representing the project metadata.
"""
with open(path_to_metadata, "r") as f:
project_metadata = json.load(f)
p = list(project_metadata.values())[0]
p["name"] = [i for i in project_metadata.keys()][0]
p = Dict2Class(p)
return p
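A usage sketch with a placeholder path; the attributes on the returned object follow the fields stored in project_metadata.json (name and bgcflow_version are used elsewhere in this module).

```python
from bgcflow.mkdocs import load_project_metadata

p = load_project_metadata("data/processed/my_project/metadata/project_metadata.json")
print(p.name, p.bgcflow_version)
```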
signal_handler(signal, frame)
A signal handler function that prints a message and exits the program.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
signal | int | The signal number. | required |
frame | FrameType | The current stack frame. | required |
Source code in bgcflow/mkdocs.py
def signal_handler(signal, frame):
"""
A signal handler function that prints a message and exits the program.
Args:
signal (int): The signal number.
frame (FrameType): The current stack frame.
"""
print("\nThank you for using BGCFlow Report!")
# with open('bgcflow_wrapper.log', "r") as f:
# log_port = json.load(f)
# os.kill(log_port['pid'], signal.signal.SIGKILL)
sys.exit(0)
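A sketch of how the handler is registered, mirroring its use in generate_mkdocs_report above.

```python
import signal
from bgcflow.mkdocs import signal_handler

# Pressing Ctrl-C now prints the farewell message and exits cleanly.
signal.signal(signal.SIGINT, signal_handler)
```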
write_mkdocs_file(data_input, output_file, action)
Writes data to a file in either YAML or plain text format.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
data_input | dict or str | The data to write to the file. | required |
output_file | str or Path | The path to the file to write. | required |
action | str | The action to perform. Either "yaml" to write the data in YAML format, or "write" to write the data as plain text. | required |
Source code in bgcflow/mkdocs.py
def write_mkdocs_file(data_input, output_file, action):
"""
Writes data to a file in either YAML or plain text format.
Args:
data_input (dict or str): The data to write to the file.
output_file (str or Path): The path to the file to write.
action (str): The action to perform. Either "yaml" to write the data in YAML format, or "write" to write the data as plain text.
"""
if output_file.exists():
overwrite = input(
f"WARNING: {output_file} already exists. Do you want to overwrite it? (y/n) "
)
if overwrite.lower() != "y":
print("Skipping file write.")
else:
# continue with writing the file
with open(output_file, "w", encoding="utf-8") as f:
if action == "yaml":
yaml.dump(data_input, f)
elif action == "write":
f.write(data_input)
else:
# continue with writing the file
with open(output_file, "w", encoding="utf-8") as f:
if action == "yaml":
yaml.dump(data_input, f)
elif action == "write":
f.write(data_input)
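A sketch with placeholder file names and content; `output_file` is passed as a `Path`, since the function calls `.exists()` on it.

```python
from pathlib import Path
from bgcflow.mkdocs import write_mkdocs_file

# Dump a dictionary as YAML, then write a plain-text homepage (example content only).
write_mkdocs_file({"site_name": "my_project"}, Path("mkdocs.yml"), "yaml")
write_mkdocs_file("# Welcome\n", Path("docs/index.md"), "write")
```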
projects_util
bgcflow_init(bgcflow_dir, global_config)
Initialize BGCFlow configuration and display available projects.
Initializes BGCFlow configuration based on the provided directory and global configuration path. If the global configuration file exists, it lists the available projects. If not, it generates a global configuration file from the template and provides instructions for a test run.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
bgcflow_dir | str or pathlib.PosixPath | The directory where the BGCFlow configuration is located. | required |
global_config | str or pathlib.PosixPath | The path to the global configuration file. | required |
Source code in bgcflow/projects_util.py
def bgcflow_init(bgcflow_dir, global_config):
"""
Initialize BGCFlow configuration and display available projects.
Initializes BGCFlow configuration based on the provided directory and global configuration path.
If the global configuration file exists, it lists the available projects.
If not, generates a global configuration file from the template and provides instructions for a test run.
Args:
bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located.
global_config (str or pathlib.PosixPath): The path to the global configuration file.
"""
# check if global config available
if global_config.is_file():
# grab available projects
logging.debug(f"Found config file at: {global_config}")
with open(global_config, "r") as file:
config_yaml = yaml.safe_load(file)
project_names = [p for p in config_yaml["projects"]]
list_of_projects = {}
for p in project_names:
if "pep" in p.keys():
p["name"] = p.pop("pep")
if p["name"].endswith(".yaml"):
pep = peppy.Project(
str(bgcflow_dir / p["name"]), sample_table_index="genome_id"
)
name = pep.name
file_path = pep.config["sample_table"]
else:
name = p["name"]
file_path = p["samples"]
list_of_projects[name] = file_path
print("Available projects:")
for p in list_of_projects.keys():
print(f" - {p} : {file_path}")
else:
generate_global_config(bgcflow_dir, global_config)
print("\nDo a test run by: `bgcflow run -n`")
copy_final_output(**kwargs)
Copy final project output files to a specified destination.
This function copies the processed output files of a project to a designated destination. It can also resolve symbolic links during the copy if specified.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
**kwargs | dict | Keyword arguments for the function. | {} |

Keyword arguments:

Name | Type | Description |
---|---|---|
bgcflow_dir | str | The directory where the BGCFlow configuration is located. |
project | str | The name of the project whose output should be copied. |
resolve_symlinks | str | Indicates whether to resolve symbolic links. Defaults to False. |
destination | str | The destination directory where the output should be copied. |
Source code in bgcflow/projects_util.py
def copy_final_output(**kwargs):
"""
Copy final project output files to a specified destination.
This function facilitates the copying of processed project output files to a designated destination. It can
also preserve symbolic links during the copy process if specified.
Args:
**kwargs (dict): Keyword argument for the function.
Keyword arguments:
bgcflow_dir (str): The directory where the BGCFlow configuration is located.
project (str): The name of the project whose output should be copied.
resolve_symlinks (str, optional): Indicate whether to preserve symbolic links. Defaults to False.
destination (str): The destination directory where the output should be copied.
"""
bgcflow_dir = Path(kwargs["bgcflow_dir"]).resolve()
project_output = bgcflow_dir / f"data/processed/{kwargs['project']}"
assert (
project_output.is_dir()
), f"ERROR: Cannot find project [{kwargs['project']}] results. Run `bgcflow init` to find available projects."
if "resolve_symlinks" in kwargs.keys():
assert kwargs["resolve_symlinks"] in [
"True",
"False",
], f'Invalid argument {kwargs["resolve_symlinks"]} in --resolve-symlinks. Choose between "True" or "False"'
if kwargs["resolve_symlinks"] == "True":
resolve_symlinks = "-L"
else:
resolve_symlinks = ""
exclude_copy = f"{str(project_output.stem)}/bigscape/*/cache"
command = [
"rsync",
"-avPhr",
resolve_symlinks,
"--exclude",
exclude_copy,
str(project_output),
kwargs["destination"],
]
logging.debug(f'Running command: {" ".join(command)}')
subprocess.call(command)
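A usage sketch with placeholder paths; note that `resolve_symlinks` is passed as the string "True" or "False", matching the assertion in the source above.

```python
from bgcflow.projects_util import copy_final_output

copy_final_output(
    bgcflow_dir="bgcflow",                  # placeholder BGCFlow directory
    project="my_project",                   # placeholder project name
    resolve_symlinks="True",                # rsync -L: copy symlink targets
    destination="/backup/bgcflow_results",  # placeholder destination
)
```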
generate_global_config(bgcflow_dir, global_config)
Generate a BGCFlow global configuration file from a template.
Copies the template configuration file to the specified global configuration path.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
bgcflow_dir | str or pathlib.PosixPath | The directory where the BGCFlow configuration is located. | required |
global_config | str or pathlib.PosixPath | The path to the global configuration file to be generated. | required |
Source code in bgcflow/projects_util.py
def generate_global_config(bgcflow_dir, global_config):
"""
Generate a BGCFlow global configuration file from a template.
Copies the template configuration file to the specified global configuration path.
Args:
bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located.
global_config (str or pathlib.PosixPath): The path to the global configuration file to be generated.
"""
logging.info(f"Generating config file from template at: {global_config}")
template_config = bgcflow_dir / ".examples/_config_example.yaml"
assert (
template_config.is_file()
), "Cannot find template file. Are you using BGCFlow version >= 0.4.1?"
shutil.copy(template_config, global_config)
# scan for example projects
def copy_project_example(project_type):
"""
Scan global config for example projects and (sub projects) and copy them to the config directory.
"""
with open(global_config, "r") as file:
config_yaml = yaml.safe_load(file)
example_projects = [
Path(p["pep"])
for p in config_yaml[project_type]
if "pep" in p.keys() and p["pep"].endswith(".yaml")
]
for example_project in example_projects:
example_project_dir = (
bgcflow_dir / ".examples" / example_project.parent.name
)
target_dir = bgcflow_dir / "config" / example_project_dir.name
if str(example_project).startswith(".examples"):
logging.warning(
f"\n - WARNING: You are using BGCFlow version <= 0.7.1. In the global config file (`{global_config}`), please change the location of your `{example_project}` to `config/{example_project.parent.name}/{example_project.name}`."
)
shutil.copytree(example_project_dir, target_dir)
for project_type in ["projects", "bgc_projects"]:
copy_project_example(project_type)
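A usage sketch with a placeholder directory; it assumes the cloned repository still contains .examples/_config_example.yaml, as the assertion above requires.

```python
from pathlib import Path
from bgcflow.projects_util import generate_global_config

bgcflow_dir = Path("bgcflow")  # placeholder path to a cloned BGCFlow repo
generate_global_config(bgcflow_dir, bgcflow_dir / "config/config.yaml")
```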
generate_project(bgcflow_dir, project_name, pep_version='2.1.0', use_project_rules=False, samples_csv=False, prokka_db=False, gtdb_tax=False, description=False)
Generate a PEP project configuration in BGCFlow.
This function creates a configuration file for a Portable Encapsulated Projects (PEP) project within the BGCFlow framework. It allows you to define various aspects of the project, such as its name, version, description, sample data, and custom annotations.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
bgcflow_dir | str or pathlib.PosixPath | The directory where the BGCFlow configuration is located. | required |
project_name | str | The name of the project. | required |
pep_version | str | The version of the PEP specification. Defaults to "2.1.0". | '2.1.0' |
use_project_rules | bool | Flag indicating whether to use project-specific rules. Defaults to False. | False |
samples_csv | pd.core.frame.DataFrame or str | Sample data in Pandas DataFrame or path to a CSV file. Defaults to False. | False |
prokka_db | str | Path to a custom Prokka annotation file. Defaults to False. | False |
gtdb_tax | str | Path to a custom GTDB taxonomy file. Defaults to False. | False |
description | str | Description for the project. Defaults to False. | False |
Source code in bgcflow/projects_util.py
def generate_project(
bgcflow_dir,
project_name,
pep_version="2.1.0",
use_project_rules=False,
samples_csv=False,
prokka_db=False,
gtdb_tax=False,
description=False,
):
"""
Generate a PEP project configuration in BGCFlow.
This function creates a configuration file for a Project Enhanced Pipelines (PEP)
project within the BGCFlow framework. It allows you to define various aspects of
the project, such as its name, version, description, sample data, custom annotations,
and more.
Args:
bgcflow_dir (str or pathlib.PosixPath): The directory where the BGCFlow configuration is located.
project_name (str): The name of the project.
pep_version (str, optional): The version of the PEP specification. Defaults to "2.1.0".
use_project_rules (bool, optional): Flag indicating whether to use project-specific rules. Defaults to False.
samples_csv (pd.core.frame.DataFrame or str, optional): Sample data in Pandas DataFrame or path to a CSV file. Defaults to False.
prokka_db (str, optional): Path to a custom Prokka annotation file. Defaults to False.
gtdb_tax (str, optional): Path to a custom GTDB taxonomy file. Defaults to False.
description (str, optional): Description for the project. Defaults to False.
"""
# Ensure bgcflow_dir is a pathlib.PosixPath
if not isinstance(bgcflow_dir, Path):
bgcflow_dir = Path(bgcflow_dir)
# Define paths and template dictionary
global_config = bgcflow_dir / "config/config.yaml"
template_dict = {
"name": project_name,
"pep_version": pep_version,
"description": "<TO DO: give a description to your project>",
"sample_table": "samples.csv",
"prokka-db": "OPTIONAL: relative path to your `prokka-db.csv`",
"gtdb-tax": "OPTIONAL: relative path to your `gtdbtk.bac120.summary.tsv`",
}
# Update template_dict with project rules if enabled
if use_project_rules:
with open(bgcflow_dir / "workflow/rules.yaml", "r") as file:
available_rules = yaml.safe_load(file)
available_rules = {rule: "FALSE" for rule in available_rules.keys()}
template_dict["rules"] = available_rules
# Create project directory
project_dir = bgcflow_dir / f"config/{project_name}"
project_dir.mkdir(parents=True, exist_ok=True)
# Handle samples_csv input
if isinstance(samples_csv, pd.core.frame.DataFrame):
logging.debug("Generating samples file from Pandas DataFrame")
assert samples_csv.index.name == "genome_id"
assert (
samples_csv.columns
== [
"source",
"organism",
"genus",
"species",
"strain",
"closest_placement_reference",
]
).all
samples_csv.to_csv(project_dir / "samples.csv")
elif isinstance(samples_csv, str):
logging.debug(f"Copying samples file from {samples_csv}")
samples_csv = Path(samples_csv)
assert samples_csv.is_file()
shutil.copy(samples_csv, project_dir / "samples.csv")
# Handle prokka_db input
if isinstance(prokka_db, str):
logging.debug(f"Copying custom annotation file from {prokka_db}")
prokka_db = Path(prokka_db)
assert prokka_db.is_file()
shutil.copy(prokka_db, project_dir / "prokka-db.csv")
template_dict["prokka-db"] = "prokka-db.csv"
# Handle gtdb_tax input
if isinstance(gtdb_tax, str):
logging.debug(f"Copying custom taxonomy from {gtdb_tax}")
gtdb_tax = Path(gtdb_tax)
assert gtdb_tax.is_file()
shutil.copy(gtdb_tax, project_dir / "gtdbtk.bac120.summary.tsv")
template_dict["gtdb-tax"] = "gtdbtk.bac120.summary.tsv"
# Update template_dict with project description
if isinstance(description, str):
logging.debug("Writing project description...")
template_dict["description"] = description
# Generate project configuration file
logging.info(f"Project config file generated in: {project_dir}")
with open(project_dir / "project_config.yaml", "w") as file:
yaml.dump(template_dict, file, sort_keys=False)
# Initialize global config if not present
if not global_config.is_file():
bgcflow_init(bgcflow_dir, global_config)
# Update global config.yaml with project information
with open(bgcflow_dir / "config/config.yaml", "r") as file:
logging.debug("Updating global config.yaml")
main_config = yaml.safe_load(file)
# Rename 'pep' to 'name' for consistency
for item in main_config["projects"]:
if "pep" in item:
item["name"] = item.pop("pep")
# Rename 'pipelines' to 'rules'
if "pipelines" in main_config.keys():
main_config["rules"] = main_config.pop("pipelines")
project_names = [p["name"] for p in main_config["projects"]]
assert (
project_name not in project_names
), f"Project name: '{project_name}' already exists!\nUse a different name or edit the files in: {project_dir}"
assert (
str(project_dir / "project_config.yaml") not in project_names
), f"Project name: '{project_name}' already exists!\nUse a different name or edit the files in: {project_dir}"
main_config["projects"].append(
{"name": str(project_dir / "project_config.yaml")}
)
# Update and save global config
with open(bgcflow_dir / "config/config.yaml", "w") as file:
yaml.dump(main_config, file, sort_keys=False)
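A usage sketch with placeholder names and paths; the samples file is copied into config/<project_name>/samples.csv and the project is appended to the global config.yaml.

```python
from bgcflow.projects_util import generate_project

generate_project(
    "bgcflow",                                   # placeholder BGCFlow directory
    "my_project",                                # placeholder project name
    samples_csv="samples.csv",                   # placeholder path to an existing CSV
    description="Example genome mining project",
)
```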
projects_util(**kwargs)
Utility function for managing BGCflow projects.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
**kwargs | dict | Keyword arguments for the function. | {} |

Keyword arguments:

Name | Type | Description |
---|---|---|
bgcflow_dir | str | Path to the BGCflow directory. |
project | str | Name of the BGCflow project to generate. |
use_project_pipeline | bool | Whether to use the project-specific pipeline rules. |
prokka_db | str | Path to the Prokka database. |
gtdb_tax | str | Path to the GTDB taxonomy file. |
samples_csv | str | Path to the samples CSV file. |
Source code in bgcflow/projects_util.py
def projects_util(**kwargs):
"""
Utility function for managing BGCflow projects.
Args:
**kwargs (dict): Keyword arguments for the function.
Keyword Arguments:
bgcflow_dir (str): Path to the BGCflow directory.
project (str): Name of the BGCflow project to generate.
use_project_pipeline (bool): Whether to use the project-specific pipeline rules.
prokka_db (str): Path to the Prokka database.
gtdb_tax (str): Path to the GTDB taxonomy file.
samples_csv (str): Path to the samples CSV file.
"""
# pep_version = "2.1.0"
bgcflow_dir = Path(kwargs["bgcflow_dir"]).resolve()
config_dir = bgcflow_dir / "config"
config_dir.mkdir(parents=True, exist_ok=True)
global_config = config_dir / "config.yaml"
if type(kwargs["project"]) == str:
# project_name = kwargs["project"]
generate_project(
bgcflow_dir,
kwargs["project"],
use_project_rules=kwargs["use_project_pipeline"],
prokka_db=kwargs["prokka_db"],
gtdb_tax=kwargs["gtdb_tax"],
samples_csv=kwargs["samples_csv"],
)
else:
bgcflow_init(bgcflow_dir, global_config)
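A usage sketch with placeholder values; every keyword the function reads must be supplied, and passing False for the optional inputs simply skips copying those files.

```python
from bgcflow.projects_util import projects_util

projects_util(
    bgcflow_dir="bgcflow",        # placeholder BGCFlow directory
    project="my_project",         # placeholder project name
    use_project_pipeline=False,
    prokka_db=False,
    gtdb_tax=False,
    samples_csv=False,
)
```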